Python Rule Caching
Caching allows previous detection runs to directly influence subsequent runs. Panther's real-time analysis engine examines events one-by-one, and sometimes it's helpful to keep state across invocations. Rules can cache values by using built-in helper functions. These helper functions interface with a Panther-hosted DynamoDB table. This feature is sometimes referred to as the "panther-kv-store."
Detections can store and retrieve arbitrary key-value pairs, allowing for state preservation between detection runs. Rather than using a Scheduled Query and Scheduled Rule, detections can instead gather and analyze event metadata in real-time.
If you'd like to read from your Panther-managed DynamoDB table, reach out to your Panther support team. You will be provisioned an Amazon Web Services (AWS) role with read-only permissions to DynamoDB.
Common caching use cases include:
- Aggregating data from raw events, enrichment, external sources, and more
  - Leveraging a cache allows detections to deduplicate and then aggregate data for use in later detection runs and/or alert context.
- Correlating data from multiple events and/or log sources
  - A single event may not provide much insight in isolation, but a series of events can form a more complete and useful picture.
  - Because the DynamoDB cache can be referenced in any detection run across all of Panther, it can widen the scope of detections considerably.
- Risk-based alerting and User and Entity Behavior Analytics (UEBA)
  - The DynamoDB cache can be used to monitor and score entities based on events that have come into Panther. This provides an abstraction layer around disparate events, enabling detections to track, score, and classify risky behavior.
  - Detections can incorporate scores on otherwise unrelated event combinations without explicit field-based logic.
DynamoDB, which powers Panther's detection cache, is a fast and lightweight NoSQL key-value database. Panther has implemented a single DynamoDB table that powers detection caching.
All rows within DynamoDB are key-value pairs:
- Key: A unique identifier for the row (cannot be duplicated within a table)
- Value: Any data paired with a given key
Both keys and values can be generated in detection code.
Values stored in DynamoDB can be up to 400 KB in size.
All Panther detections share the same DynamoDB table as a cache. This enables cross-detection caching, but also requires you to choose keys that:
- Can be programmatically generated at detection runtime
  - The code used to generate the key is often placed into a function.
  - We recommend storing key generator functions in a Global Helper so the same keys can be used across multiple detections.
- Leverage event values
  - For example: IP addresses, usernames, hashes, IDs, ARNs.
- Provide sufficient entropy and uniqueness within their intended scope
  - A cache may be implemented within a single detection, or shared across multiple detections and Log Sources simultaneously.
  - When using the same cache across multiple detections and Log Sources, you may need to leverage Data Models to create a common field value taxonomy.
- Do not conflict with one another
  - Key-value pairs can be erroneously overwritten, so keys must be constructed carefully to prevent collisions.
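As a sketch of such a key generator (the function name and key prefix here are illustrative, not part of Panther's helpers), a Global Helper might expose:

```python
def ip_change_key(event):
    # Hypothetical key generator: combine a detection-specific prefix with
    # an event field to produce a unique, reproducible cache key.
    username = event.get("username", "<unknown>")
    return "IpChangeRule:" + username


# The same event always yields the same key, so later runs can find the row
assert ip_change_key({"username": "alice"}) == "IpChangeRule:alice"
```

Prefixing the key with a detection-specific string is one simple way to satisfy the "do not conflict" requirement above.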
A cached value can be accessed across different detections using the same key.
Import the open source helper library:

```python
import panther_oss_helpers
```

You may also import specific functions:

```python
from panther_oss_helpers import get_dictionary
```
These Panther-provided helper functions allow detections to cache dictionaries:
- `get_dictionary`: Get the dictionary's current value
- `put_dictionary`: Overwrite a dictionary
Dictionaries are serialized and deserialized using the Python `json` library. Therefore, cached dictionaries cannot contain:
- Sets
- Complex numbers or formulas
- Custom objects
- Keys that are not strings
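Two of these constraints can be illustrated with the standard `json` module alone (no Panther helpers involved):

```python
import json

# Sets are not JSON-serializable, so they cannot be cached as-is
try:
    json.dumps({"ips": {"10.0.0.1"}})
    set_is_serializable = True
except TypeError:
    set_is_serializable = False
# set_is_serializable is False

# Non-string keys are coerced to strings during serialization,
# so they come back changed after a cache round trip
round_tripped = json.loads(json.dumps({1: "one"}))
# round_tripped == {"1": "one"}
```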
Events can be cached natively, since they are always passed to detections as dictionaries:
```python
from panther_oss_helpers import get_dictionary, put_dictionary


def rule(event):
    key = __name__ + ":" + event.get("username")

    # Retrieve the previous event
    previous_event_data = get_dictionary(key)

    # Store the current event
    put_dictionary(key, event)

    # If there is no previous event data, exit
    if not previous_event_data:
        return False

    # Compare the IP between the previous and current events
    if event.get("ipAddress") != previous_event_data.get("ipAddress"):
        return True
    return False
```
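To see this rule's behavior without a live DynamoDB table, the two helpers can be replaced with a simple in-memory dictionary. This is a local sketch, not Panther's implementation:

```python
# In-memory stand-ins for the DynamoDB-backed helpers
_FAKE_CACHE = {}


def get_dictionary(key):
    return _FAKE_CACHE.get(key)


def put_dictionary(key, value):
    _FAKE_CACHE[key] = value


def rule(event):
    key = "IpChangeRule:" + event.get("username", "")
    previous_event_data = get_dictionary(key)
    put_dictionary(key, event)
    if not previous_event_data:
        return False
    return event.get("ipAddress") != previous_event_data.get("ipAddress")


# The first login is trusted; a second login from a new IP alerts
assert rule({"username": "alice", "ipAddress": "1.1.1.1"}) is False
assert rule({"username": "alice", "ipAddress": "2.2.2.2"}) is True
```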
It is also possible to construct dictionaries in code and cache those:
```python
from panther_base_helpers import deep_get
from panther_oss_helpers import get_dictionary, put_dictionary


def store_login_info(key, event):
    # Map the user to the lon/lat and time of the most recent login
    put_dictionary(
        key,
        {
            "city": deep_get(event, "client", "geographicalContext", "city"),
            "lon": deep_get(event, "client", "geographicalContext", "geolocation", "lon"),
            "lat": deep_get(event, "client", "geographicalContext", "geolocation", "lat"),
            "time": event.get("p_event_time"),
        },
    )
```
This methodology can be extended to store highly complex data sets in DynamoDB.
These Panther-provided helper functions allow detections to cache string sets:
- `get_string_set`: Get the string set's current value
- `put_string_set`: Overwrite a string set
- `add_to_string_set`: Add one or more strings to a set
- `remove_from_string_set`: Remove one or more strings from a set
- `reset_string_set`: Empty the set
- `set_key_expiration`: Set the lifetime of the string set
The rule below provides a demonstration of caching string sets.
```python
from panther_oss_helpers import add_to_string_set, get_string_set


def rule(event):
    if event["eventName"] != "AssumeRole":
        return False

    role_arn = event["requestParameters"].get("roleArn")
    if not role_arn:
        return False

    role_arn_key = "{}-UniqueSourceIPs".format(role_arn)
    ip_addr = event["sourceIPAddress"]

    previously_seen_ips = get_string_set(role_arn_key)

    # If this is the first IP seen for the role, trust on first use
    if len(previously_seen_ips) == 0:
        add_to_string_set(role_arn_key, ip_addr)
        return False

    if ip_addr not in previously_seen_ips:
        return True
    return False
```
To implement a counter-based rule, use one or more of the following functions:
- `get_counter`: Get the latest counter value
- `increment_counter`: Add to the counter (default of 1)
- `reset_counter`: Reset the counter to 0
- `set_key_expiration`: Set the lifetime of the counter
The rule below provides a demonstration of using counters.
```python
import time

from panther_oss_helpers import increment_counter, reset_counter, set_key_expiration


def rule(event):
    # Filter to only analyze AccessDenied calls
    if event.get("errorCode") != "AccessDenied":
        return False

    # Create our counter key, which should be fairly unique
    key = "{}-AccessDeniedCounter".format(event["userIdentity"].get("arn"))

    # Increment the counter, and then check the current value
    hourly_error_count = increment_counter(key)
    if hourly_error_count == 1:
        set_key_expiration(key, time.time() + 3600)
    elif hourly_error_count >= 10:
        # If the count exceeds our threshold, reset it and return an alert
        reset_counter(key)
        return True
    return False
```
A common use case for a DynamoDB cache is to track groups of events within a given period of time. Since all key-value pairs must be generated in code, timestamps are only tracked if you store them in the cached values. Detection writers should consider storing `p_event_time` when aggregating events.

Tip: Timestamps should not be used in keys, since they are rarely reproducible across an unpredictable series of event logs.
Time to Live (TTL) allows rows in Panther's DynamoDB cache to be automatically deleted at a given timestamp. It can be useful as a deduplication strategy, or it can provide for efficient data cleanup.
- An expiration date can optionally be defined for each row using `set_key_expiration()`.
  - To generate an expiration timestamp, take the current Unix timestamp at runtime and add a given number of seconds.
  - After the resulting timestamp passes, the row is automatically deleted within 48 hours.
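For example, either of the following produces a valid expiration timestamp using only the standard library (the helper just receives the resulting number):

```python
import time
from datetime import datetime, timedelta

# One hour from now, as a Unix timestamp
expires_in_an_hour = int(time.time()) + 3600

# One week from now, computed via datetime
expires_in_a_week = int((datetime.now() + timedelta(days=7)).timestamp())

assert expires_in_a_week > expires_in_an_hour > time.time()
```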
Example from Panther's Geographically Improbable Okta Login detection:

```python
from datetime import datetime, timedelta

from panther_oss_helpers import set_key_expiration

# Expire the entry after a week so the table doesn't fill up with past users
set_key_expiration(key, str((datetime.now() + timedelta(days=7)).timestamp()))
```
The dependency on DynamoDB for a cache means that special considerations need to be made to test and validate detection code:
- Unit Test invocations will communicate with DynamoDB unless the applicable functions are overridden with mocks.
  - The data sent to and received from DynamoDB can be committed to `alert_context()` for debugging in Unit Test results.
- It is not possible to browse the raw contents of DynamoDB.
- Detections in Panther leverage an AWS IAM Role to communicate with DynamoDB.
  - When using the panther_analysis_tool to execute Unit Tests locally or as part of a CI/CD workflow, this IAM Role is not accessible.
  - It is not possible to interact with the DynamoDB cache outside the context of the Panther Console, so testing needs to simulate inputs and outputs.
  - To accommodate CI/CD workflows, we recommend mocking any functions that interact with DynamoDB to simulate the expected outputs.
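One way to mock the helpers in a local test, sketched here with `unittest.mock` (the rule body is illustrative, not an actual Panther-shipped detection; in a real test suite you would patch the names imported by your rule module):

```python
from unittest.mock import MagicMock

# Stand-ins for the DynamoDB-backed helpers
get_string_set = MagicMock(return_value={"1.2.3.4"})
add_to_string_set = MagicMock()


def rule(event):
    seen_ips = get_string_set("role-key")
    if event["sourceIPAddress"] not in seen_ips:
        add_to_string_set("role-key", event["sourceIPAddress"])
        return True
    return False


# A previously unseen IP triggers the rule and writes to the (mocked) cache
assert rule({"sourceIPAddress": "5.6.7.8"}) is True
add_to_string_set.assert_called_once_with("role-key", "5.6.7.8")

# A known IP does not trigger
assert rule({"sourceIPAddress": "1.2.3.4"}) is False
```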