Python Rule Caching

Overview

Caching allows previous detection runs to directly influence subsequent runs. Panther's real-time analysis engine examines events one-by-one, and sometimes it's helpful to keep state across invocations. Rules can cache values by using built-in helper functions. These helper functions interface with a Panther-hosted DynamoDB table. This feature is sometimes referred to as the "panther-kv-store."

Detections can store and retrieve arbitrary key-value pairs, allowing for state preservation between detection runs. Rather than using a Scheduled Search and Scheduled Rule, detections can instead gather and analyze event metadata in real-time.

If you'd like to read from your Panther-managed DynamoDB table, reach out to your Panther support team. You will be provisioned an Amazon Web Services (AWS) role with read-only permissions to DynamoDB.

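Because using the cache adds significant latency to detection processing (which can have downstream effects like delayed alerts), it's recommended to call the cache only when a detection actually needs it. See "Common pitfalls while using the cache" for an example.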

Common use cases

  • Aggregating data from raw events, enrichment, external sources, and more

    • Leveraging a cache allows detections to deduplicate and then aggregate data for use in later detection runs and/or alert context.

  • Correlating data from multiple events and/or log sources

    • A single event may not provide much insight in isolation. However, a series of events can form a more complete picture that can be very useful.

    • Since the DynamoDB cache can be referenced in any detection run across all of Panther, it is possible to use the cache to widen the scope of detections considerably.

  • Risk-based alerting, User Entity and Behavior Analytics (UEBA)

    • The DynamoDB cache can be used to monitor and score entities based on events that have come into Panther. This provides an abstraction layer around disparate events, enabling detections to track, score, and classify risky behavior.

    • Detections can incorporate scores on otherwise random event combinations without the use of explicit field-based logic.
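As a rough illustration of the risk-based approach, the sketch below accumulates a score per entity in a plain dictionary, which is the kind of value a detection could read from and write back to the cache. The signal names, weights, threshold, and the `update_risk` function are all hypothetical and are not part of Panther.

```python
# Hypothetical weights per signal; tune these for your own environment.
RISK_WEIGHTS = {"failed_login": 10, "mfa_reset": 25, "new_device": 15}
ALERT_THRESHOLD = 40  # hypothetical threshold


def update_risk(profile, signal):
    """Return a new risk profile with `signal` applied.

    `profile` is the dictionary previously cached for the entity, or None
    if nothing has been cached yet.
    """
    updated = dict(profile or {})
    # Unknown signals contribute nothing to the score.
    updated["score"] = updated.get("score", 0) + RISK_WEIGHTS.get(signal, 0)
    # Keep a sorted list (not a set) so the value stays JSON serializable.
    updated["signals"] = sorted(set(updated.get("signals", [])) | {signal})
    return updated


profile = None
for signal in ("failed_login", "mfa_reset", "failed_login"):
    profile = update_risk(profile, signal)

should_alert = profile["score"] >= ALERT_THRESHOLD
```

In a detection, the profile would be loaded with get_dictionary and stored with put_dictionary around the call to `update_risk`.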

While caching can be used to count events and generate an alert after some event threshold is met, it's recommended to accomplish this using the built-in deduplication feature instead.

Key-value pairs in DynamoDB

DynamoDB, which powers Panther's detection cache, is a fast and lightweight NoSQL key-value database. Panther has implemented a single DynamoDB table that powers detection caching.

All rows within DynamoDB are key-value pairs:

  • Key: A unique identifier for the row (cannot be duplicated within a table)

  • Value: Any data paired with a given key

Both keys and values can be generated in detection code.

Values stored in DynamoDB can be up to 400 KB in size.

Generating keys

All Panther detections share the same DynamoDB table as a cache. This provides benefits with cross-detection caching, but also requires you to choose keys that:

  • Can be programmatically generated at detection runtime

    • The code used to generate the key is often placed into a function.

    • We recommend storing key generator functions in a Global Helper to implement the same keys across multiple detections.

  • Leverage event values

    • For example: IP addresses, usernames, hashes, IDs, ARNs.

  • Provide sufficient entropy and uniqueness within their intended scope

    • A cache may be implemented within a single detection, or for multiple detections and Log Sources simultaneously.

    • When intending to use the same cache across multiple detections and Log Sources, you may need to leverage Data Models to create a common field value taxonomy.

  • Do not conflict with one another

    • It is possible to erroneously overwrite key-value pairs, so keys need to be carefully constructed to prevent this.

A cached value can be accessed across different detections using the same key.
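A minimal sketch of a key generator that could live in a Global Helper follows. The function name `build_cache_key`, the `namespace` argument, and the `:` separator are assumptions chosen for illustration; they are not part of panther_detection_helpers.

```python
def build_cache_key(namespace, *parts):
    """Build a cache key such as 'okta.login:alice@example.com'.

    Hypothetical helper: the name, arguments, and ':' separator are
    assumptions for illustration, not a Panther-provided API.
    """
    if not parts:
        raise ValueError("at least one key part is required")
    cleaned = []
    for part in parts:
        # Refuse missing values so that unrelated events cannot collapse
        # onto the same key (for example 'okta.login:None').
        if part is None or str(part).strip() == "":
            raise ValueError("cache key parts must be non-empty")
        # Strip surrounding whitespace so the same entity maps to one key.
        cleaned.append(str(part).strip())
    return ":".join([namespace, *cleaned])


# Detections that share a namespace produce the same key for the same
# user, so they read and write the same cached value.
key = build_cache_key("okta.login", " alice@example.com ")
```

Prefixing every key with a namespace is one way to keep keys from different use cases from overwriting one another while still allowing deliberate sharing.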

Cache helper functions in panther_detection_helpers

Panther maintains panther_detection_helpers, a pip package you can use in your detections.

To reference panther_detection_helpers in your detection files, add the following import statement:

import panther_detection_helpers

You may also import specific functions with a statement like the following:

from panther_detection_helpers.caching import get_dictionary

Dictionaries

These Panther-provided helper functions allow detections to cache dictionaries:

  • get_dictionary: Get the dictionary's current value

  • put_dictionary: Overwrite a dictionary

Dictionaries are serialized and deserialized using the Python json library. Therefore, the dictionaries that are cached cannot contain:

  • Sets

  • Complex numbers or formulas

  • Custom objects

  • Keys that are not strings
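One way to work within these limits is to convert a dictionary into JSON-friendly types before caching it. The sketch below is only an illustration: `to_cacheable` is a hypothetical helper, not a Panther function, and it assumes that the members of any set can be sorted together (for example, all strings).

```python
import json


def to_cacheable(value):
    """Recursively convert a value into JSON-friendly types.

    Hypothetical helper for illustration; it is not part of
    panther_detection_helpers.
    """
    if isinstance(value, dict):
        # JSON object keys must be strings, so convert them explicitly.
        return {str(k): to_cacheable(v) for k, v in value.items()}
    if isinstance(value, (set, frozenset)):
        # Sets are not JSON serializable; store a sorted list instead.
        return sorted(to_cacheable(v) for v in value)
    if isinstance(value, (list, tuple)):
        return [to_cacheable(v) for v in value]
    return value


raw = {"ips": {"10.0.0.2", "10.0.0.1"}, 443: "https"}
safe = to_cacheable(raw)

# The converted dictionary survives a JSON round trip unchanged.
round_tripped = json.loads(json.dumps(safe))
```

Code that reads the value back needs to expect lists and string keys, and can rebuild a set with `set(...)` where required.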

Examples

Events can be cached natively, since they are always passed to detections as dictionaries:

from panther_detection_helpers.caching import get_dictionary, put_dictionary


def rule(event):
    key = __name__ + ":" + event.get("username")

    # Retrieve previous event
    previous_event_data = get_dictionary(key)

    # Store current event
    put_dictionary(key, event)

    # If no previous event data, exit
    if not previous_event_data:
        return False

    # Compare the IP between previous and current events
    if event.get("ipAddress") != previous_event_data.get("ipAddress"):
        return True

    return False

It is also possible to construct dictionaries in code and cache those:

from panther_base_helpers import deep_get
from panther_detection_helpers.caching import get_dictionary, put_dictionary


def store_login_info(key, event):
    # Map the user to the lon/lat and time of the most recent login
    put_dictionary(
        key,
        {
            "city": deep_get(event, "client", "geographicalContext", "city"),
            "lon": deep_get(event, "client", "geographicalContext", "geolocation", "lon"),
            "lat": deep_get(event, "client", "geographicalContext", "geolocation", "lat"),
            "time": event.get("p_event_time")
        }
    )

This methodology can be extended to store highly complex data sets in DynamoDB.

String sets

These Panther-provided helper functions allow detections to cache string sets:

  • get_string_set: Get the string set's current value

  • put_string_set: Overwrite a string set

  • add_to_string_set: Add one or more strings to a set

  • remove_from_string_set: Remove one or more strings from a set

  • reset_string_set: Empty the set

  • set_key_expiration: Set the lifetime of the string set

Example

The rule below provides a demonstration of caching string sets.

from panther_detection_helpers.caching import add_to_string_set, get_string_set


def rule(event):
    if event['eventName'] != 'AssumeRole':
        return False

    role_arn = event['requestParameters'].get('roleArn')
    if not role_arn:
        return False

    role_arn_key = '{}-UniqueSourceIPs'.format(role_arn)
    ip_addr = event['sourceIPAddress']

    previously_seen_ips = get_string_set(role_arn_key)

    # If this is the first value seen for the role, trust on first use
    if len(previously_seen_ips) == 0:
        add_to_string_set(role_arn_key, ip_addr)
        return False

    if ip_addr not in previously_seen_ips:
        return True

    return False

Counters

To implement a counter-based rule, use one or more of the following functions:

  • get_counter: Get the latest counter value

  • increment_counter: Add to the counter (default of 1)

  • reset_counter: Reset the counter to 0

  • set_key_expiration: Set the lifetime of the counter

Example

The rule below provides a demonstration of using counters.

import time

from panther_detection_helpers.caching import increment_counter, reset_counter, set_key_expiration


def rule(event):
    # Filter to only analyze AccessDenied calls
    if event.get('errorCode') != 'AccessDenied':
        return False

    # Create our counter key, which should be fairly unique
    key = '{}-AccessDeniedCounter'.format(event['userIdentity'].get('arn'))

    # Increment the counter, and then check the current value
    hourly_error_count = increment_counter(key)
    if hourly_error_count == 1:
        # First error in this window: expire the counter one hour from now
        set_key_expiration(key, time.time() + 3600)
    elif hourly_error_count >= 10:
        # If it exceeds our threshold, reset and then return an alert
        reset_counter(key)
        return True
    return False

Using timestamps to track state

A common use case for a DynamoDB cache is to track groups of events in a given period of time. Since all key-value pairs must be generated in code, the cache does not track timestamps for you: they are available only if you store them in values.

Detection writers should consider storing p_event_time when aggregating events.

Timestamps should not be used in keys, since they are very rarely reproducible across an unpredictable series of event logs.
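As a sketch of timestamp tracking inside a value, the function below keeps a list of event times in the cached dictionary and discards those outside a rolling window. The `record_event_time` name and the `event_times` field are hypothetical, and the example assumes event times have already been converted to epoch seconds.

```python
def record_event_time(state, event_epoch, window_seconds=3600):
    """Return a new cached value holding only event times in the window.

    `state` is the dictionary previously read from the cache, or None.
    The 'event_times' field name is an assumption for illustration.
    """
    times = list((state or {}).get("event_times", []))
    times.append(event_epoch)
    # Drop timestamps that fall outside the window ending at this event.
    cutoff = event_epoch - window_seconds
    recent = sorted(t for t in times if t > cutoff)
    return {"event_times": recent}


state = None
for epoch in (1_700_000_000, 1_700_000_600, 1_700_004_000):
    state = record_event_time(state, epoch)

events_in_window = len(state["event_times"])
```

The key stays stable (for example, one key per user) while the timestamps live in the value, so any later event for the same entity can find and update them.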

Time to Live

Time to Live (TTL) lets you set an expiration timestamp on items in your cache. This automatic deletion can be useful as a deduplication strategy, as well as for efficient data cleanup. The default TTL for all cache entries is 90 days, but it's possible to configure your own TTL value.

The TTL is associated with a single cache key, regardless of the data type of the associated value. For example, if add_to_string_set() is called, then the TTL of the entire string set is reset to the value passed in epoch_seconds (or the default of 90 days if no value is passed).

Setting the TTL

It's possible to override the 90-day default TTL by using either:

  • The epoch_seconds parameter available in caching helper functions that write to the cache, such as put_string_set() and increment_counter()

  • The set_key_expiration() function

Both epoch_seconds and set_key_expiration() define a timestamp at which an entry should be deleted. These functions are available in panther_detection_helpers.

If you are not passing a value for epoch_seconds, be sure to call set_key_expiration() after all functions that accept epoch_seconds. If a function that accepts epoch_seconds is called after set_key_expiration() and no value is provided for epoch_seconds, the TTL is reset to the default of 90 days.

To generate an expiration timestamp, take the unix timestamp associated with the event time, via event.event_time_epoch(), and add a given number of seconds. After the resulting timestamp passes, the row is automatically deleted within 48 hours.

It's recommended to use the event time (p_event_time) rather than the processing time (p_parse_time or datetime.datetime.now()) as the base for the TTL to account for any delays in processing events, as well as to ensure that old events, such as those found in unit tests, do not clutter the cache.
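The arithmetic can be sketched as follows. `expiration_epoch` is a hypothetical helper, and the literal epoch value stands in for the result of event.event_time_epoch(); rounding to an integer is a choice made here, not a documented requirement.

```python
from datetime import timedelta


def expiration_epoch(event_time_epoch, lifetime):
    """Return the epoch second at which a cache entry should expire.

    Hypothetical helper: the result is meant to be passed as
    `epoch_seconds` or to set_key_expiration().
    """
    return int(event_time_epoch + lifetime.total_seconds())


# An event with epoch time 1_700_000_000, cached for one hour.
expires_at = expiration_epoch(1_700_000_000, timedelta(hours=1))
```

Because the base is the event time, replaying an old event produces an expiration that is already in the past, so the entry becomes eligible for deletion right away.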

Examples

Example from Panther's Geographically Improbable Okta Login detection using epoch_seconds:

from datetime import timedelta
from json import dumps

from panther_base_helpers import deep_get
from panther_detection_helpers.caching import put_string_set

# Expire the entry after a week so the table doesn't fill up with past users
put_string_set(
    key,
    [
        dumps(
            {
                "city": deep_get(event, "client", "geographicalContext", "city"),
                "lon": deep_get(event, "client", "geographicalContext", "geolocation", "lon"),
                "lat": deep_get(event, "client", "geographicalContext", "geolocation", "lat"),
                "time": event.get("p_event_time"),
            }
        )
    ],
    epoch_seconds=event.event_time_epoch() + timedelta(days=7).total_seconds(),
)

The same example using set_key_expiration():

from datetime import timedelta
from json import dumps

from panther_base_helpers import deep_get
from panther_detection_helpers.caching import put_string_set, set_key_expiration

# Expire the entry after a week so the table doesn't fill up with past users
put_string_set(
    key,
    [
        dumps(
            {
                "city": deep_get(event, "client", "geographicalContext", "city"),
                "lon": deep_get(event, "client", "geographicalContext", "geolocation", "lon"),
                "lat": deep_get(event, "client", "geographicalContext", "geolocation", "lat"),
                "time": event.get("p_event_time"),
            }
        )
    ]
)
set_key_expiration(key, event.event_time_epoch() + timedelta(days=7).total_seconds())

Testing

The dependency on DynamoDB for a cache means that special considerations need to be made to test and validate detection code:

Testing in the Panther Console

  • Unit Test invocations will communicate with DynamoDB unless the applicable functions are overridden with mocks.

  • Data sent to and received from DynamoDB can be added to alert_context() so that it appears in Unit Test results for debugging.

  • It is not possible to browse the raw contents of DynamoDB.

Testing with CLI workflows

  • Detections in Panther leverage an AWS IAM Role to communicate with DynamoDB.

    • When using the panther_analysis_tool to execute Unit Tests locally or as part of a CI/CD workflow, this IAM Role is not accessible.

    • It is not possible to interact with the DynamoDB cache outside the context of the Panther Console, and so testing needs to simulate inputs and outputs.

  • To accommodate CI/CD workflows, we recommend mocking any functions that interact with DynamoDB to simulate the expected outputs.

Common pitfalls while using the cache

Pitfall: Using the cache before it is necessary

When writing detections, it is important to only call the kv-store when you need to, and not before. For example, consider the following detection that checks to see if we see a bad actor twice:

BAD EXAMPLE
from panther_detection_helpers.caching import reset_string_set, get_string_set, add_to_string_set

def rule(event):
    bad_guys = get_string_set('BadGuys') # <-- #1
    bad_guy = event.get('BadGuyName')
    
    if event.get('eventType') == 'BadGuyDetected':
        add_to_string_set('BadGuys', bad_guy) # <-- #2
        
        if bad_guy in bad_guys:
            # repeat bad guy, alert
            reset_string_set('BadGuys')
            return True

    return False

There are two places where this detection can be significantly improved:

  1. It gets the BadGuys string set before checking if this is a BadGuyDetected event. If it is not a bad guy event, there is no need to fetch the string set. This call adds latency to every detection run even though it is not always needed.

  2. It adds the new bad_guy to the string set before checking if this is a repeat bad guy. If it is a repeat bad guy, we will alert and reset the set, so there is no need to add to the string set.

After making these changes, the detection looks like:

from panther_detection_helpers.caching import reset_string_set, get_string_set, add_to_string_set

def rule(event):   
    if event.get('eventType') == 'BadGuyDetected':
        bad_guy = event.get('BadGuyName')
        bad_guys = get_string_set('BadGuys') # <-- #1

        if bad_guy in bad_guys:
            # repeat bad guy, alert
            reset_string_set('BadGuys')
            return True
            
        add_to_string_set('BadGuys', bad_guy) # <-- #2
        
    return False
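
To check the effect of this ordering, the cache helpers can be swapped for call-counting mocks. This is a sketch for local experimentation only: the Mock objects below replace the real helpers from panther_detection_helpers.caching, and the event payloads are made up.

```python
from unittest.mock import Mock

# Mock objects stand in for the caching helpers so calls can be counted.
get_string_set = Mock(return_value=set())
add_to_string_set = Mock()
reset_string_set = Mock()


def rule(event):
    if event.get("eventType") == "BadGuyDetected":
        bad_guy = event.get("BadGuyName")
        bad_guys = get_string_set("BadGuys")

        if bad_guy in bad_guys:
            # repeat bad guy, alert
            reset_string_set("BadGuys")
            return True

        add_to_string_set("BadGuys", bad_guy)

    return False


# An unrelated event returns without touching the cache.
rule({"eventType": "Login"})
reads_after_unrelated = get_string_set.call_count

# A matching event performs one read and one write.
rule({"eventType": "BadGuyDetected", "BadGuyName": "mallory"})
reads_after_match = get_string_set.call_count
```

With the original ordering, the unrelated event would already have caused one read.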
