# Python Rule Caching

## Overview

Caching allows previous detection runs to directly influence subsequent runs. Panther's real-time analysis engine examines events one-by-one, and sometimes it's helpful to keep state across invocations. Rules can cache values by using built-in helper functions. These helper functions interface with a Panther-hosted DynamoDB table. This feature is sometimes referred to as the "panther-kv-store."

Detections can store and retrieve arbitrary key-value pairs, allowing for state preservation between detection runs. Rather than using a [Scheduled Search](https://docs.panther.com/search/scheduled-searches) and Scheduled Rule, detections can instead gather and analyze event metadata in real-time.

If you'd like to read from your Panther-managed DynamoDB table, reach out to your Panther support team. You will be provisioned an Amazon Web Services (AWS) role with read-only permissions to DynamoDB.

{% hint style="warning" %}
Because using the cache adds significant latency to detection processing (which can have downstream effects like delayed alerts), it's recommended to:

* Only use the cache in detections for log types that ingest a small amount of data
* Write detections such that the cache is only used when absolutely necessary (see [Pitfall: Using the cache before it is necessary](#pitfall-using-the-cache-before-it-is-necessary))
  {% endhint %}

## Common use cases

* **Aggregating data from raw events, enrichment, external sources, and more**
  * Leveraging a cache allows detections to deduplicate and then aggregate data for use in later detection runs and/or alert context.
* **Correlating data from multiple events and/or log sources**
  * A single event may not provide much insight in isolation. However, a series of events can form a more complete picture that can be very useful.
  * Since the DynamoDB cache can be referenced in any detection run across all of Panther, it is possible to use the cache to widen the scope of detections considerably.
* **Risk-based alerting, User and Entity Behavior Analytics (UEBA)**
  * The DynamoDB cache can be used to monitor and score entities based on events that have come into Panther. This provides an abstraction layer around disparate events, enabling detections to track, score, and classify risky behavior.
  * Detections can incorporate scores on otherwise random event combinations without the use of explicit field-based logic.

{% hint style="warning" %}
While caching *can* be used to count events and generate an alert after some event threshold is met, it's recommended to accomplish this using the built-in [deduplication](https://docs.panther.com/detections/rules/..#deduplication-of-alerts) feature instead.
{% endhint %}

## Key-value pairs in DynamoDB

DynamoDB, which powers Panther's detection cache, is a fast and lightweight NoSQL key-value database. Panther has implemented a single DynamoDB table that powers detection caching.

All rows within DynamoDB are **key-value pairs**:

* **Key**: A unique identifier for the row (cannot be duplicated within a table)
* **Value**: Any data paired with a given key

Both keys and values can be generated in detection code.

{% hint style="info" %}
Values stored in DynamoDB can be up to 400 KB in size.
{% endhint %}

### Generating keys

All Panther detections share the same DynamoDB table as a cache. This provides benefits with cross-detection caching, but also requires you to choose keys that:

* **Can be programmatically generated at detection runtime**
  * The code used to generate the key is often placed into a function.
  * We recommend storing key generator functions in a [Global Helper](https://docs.panther.com/detections/rules/python/globals) to implement the same keys across multiple detections.
* **Leverage event values**
  * For example: IP addresses, usernames, hashes, IDs, ARNs.
* **Provide sufficient entropy and uniqueness within their intended scope**
  * A cache may be implemented within a single detection, or for multiple detections and Log Sources simultaneously.
  * When intending to use the same cache across multiple detections and Log Sources, you may need to leverage [Data Models](https://docs.panther.com/detections/rules/python/data-models) to create a common field value taxonomy.
* **Do not conflict with one another**
  * It is possible to erroneously overwrite key-value pairs, so keys need to be carefully constructed to prevent this.

{% hint style="info" %}
A cached value can be accessed across different detections using the same key.
{% endhint %}
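
As a minimal sketch of the guidance above, a key generator might combine a namespace with one or more event values. The function name `make_cache_key`, its signature, and the `:` separator are hypothetical and are not part of Panther's helper library.

```python
def make_cache_key(namespace: str, *parts: object) -> str:
    """Build a cache key from a namespace and one or more event values.

    Hypothetical helper: the name, signature, and ":" separator are
    illustrative assumptions, not part of Panther's helper library.
    """
    cleaned = []
    for part in parts:
        if part is None or str(part).strip() == "":
            # Refuse to build a key from a missing value, which could
            # collide with keys generated for unrelated events.
            raise ValueError("cache key parts must be non-empty")
        cleaned.append(str(part).strip())
    return ":".join([namespace, *cleaned])


# Detections that call the generator with the same inputs share a key
key = make_cache_key("Okta.Login.LastIP", "alice@example.com")
```

Placing a function like this in a Global Helper would let several detections derive identical keys from the same event values, while the namespace prefix keeps unrelated caches from overwriting one another.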

## Cache helper functions in `panther_detection_helpers`

Panther maintains [`panther_detection_helpers`](https://pypi.org/project/panther-detection-helpers/), a pip package you can use in your detections.

To reference `panther_detection_helpers` in your detection files, add the following import statement:

```python
import panther_detection_helpers
```

You may also import specific functions with a statement like the following:

```python
from panther_detection_helpers.caching import get_dictionary
```

### Dictionaries

These Panther-provided helper functions allow detections to cache dictionaries:

* `get_dictionary`: Get the dictionary's current value
* `put_dictionary`: Overwrite a dictionary

Dictionaries are serialized and deserialized using the Python `json` library. Therefore, the dictionaries that are cached cannot contain:

* Sets
* Complex numbers or formulas
* Custom objects
* Keys that are not strings
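
One way to work within these limits is to normalize a dictionary before caching it. The sketch below uses only the standard `json` library; the helper name `to_cacheable` is hypothetical, and it assumes that set members can be sorted (for example, that they are all strings).

```python
import json


def to_cacheable(value):
    """Recursively convert a value into something json.dumps() accepts.

    Illustrative helper (not part of Panther's library): sets become sorted
    lists and dictionary keys become strings.
    """
    if isinstance(value, dict):
        return {str(k): to_cacheable(v) for k, v in value.items()}
    if isinstance(value, (set, frozenset)):
        return sorted(to_cacheable(v) for v in value)
    if isinstance(value, (list, tuple)):
        return [to_cacheable(v) for v in value]
    return value


raw = {"ips": {"10.0.0.2", "10.0.0.1"}, 1: "numeric key"}
safe = to_cacheable(raw)
serialized = json.dumps(safe)  # json.dumps(raw) would raise TypeError because of the set
```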

#### Examples

Events can be cached natively, since they are always passed to detections as dictionaries:

```python
from panther_detection_helpers.caching import get_dictionary, put_dictionary


def rule(event):
    key = __name__ + ":" + event.get("username")

    # Retrieve previous event
    previous_event_data = get_dictionary(key)

    # Store current event
    put_dictionary(key, event)

    # If no previous event data, exit
    if not previous_event_data:
        return False

    # Compare the IP between previous and current events
    if event.get("ipAddress") != previous_event_data.get("ipAddress"):
        return True

    return False
```

It is also possible to construct dictionaries in code and cache those:

```python
from panther_base_helpers import deep_get
from panther_detection_helpers.caching import get_dictionary, put_dictionary


def store_login_info(key, event):
    # Map the user to the lon/lat and time of the most recent login
    put_dictionary(
        key,
        {
            "city": deep_get(event, "client", "geographicalContext", "city"),
            "lon": deep_get(event, "client", "geographicalContext", "geolocation", "lon"),
            "lat": deep_get(event, "client", "geographicalContext", "geolocation", "lat"),
            "time": event.get("p_event_time")
        }
    )
```

{% hint style="info" %}
This methodology can be extended to store highly complex data sets in DynamoDB.
{% endhint %}

### String sets

These Panther-provided helper functions allow detections to cache string sets:

* [`get_string_set`](https://docs.panther.com/detections/rules/globals#get_string_set): Get the string set's current value
* [`put_string_set`](https://docs.panther.com/detections/rules/globals#put_string_set): Overwrite a string set
* `add_to_string_set`: Add one or more strings to a set
* `remove_from_string_set`: Remove one or more strings from a set
* `reset_string_set`: Empty the set
* [`set_key_expiration`](#time-to-live-ttl): Set the lifetime of the string set

#### Example

The rule below provides a demonstration of caching string sets.

```python
from panther_detection_helpers.caching import add_to_string_set, get_string_set


def rule(event):
    if event['eventName'] != 'AssumeRole':
        return False

    role_arn = event['requestParameters'].get('roleArn')
    if not role_arn:
        return False

    role_arn_key = '{}-UniqueSourceIPs'.format(role_arn)
    ip_addr = event['sourceIPAddress']

    previously_seen_ips = get_string_set(role_arn_key)

    # If this is the first IP seen for this role, trust on first use
    if len(previously_seen_ips) == 0:
        add_to_string_set(role_arn_key, ip_addr)
        return False

    if ip_addr not in previously_seen_ips:
        return True

    return False
```

### Counters

To implement a counter-based rule, use one or more of the following functions:

* `get_counter`: Get the latest counter value
* `increment_counter`: Add to the counter (default of 1)
* `reset_counter`: Reset the counter to 0
* `set_key_expiration`: Set the lifetime of the counter

#### Example

The rule below provides a demonstration of using counters.

```python
import time

from panther_detection_helpers.caching import increment_counter, reset_counter, set_key_expiration


def rule(event):
    # Filter to only analyze AccessDenied calls
    if event.get('errorCode') != 'AccessDenied':
        return False

    # Create our counter key, which should be fairly unique
    key = '{}-AccessDeniedCounter'.format(event['userIdentity'].get('arn'))

    # Increment the counter, and then check the current value
    hourly_error_count = increment_counter(key)
    if hourly_error_count == 1:
        # First error in this window: expire the counter after one hour
        set_key_expiration(key, time.time() + 3600)
    elif hourly_error_count >= 10:
        # If it exceeds our threshold, reset and then return an alert
        reset_counter(key)
        return True
    return False
```

## Using timestamps to track state

A common use case for a DynamoDB cache is to track groups of events in a given period of time. Because all key-value pairs are generated in detection code, the cache does not record event timestamps for you. To track time, store a timestamp in the cached value.

Detection writers should consider storing `p_event_time` when aggregating events.

{% hint style="info" %}
Timestamps should not be used in keys, since they are very rarely reproducible across an unpredictable series of event logs.
{% endhint %}

### Time to Live

Time to Live (TTL) lets you set an expiration timestamp on items in your cache. This automatic deletion can be useful as a deduplication strategy, as well as for efficient data cleanup. The default TTL for all cache entries is 90 days, but it's possible to [configure your own TTL value](#setting-the-ttl).

{% hint style="info" %}
The TTL is associated with a single cache key, regardless of the data type of the associated value. For example, if `add_to_string_set()` is called, then the TTL of the entire string set is reset to the value passed in `epoch_seconds` (or the default of 90 days if no value is passed).
{% endhint %}

#### Setting the TTL

It's possible to override the 90-day default TTL by using either:

* The `epoch_seconds` parameter available in caching helper functions that write to the cache, such as `put_string_set()` and `increment_counter()`
* The `set_key_expiration()` function

Both `epoch_seconds` and `set_key_expiration()` define a timestamp at which an entry should be deleted. These functions are available in [`panther_detection_helpers`](https://pypi.org/project/panther-detection-helpers/).

{% hint style="warning" %}
If you are not passing in a value for `epoch_seconds`, be sure to call `set_key_expiration()` after all functions that take `epoch_seconds`. If a function that takes `epoch_seconds` is called after `set_key_expiration()` and no value is provided for `epoch_seconds`, the TTL will be reset to the default of 90 days.
{% endhint %}
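
The ordering issue described in this warning can be illustrated with a toy model. `ToyCache` below is not Panther's implementation; it is a hypothetical in-memory class that only mimics the TTL behavior described on this page (a write without `epoch_seconds` resets the TTL to the 90-day default).

```python
DEFAULT_TTL_SECONDS = 90 * 24 * 3600  # the 90-day default described above


class ToyCache:
    """Toy model of the documented TTL behavior, for illustration only."""

    def __init__(self, now):
        self.now = now
        self.expires = {}

    def add_to_string_set(self, key, value, epoch_seconds=None):
        # A write without epoch_seconds resets the TTL to the default
        if epoch_seconds is None:
            epoch_seconds = self.now + DEFAULT_TTL_SECONDS
        self.expires[key] = epoch_seconds

    def set_key_expiration(self, key, epoch_seconds):
        self.expires[key] = epoch_seconds


def expiration_set_first(cache):
    cache.set_key_expiration("k", cache.now + 3600)
    cache.add_to_string_set("k", "v")  # overrides the one-hour TTL
    return cache.expires["k"]


def expiration_set_last(cache):
    cache.add_to_string_set("k", "v")
    cache.set_key_expiration("k", cache.now + 3600)  # one-hour TTL is kept
    return cache.expires["k"]
```

In this model, only `expiration_set_last` leaves the entry with the intended one-hour lifetime.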

To generate an expiration timestamp, take the Unix timestamp associated with the event time, via `event.event_time_epoch()`, and add a given number of seconds. After the resulting timestamp passes, the row is automatically deleted within 48 hours.

{% hint style="info" %}
It's recommended to use the event time (`p_event_time`) rather than the processing time (`p_parse_time` or `datetime.datetime.now()`) as the base for the TTL. This accounts for any delays in processing events, and ensures that old events, such as those found in unit tests, do not clutter the cache.
{% endhint %}
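
The arithmetic can be sketched with the standard library alone. The helper name `expiration_epoch` is hypothetical; in a detection, its first argument would be the value returned by `event.event_time_epoch()`. Truncating to an integer is a choice made for this sketch, not a documented requirement.

```python
from datetime import timedelta


def expiration_epoch(event_time_epoch: float, lifetime: timedelta) -> int:
    """Return the Unix timestamp at which a cache entry should expire."""
    return int(event_time_epoch + lifetime.total_seconds())


# An event that occurred at 2024-01-01T00:00:00Z, kept for 7 days
expires_at = expiration_epoch(1704067200, timedelta(days=7))
```

Basing the result on the event time means that replaying an old event produces an expiration that is already in the past or close to it, so stale test data does not linger in the cache.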

#### Examples

Example from Panther's `Geographically Improbable Okta Login` detection using `epoch_seconds`:

```python
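from datetime import timedelta
from json import dumps

from panther_base_helpers import deep_get
from panther_detection_helpers.caching import put_string_set

# Excerpt: `key` and `event` are defined earlier in the detection.
# Expire the entry after a week so the table doesn't fill up with past users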
put_string_set(
    key,
    [
        dumps(
            {
                "city": deep_get(event, "client", "geographicalContext", "city"),
                "lon": deep_get(event, "client", "geographicalContext", "geolocation", "lon"),
                "lat": deep_get(event, "client", "geographicalContext", "geolocation", "lat"),
                "time": event.get("p_event_time"),
            }
        )
    ],
    epoch_seconds=event.event_time_epoch() + timedelta(days=7).total_seconds(),
)
```

The same example using `set_key_expiration()`:

```python
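from datetime import timedelta
from json import dumps

from panther_base_helpers import deep_get
from panther_detection_helpers.caching import put_string_set, set_key_expiration

# Excerpt: `key` and `event` are defined earlier in the detection.
# Expire the entry after a week so the table doesn't fill up with past users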
put_string_set(
    key,
    [
        dumps(
            {
                "city": deep_get(event, "client", "geographicalContext", "city"),
                "lon": deep_get(event, "client", "geographicalContext", "geolocation", "lon"),
                "lat": deep_get(event, "client", "geographicalContext", "geolocation", "lat"),
                "time": event.get("p_event_time"),
            }
        )
    ]
)
set_key_expiration(key, event.event_time_epoch() + timedelta(days=7).total_seconds())
```

## Testing

Because the cache depends on DynamoDB, testing and validating detection code requires special consideration.

### Testing in the Panther Console

* Unit Test invocations will communicate with DynamoDB unless the applicable functions are overridden with mocks.
* Data sent to and received from DynamoDB can be returned from `alert_context()` for debugging in Unit Test results.
* It is not possible to browse the raw contents of DynamoDB.

### Testing with CLI workflows

* Detections in Panther leverage an AWS IAM Role to communicate with DynamoDB.
  * When using the [panther\_analysis\_tool](https://docs.panther.com/panther-developer-workflows/detections-repo/pat) to execute Unit Tests locally or as part of a CI/CD workflow, this IAM Role is not accessible.
  * It is not possible to interact with the DynamoDB cache outside the context of the Panther Console, and so testing needs to simulate inputs and outputs.
* To accommodate CI/CD workflows, we recommend mocking any functions that interact with DynamoDB to simulate the expected outputs.
  * See Panther's documentation for [more information about Mocks](https://docs.panther.com/testing#mocks).
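
As a rough illustration of the mocking approach, the sketch below uses Python's standard `unittest.mock` to replace cache calls with canned return values. It is not how Panther's own test runner configures mocks: the `caching` namespace is a stand-in for the real module, and the rule, key prefix, and event fields are hypothetical.

```python
from types import SimpleNamespace
from unittest import mock


def _unavailable(*args, **kwargs):
    raise RuntimeError("DynamoDB is not reachable outside of Panther")


# Stand-in for the panther_detection_helpers.caching module
caching = SimpleNamespace(get_string_set=_unavailable, add_to_string_set=_unavailable)


def rule(event):
    key = "Example.SeenIPs:" + event["user"]
    seen_ips = caching.get_string_set(key)
    if not seen_ips:
        # Trust on first use
        caching.add_to_string_set(key, event["ip"])
        return False
    return event["ip"] not in seen_ips


def run_with_mocked_cache(event, cached_ips):
    """Run the rule with cache calls replaced by canned return values."""
    with mock.patch.object(caching, "get_string_set", return_value=cached_ips), \
         mock.patch.object(caching, "add_to_string_set") as add_mock:
        return rule(event), add_mock.call_count
```

Each test case supplies the cache contents it expects to find, so the rule logic can be validated without any network access.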

## Common pitfalls while using the cache

### Pitfall: Using the cache before it is necessary

When writing detections, it is important to only call the kv-store when you need to, and not before. For example, consider the following detection that checks to see if we see a bad actor twice:

{% code title="BAD EXAMPLE" %}

```python
from panther_detection_helpers.caching import reset_string_set, get_string_set, add_to_string_set

def rule(event):
    bad_guys = get_string_set('BadGuys') # <-- #1
    bad_guy = event.get('BadGuyName')
    
    if event.get('eventType') == 'BadGuyDetected':
        add_to_string_set('BadGuys', bad_guy) # <-- #2
        
        if bad_guy in bad_guys:
            # repeat bad guy, alert
            reset_string_set('BadGuys')
            return True

    return False
```

{% endcode %}

There are two places where this detection can be significantly improved:

1. It gets the `BadGuys` string set before checking if this is a `BadGuyDetected` event. If it is *not* a bad guy event, there is no need to fetch the string set. This call adds latency to every detection run even though it is not always needed.
2. It adds the new `bad_guy` to the string set before checking if this is a repeat bad guy. If it is a repeat bad guy, we will alert and reset the set, so there is no need to add to the string set.

After making these changes, the detection looks like:

```python
from panther_detection_helpers.caching import reset_string_set, get_string_set, add_to_string_set

def rule(event):   
    if event.get('eventType') == 'BadGuyDetected':
        bad_guy = event.get('BadGuyName')
        bad_guys = get_string_set('BadGuys') # <-- #1

        if bad_guy in bad_guys:
            # repeat bad guy, alert
            reset_string_set('BadGuys')
            return True
            
        add_to_string_set('BadGuys', bad_guy) # <-- #2
        
    return False
```
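
To make the difference concrete, the sketch below runs both versions of the detection against an in-memory stand-in that counts cache calls. `CountingCache` and the sample events are hypothetical, and the rules take the cache as an argument only so that the calls can be counted.

```python
class CountingCache:
    """In-memory stand-in for the string set helpers that counts calls."""

    def __init__(self):
        self.sets = {}
        self.calls = 0

    def get_string_set(self, key):
        self.calls += 1
        return set(self.sets.get(key, set()))

    def add_to_string_set(self, key, value):
        self.calls += 1
        self.sets.setdefault(key, set()).add(value)

    def reset_string_set(self, key):
        self.calls += 1
        self.sets[key] = set()


def rule_eager(event, cache):
    # Mirrors the BAD EXAMPLE: reads and writes before they are needed
    bad_guys = cache.get_string_set("BadGuys")
    bad_guy = event.get("BadGuyName")
    if event.get("eventType") == "BadGuyDetected":
        cache.add_to_string_set("BadGuys", bad_guy)
        if bad_guy in bad_guys:
            cache.reset_string_set("BadGuys")
            return True
    return False


def rule_lazy(event, cache):
    # Mirrors the improved detection: touches the cache only when required
    if event.get("eventType") == "BadGuyDetected":
        bad_guy = event.get("BadGuyName")
        bad_guys = cache.get_string_set("BadGuys")
        if bad_guy in bad_guys:
            cache.reset_string_set("BadGuys")
            return True
        cache.add_to_string_set("BadGuys", bad_guy)
    return False


def count_calls(rule_fn, events):
    """Return the rule results and the number of cache calls made."""
    cache = CountingCache()
    results = [rule_fn(e, cache) for e in events]
    return results, cache.calls


EVENTS = [
    {"eventType": "Login"},
    {"eventType": "BadGuyDetected", "BadGuyName": "mallory"},
    {"eventType": "Login"},
    {"eventType": "BadGuyDetected", "BadGuyName": "mallory"},
]
```

Both versions return the same results for these four events, but the eager version makes seven cache calls while the lazy version makes four. The gap widens as the share of irrelevant events grows.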


