# Python 룰 캐싱

## 개요

캐싱을 통해 이전 디택션 실행이 이후 실행에 직접 영향을 줄 수 있습니다. Panther의 실시간 분석 엔진은 이벤트를 하나씩 검사하며 때때로 호출 간 상태를 유지하는 것이 유용합니다. 룰은 내장 헬퍼 함수를 사용하여 값을 캐시할 수 있습니다. 이러한 헬퍼 함수는 Panther가 호스팅하는 DynamoDB 테이블과 인터페이스합니다. 이 기능은 때때로 "panther-kv-store"라고 불립니다.

디텍션은 임의의 키-값 쌍을 저장하고 검색할 수 있어 디텍션 실행 간 상태 보존이 가능합니다. 다음을 사용하는 대신 [스케줄된 검색](https://docs.panther.com/ko/search/scheduled-searches) 및 스케줄된 룰, 디텍션은 대신 실시간으로 이벤트 메타데이터를 수집하고 분석할 수 있습니다.

Panther가 관리하는 DynamoDB 테이블에서 읽기를 원하면 Panther 지원팀에 문의하세요. DynamoDB에 대한 읽기 전용 권한이 있는 AWS 역할이 할당됩니다.

{% hint style="warning" %}
캐시 사용은 디텍션 처리에 상당한 지연을 추가하므로(이는 경보 지연과 같은 하류 영향이 있을 수 있음) 다음을 권장합니다:

* 데이터 양이 적게 수집되는 로그 유형의 디텍션에서만 캐시를 사용하세요
* 캐시는 절대적으로 필요한 경우에만 사용되도록 디텍션을 작성하세요(자세한 내용은 [함수가 필요하기 전에 캐시를 사용하는 함정](#pitfall-using-the-cache-before-it-is-necessary))
  {% endhint %}

## 일반적인 사용 사례

* **원시 이벤트, 인리치먼트, 외부 소스 등에서 데이터 집계**
  * 캐시를 활용하면 디텍션이 중복을 제거한 다음 나중 디텍션 실행 및/또는 경보 컨텍스트에서 사용할 수 있도록 데이터를 집계할 수 있습니다.
* **여러 이벤트 및/또는 로그 소스의 데이터 상관관계**
  * 단일 이벤트는 고립된 상태에서는 많은 통찰을 제공하지 않을 수 있습니다. 그러나 일련의 이벤트는 보다 완전한 그림을 형성할 수 있어 매우 유용할 수 있습니다.
  * DynamoDB 캐시는 Panther 전반의 모든 디텍션 실행에서 참조될 수 있으므로 캐시를 사용하여 디텍션의 범위를 상당히 넓힐 수 있습니다.
* **리스크 기반 알림, 사용자 엔터티 및 행동 분석(UEBA)**
  * DynamoDB 캐시는 Panther에 들어온 이벤트를 기반으로 엔터티를 모니터링하고 점수를 매기는 데 사용할 수 있습니다. 이는 다양한 이벤트에 대한 추상화 계층을 제공하여 디텍션이 위험한 행동을 추적하고 점수화하며 분류할 수 있게 합니다.
  * 디텍션은 명시적인 필드 기반 로직 없이도 무작위 이벤트 조합에 점수를 통합할 수 있습니다.

{% hint style="warning" %}
캐싱 *는* 이벤트를 카운트하고 일정 이벤트 임계값이 충족되면 경보를 생성하는 데 사용할 수 있지만, 대신 내장된 [중복 제거](https://docs.panther.com/ko/detections/rules/..#deduplication-of-alerts) 기능을 사용하는 것이 권장됩니다.
{% endhint %}

## DynamoDB의 키-값 쌍

Panther의 디텍션 캐시를 지원하는 DynamoDB는 빠르고 가벼운 NoSQL 키-값 데이터베이스입니다. Panther는 디텍션 캐싱을 지원하는 단일 DynamoDB 테이블을 구현했습니다.

DynamoDB 내의 모든 행은 **키-값 쌍**:

* **키**: 행의 고유 식별자(테이블 내에서 중복될 수 없음)
* **값**: 주어진 키와 쌍을 이루는 모든 데이터

키와 값은 모두 디텍션 코드에서 생성될 수 있습니다.

{% hint style="info" %}
DynamoDB에 저장된 값은 최대 400KB까지 가능합니다.
{% endhint %}

### 키 생성

모든 Panther 디텍션은 캐시로 동일한 DynamoDB 테이블을 공유합니다. 이는 교차 디텍션 캐싱의 이점을 제공하지만 다음과 같은 키 선택을 요구합니다:

* **디텍션 런타임에 프로그래밍적으로 생성될 수 있어야 함**
  * 키를 생성하는 데 사용되는 코드는 종종 함수에 배치됩니다.
  * 동일한 키를 여러 디텍션에서 구현하려면 키 생성기 함수를 [글로벌 헬퍼](https://docs.panther.com/ko/detections/rules/python/globals) 에 저장하는 것을 권장합니다.
* **이벤트 값을 활용**
  * 예: IP 주소, 사용자 이름, 해시, ID, ARN.
* **의도된 범위 내에서 충분한 엔트로피와 고유성을 제공**
  * 캐시는 단일 디텍션 내에서 구현되거나 여러 디텍션 및 로그 소스에 대해 동시에 구현될 수 있습니다.
  * 여러 디텍션 및 로그 소스에서 동일한 캐시를 사용하려는 경우, 공통 필드 값 분류 체계를 만들기 위해 [데이터 모델](https://docs.panther.com/ko/detections/rules/python/data-models) 를 활용해야 할 수 있습니다.
* **서로 충돌하지 않음**
  * 키-값 쌍을 잘못 덮어쓸 수 있으므로 이를 방지하기 위해 키를 신중하게 구성해야 합니다.

{% hint style="info" %}
캐시된 값은 동일한 키를 사용하여 다른 디텍션에서 액세스할 수 있습니다.
{% endhint %}

## 캐시 헬퍼 함수는 `panther_detection_helpers`

Panther는 [`panther_detection_helpers`](https://pypi.org/project/panther-detection-helpers/)라는 pip 패키지를 유지 관리하며 디텍션에서 사용할 수 있습니다.

디텍션 파일에서 `panther_detection_helpers` 를 참조하려면 다음 import 문을 추가하세요:

```python
import panther_detection_helpers
```

다음과 같은 문장으로 특정 함수를 가져올 수도 있습니다:

```python
from panther_detection_helpers.caching import get_dictionary
```

### 사전

이러한 Panther 제공 헬퍼 함수는 디텍션이 사전을 캐시할 수 있게 합니다:

* `get_dictionary`: 사전의 현재 값을 가져옵니다
* `put_dictionary`: 사전을 덮어씁니다

사전은 Python의 `json 형으로 분류됩니다.` 라이브러리를 사용하여 직렬화 및 역직렬화됩니다. 따라서 캐시된 사전은 다음을 포함할 수 없습니다:

* 셋(집합)
* 복소수 또는 수식
* 커스텀 객체
* 문자열이 아닌 키

#### 예제

이벤트는 항상 디텍션에 사전으로 전달되므로 네이티브로 캐시할 수 있습니다:

```python
from panther_detection_helpers.caching import get_dictionary, put_dictionary


def rule(event):
    key = __name__ + ":" + event.get("username")

    # 이전 이벤트를 검색
    previous_event_data = get_dictionary(key)

    # 현재 이벤트 저장
    put_dictionary(key, event)

    # 이전 이벤트 데이터가 없으면 종료
    if not previous_event_data:
        return False

    # 이전 이벤트와 현재 이벤트 간의 IP를 비교
    if event.get("ipAddress") != previous_event_data.get("ipAddress"):
        return True

    return False
```

코드에서 사전을 구성하고 이를 캐시하는 것도 가능합니다:

```python
from panther_base_helpers import deep_get
from panther_detection_helpers.caching import get_dictionary, put_dictionary


def store_login_info(key, event):
    # 사용자를 최신 로그인 위치(경도/위도) 및 시간에 매핑
    put_dictionary(
        key,
        {
            "city": deep_get(event, "client", "geographicalContext", "city"),
            "lon": deep_get(event, "client", "geographicalContext", "geolocation", "lon"),
            "lat": deep_get(event, "client", "geographicalContext", "geolocation", "lat"),
            "time": event.get("p_event_time")
        }
    )
```

{% hint style="info" %}
이 방법론은 DynamoDB에 매우 복잡한 데이터 세트를 저장하도록 확장될 수 있습니다.
{% endhint %}

### 문자열 집합

이러한 Panther 제공 헬퍼 함수는 디텍션이 문자열 집합을 캐시할 수 있게 합니다:

* [`get_string_set`](https://docs.panther.com/ko/detections/rules/globals#get_string_set): 문자열 집합의 현재 값을 가져옵니다
* [`put_string_set`](https://docs.panther.com/ko/detections/rules/globals#put_string_set): 문자열 집합을 덮어씁니다
* `add_to_string_set`: 하나 이상의 문자열을 집합에 추가합니다
* `remove_from_string_set`: 하나 이상의 문자열을 집합에서 제거합니다
* `reset_string_set`: 집합을 비웁니다
* [`set_key_expiration`](#time-to-live-ttl): 문자열 집합의 수명을 설정합니다

#### 예시

아래 룰은 문자열 집합 캐싱의 데모를 제공합니다.

```python
from panther_detection_helpers.caching import add_to_string_set, get_string_set


def rule(event):
    if event['eventName'] != 'AssumeRole':
        return False

    role_arn = event['requestParameters'].get('roleArn')
    if not role_arn:
        return False

    role_arn_key = '{}-UniqueSourceIPs'.format(role_arn)
    ip_addr = event['sourceIPAddress']

    previously_seen_ips = get_string_set(role_arn_key)

    # 이것이 유일한 값이라면, 최초 사용 시 신뢰
    if len(previously_seen_ips) == 0:
        add_to_string_set(role_arn_key, ip_addr)
        return False

    if ip_addr not in previously_seen_ips:
        return True

    return False
```

### 카운터

카운터 기반 룰을 구현하려면 다음 함수 중 하나 이상을 사용하세요:

* `get_counter`: 최신 카운터 값을 가져옵니다
* `increment_counter`: 카운터에 값을 더합니다(기본값 1)
* `reset_counter`: 카운터를 0으로 재설정합니다
* `set_key_expiration`: 카운터의 수명을 설정합니다

#### 예시

아래 룰은 카운터 사용의 예시를 제공합니다.

```python
from panther_detection_helpers.caching import increment_counter, set_key_expiration, reset_counter


def rule(event):
    # AccessDenied 호출만 분석하도록 필터
    if event.get('errorCode') != 'AccessDenied':
        return False

    # 상당히 고유해야 하는 카운터 키 생성
    key = '{}-AccessDeniedCounter'.format(event['userIdentity'].get('arn'))

    # 카운터를 증가시키고 현재 값을 확인
    hourly_error_count = increment_counter(key)
    if hourly_error_count == 1:
        set_key_expiration(key, time.time() + 3600)
    elif failure_hourly_count >= 10:
    # 임계값을 초과하면 재설정한 후 경고 반환
        reset_counter(key)
        return True
    return False
```

## 타임스탬프를 사용한 상태 추적

DynamoDB 캐시의 일반적인 사용 사례는 주어진 기간 내의 이벤트 그룹을 추적하는 것입니다. 모든 키-값 쌍은 코드에서 생성되어야 하므로, 값에 제공되지 않는 한 타임스탬프 추적은 제공되지 않습니다.

디텍션 작성자는 이벤트를 집계할 때 `p_event_time` 를 저장하는 것을 고려해야 합니다.

{% hint style="info" %}
타임스탬프는 매우 드물게 예측 불가능한 일련의 이벤트 로그에서 재현 가능하므로 키에 사용하면 안 됩니다.
{% endhint %}

### 수명(Time to Live)

수명( TTL )을 사용하면 캐시 항목에 만료 타임스탬프를 설정할 수 있습니다. 이 자동 삭제는 중복 제거 전략과 효율적인 데이터 정리에 유용합니다. 모든 캐시 항목의 기본 TTL은 90일이지만, [자체 TTL 값을 구성할 수 있습니다](#setting-the-ttl).

{% hint style="info" %}
TTL은 관련 값의 데이터 유형과 관계없이 단일 캐시 키에 연관됩니다. 예를 들어, `add_to_string_set()` 가 호출되면 전체 문자열 집합의 TTL이 전달된 `epoch_seconds` (또는 값이 전달되지 않으면 기본 90일)로 재설정됩니다.
{% endhint %}

#### TTL 설정

90일 기본 TTL은 다음 중 하나를 사용하여 재정의할 수 있습니다:

* 사용자를 사용할 것이며, `epoch_seconds` put\_string\_set()와 같은 캐시에 쓰는 헬퍼 함수에서 사용할 수 있는 매개변수 `put_string_set()` 와 `increment_counter()`
* 사용자를 사용할 것이며, `set_key_expiration()` function

둘 다 `epoch_seconds` 와 `set_key_expiration()` 항목을 삭제해야 하는 타임스탬프를 정의합니다. 이러한 함수는 [`panther_detection_helpers`](https://pypi.org/project/panther-detection-helpers/).

{% hint style="warning" %}
에서 제공됩니다. `epoch_seconds`만약 당신이 `set_key_expiration()` 에 값을 전달하지 않는다면, `를 호출하는 것을 잊지 마세요`모든 epoch\_seconds를 받는 함수들 이후에. `epoch_seconds` 만약 epoch\_seconds를 받는 함수가 `set_key_expiration()` 다음 호출 이후에 호출되고 `epoch_seconds`에 대한 값이 제공되지 않으면, TTL은 기본값인 90일로 재설정됩니다.
{% endhint %}

만료 타임스탬프를 생성하려면 이벤트 시간과 관련된 유닉스 타임스탬프를 가져와서 `event.event_time_epoch()`에 주어진 초 수를 더하세요. 결과 타임스탬프가 지나면 해당 행은 48시간 이내에 자동으로 삭제됩니다.

{% hint style="info" %}
처리 지연을 고려하고 단위 테스트에서 발견되는 오래된 이벤트가 캐시를 어지럽히지 않도록 TTL의 기준으로 처리 시간(`p_event_time)` 대신 이벤트 시간(`p_parse_time` 이전에 생성한 Snowflake 사용자 이름, 예를 들면 `datetime.datetime.now()`)을 사용하는 것이 권장됩니다.
{% endhint %}

#### 예제

Panther의 예시에서 `지리적으로 불가능한 Okta 로그인` 디텍션은 다음을 사용합니다 `epoch_seconds`:

```python
# 테이블이 과거 사용자로 가득 차지 않도록 항목을 일주일 후에 만료시키기
put_string_set(
    key,
    [
        dumps(
            {
                "city": deep_get(event, "client", "geographicalContext", "city"),
                "lon": deep_get(event, "client", "geographicalContext", "geolocation", "lon"),
                "lat": deep_get(event, "client", "geographicalContext", "geolocation", "lat"),
                "time": event.get("p_event_time"),
            }
        )
    ],
    epoch_seconds=event.event_time_epoch() + timedelta(days=7).total_seconds(),
)
```

동일한 예시를 사용하는 경우 `set_key_expiration()`:

```python
# 테이블이 과거 사용자로 가득 차지 않도록 항목을 일주일 후에 만료시키기
put_string_set(
    key,
    [
        dumps(
            {
                "city": deep_get(event, "client", "geographicalContext", "city"),
                "lon": deep_get(event, "client", "geographicalContext", "geolocation", "lon"),
                "lat": deep_get(event, "client", "geographicalContext", "geolocation", "lat"),
                "time": event.get("p_event_time"),
            }
        )
    ]
)
set_key_expiration(key, event.event_time_epoch() + timedelta(days=7).total_seconds())
```

## 테스트

캐시용 DynamoDB에 대한 의존성으로 인해 디텍션 코드를 테스트하고 검증할 때 특별한 고려가 필요합니다:

### Panther 콘솔에서의 테스트

* 단위 테스트 호출은 해당 함수들이 모킹으로 대체되지 않는 한 DynamoDB와 통신합니다.
* DynamoDB로 송수신되는 데이터는 단위 테스트 결과에서 디버깅을 위해 `alert_context()` 에 커밋될 수 있습니다.
* DynamoDB의 원시 내용을 탐색하는 것은 불가능합니다.

### CLI 워크플로우로 테스트하기

* Panther의 디텍션은 DynamoDB와 통신하기 위해 AWS IAM 역할을 활용합니다.
  * 다음을 사용할 때 [panther\_analysis\_tool](https://docs.panther.com/ko/panther/detections-repo/pat) 로컬에서 또는 CI/CD 워크플로우의 일부로 단위 테스트를 실행하면 이 IAM 역할에 접근할 수 없습니다.
  * Panther 콘솔의 컨텍스트 외부에서 DynamoDB 캐시와 상호작용할 수 없으므로 테스트는 입력과 출력을 시뮬레이션해야 합니다.
* CI/CD 워크플로우를 수용하기 위해 DynamoDB와 상호작용하는 모든 함수를 모킹하여 예상 출력을 시뮬레이트할 것을 권장합니다.
  * 모킹에 대한 자세한 정보는 Panther 문서를 참조하세요 [모킹에 대한 추가 정보](https://docs.panther.com/ko/testing#mocks).

## 캐시 사용 시의 일반적인 함정

### 함수가 필요하기 전에 캐시를 사용하는 함정

디텍션을 작성할 때는 kv-store를 필요할 때만 호출하고 그 이전에는 호출하지 않는 것이 중요합니다. 예를 들어, 동일한 악성 행위를 두 번 목격하는지를 확인하는 다음 디텍션을 고려해 보세요:

{% code title="나쁜 예시" %}

```python
from panther_detection_helpers.caching import reset_string_set, get_string_set, add_to_string_set

def rule(event):
    bad_guys = get_string_set('BadGuys') # <-- #1
    bad_guy = event.get('BadGuyName')
    
    if event.get('eventType') == 'BadGuyDetected':
        add_to_string_set('BadGuys', bad_guy) # <-- #2
        
        if bad_guy in bad_guys:
            # 반복되는 악성 행위자, 경고
            reset_string_set('BadGuys')
            return True

    return False
```

{% endcode %}

이 디텍션은 다음 두 가지 점에서 크게 개선될 수 있습니다:

1. 다음을 가져옵니다: `BadGuys` 문자열 집합을 `BadGuyDetected` 이벤트인지 확인하기 전에. 만약 이것이 *Enterprise 조직* 나쁜 행위자 이벤트라면, 문자열 집합을 가져올 필요가 없습니다. 이 호출은 항상 필요하지 않음에도 모든 디텍션 실행에 지연을 추가합니다.
2. 새로운 `bad_guy` 를 반복 악성 행위자인지 확인하기 전에 문자열 집합에 추가합니다. 만약 반복 악성 행위자라면 우리는 경고를 발생시키고 집합을 재설정할 것이므로 문자열 집합에 추가할 필요가 없습니다.

이 변경을 적용한 후 디텍션은 다음과 같이 보입니다:

```python
from panther_detection_helpers.caching import reset_string_set, get_string_set, add_to_string_set

def rule(event):   
    if event.get('eventType') == 'BadGuyDetected':
        bad_guy = event.get('BadGuyName')
        bad_guys = get_string_set('BadGuys') # <-- #1

        if bad_guy in bad_guys:
            # 반복되는 악성 행위자, 경고
            reset_string_set('BadGuys')
            return True
            
        add_to_string_set('BadGuys', bad_guy) # <-- #2
        
    return False
```
