Python Rule Caching


Last updated 5 months ago


Overview

Caching allows previous detection runs to directly influence subsequent runs. Panther's real-time analysis engine examines events one-by-one, and sometimes it's helpful to keep state across invocations. Rules can cache values by using built-in helper functions. These helper functions interface with a Panther-hosted DynamoDB table. This feature is sometimes referred to as the "panther-kv-store."

Detections can store and retrieve arbitrary key-value pairs, allowing for state preservation between detection runs. Rather than using a Scheduled Search and Scheduled Rule, detections can instead gather and analyze event metadata in real time.

If you'd like to read from your Panther-managed DynamoDB table, reach out to your Panther support team. You will be provisioned an Amazon Web Services (AWS) role with read-only permissions to DynamoDB.

Because using the cache adds significant latency to detection processing (which can have downstream effects like delayed alerts), it's recommended to:

  • Only use the cache in detections for log types that ingest a small amount of data

  • Write detections such that the cache is only used when absolutely necessary (see Pitfall: Using the cache before it is necessary)

Common use cases

  • Aggregating data from raw events, enrichment, external sources, and more

    • Leveraging a cache allows detections to deduplicate and then aggregate data for use in later detection runs and/or alert context.

  • Correlating data from multiple events and/or log sources

    • A single event may not provide much insight in isolation. However, a series of events can form a more complete picture that can be very useful.

    • Since the DynamoDB cache can be referenced in any detection run across all of Panther, it is possible to use the cache to widen the scope of detections considerably.

  • Risk-based alerting, User Entity and Behavior Analytics (UEBA)

    • The DynamoDB cache can be used to monitor and score entities based on events that have come into Panther. This provides an abstraction layer around disparate events, enabling detections to track, score, and classify risky behavior.

    • Detections can incorporate scores on otherwise random event combinations without the use of explicit field-based logic.

Key-value pairs in DynamoDB

DynamoDB, which powers Panther's detection cache, is a fast and lightweight NoSQL key-value database. Panther has implemented a single DynamoDB table that powers detection caching.

All rows within DynamoDB are key-value pairs:

  • Key: A unique identifier for the row (cannot be duplicated within a table)

  • Value: Any data paired with a given key

Both keys and values can be generated in detection code.

Values stored in DynamoDB can be up to 400 KB in size.
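
As a rough guard against the size limit, a detection can estimate the serialized size of a value before caching it. The sketch below is only an approximation: the helper name fits_in_cache and the conservative 400,000-byte threshold are assumptions for illustration, and the way Panther measures the stored size may differ.

```python
import json

# Conservative stand-in for the 400 KB limit (assumption: exact accounting may differ).
MAX_VALUE_BYTES = 400 * 1000


def fits_in_cache(value: dict) -> bool:
    """Return True if the JSON-encoded value is within the assumed size budget."""
    encoded = json.dumps(value).encode("utf-8")
    return len(encoded) <= MAX_VALUE_BYTES


small_value = {"city": "Lisbon", "time": "2024-01-01 12:00:00.000000"}
large_value = {"blob": "x" * 500_000}

print(fits_in_cache(small_value))
print(fits_in_cache(large_value))
```

A detection could skip the write, or trim the value, when the check fails.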

Generating keys

All Panther detections share the same DynamoDB table as a cache. This provides benefits with cross-detection caching, but also requires you to choose keys that:

  • Can be programmatically generated at detection runtime

    • The code used to generate the key is often placed into a function.

  • Leverage event values

    • For example: IP addresses, usernames, hashes, IDs, ARNs.

  • Provide sufficient entropy and uniqueness within their intended scope

    • A cache may be implemented within a single detection, or for multiple detections and Log Sources simultaneously.

  • Do not conflict with one another

    • It is possible to erroneously overwrite key-value pairs, so keys need to be carefully constructed to prevent this.

A cached value can be accessed across different detections using the same key.
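
A minimal sketch of a key generator that follows the guidelines above. The function name make_cache_key, the namespace prefix, and the ":" separator are hypothetical choices for illustration, not part of Panther's helpers. The namespace keeps keys from different use cases from colliding, and the function returns None when a required event value is missing so that the detection can exit early instead of writing to an ambiguous key.

```python
from typing import Optional


def make_cache_key(namespace: str, *parts: object) -> Optional[str]:
    """Build a cache key from a namespace and one or more event values.

    Returns None if any part is missing, so callers can skip the cache.
    """
    if not parts or any(part is None or part == "" for part in parts):
        return None
    return ":".join([namespace, *(str(part) for part in parts)])


event = {"username": "alice", "ipAddress": "203.0.113.7"}

key = make_cache_key("Okta.NewIP", event.get("username"))
print(key)  # Okta.NewIP:alice

# A missing field yields None rather than a key such as "Okta.NewIP:None"
print(make_cache_key("Okta.NewIP", event.get("missing_field")))  # None
```

Detections that should share state would call the generator with the same namespace; detections that should stay isolated would use different ones.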

Cache helper functions in panther_detection_helpers

To reference panther_detection_helpers in your detection files, add the following import statement:

import panther_detection_helpers

You may also import specific functions with a statement like the following:

from panther_detection_helpers.caching import get_dictionary

Dictionaries

These Panther-provided helper functions allow detections to cache dictionaries:

  • get_dictionary: Get the dictionary's current value

  • put_dictionary: Overwrite a dictionary

Dictionaries are serialized and deserialized using the Python json library. Therefore, the dictionaries that are cached cannot contain:

  • Sets

  • Complex numbers or formulas

  • Custom objects

  • Keys that are not strings
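
One way to work within these restrictions is to normalize a value before caching it. The sketch below uses only the Python standard library; the helper name to_cacheable is hypothetical, and converting unsupported objects to strings is just one possible policy.

```python
import json


def to_cacheable(value):
    """Recursively convert a value into a form that survives a JSON round trip."""
    if isinstance(value, dict):
        # Force every key to a string
        return {str(k): to_cacheable(v) for k, v in value.items()}
    if isinstance(value, (set, frozenset)):
        # Sets become sorted lists so the output is deterministic
        return sorted((to_cacheable(v) for v in value), key=str)
    if isinstance(value, (list, tuple)):
        return [to_cacheable(v) for v in value]
    if isinstance(value, (str, int, float, bool)) or value is None:
        return value
    # Fallback for complex numbers and custom objects
    return str(value)


raw = {"ips": {"10.0.0.2", "10.0.0.1"}, 1: complex(1, 2)}
clean = to_cacheable(raw)

# The cleaned dictionary is unchanged by serialization and deserialization
assert json.loads(json.dumps(clean)) == clean
print(clean)
```

The cleaned dictionary could then be passed to put_dictionary in place of the raw one.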

Examples

Events can be cached natively, since they are always passed to detections as dictionaries:

from panther_detection_helpers.caching import get_dictionary, put_dictionary


def rule(event):
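    # Skip events without a username, which cannot form a valid key
    username = event.get("username")
    if not username:
        return False
    key = __name__ + ":" + username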

    # Retrieve previous event
    previous_event_data = get_dictionary(key)

    # Store current event
    put_dictionary(key, event)

    # If no previous event data, exit
    if not previous_event_data:
        return False

    # Compare the IP between previous and current events
    if event.get("ipAddress") != previous_event_data.get("ipAddress"):
        return True

    return False

It is also possible to construct dictionaries in code and cache those:

from panther_base_helpers import deep_get
from panther_detection_helpers.caching import get_dictionary, put_dictionary


def store_login_info(key, event):
    # Map the user to the lon/lat and time of the most recent login
    put_dictionary(
        key,
        {
            "city": deep_get(event, "client", "geographicalContext", "city"),
            "lon": deep_get(event, "client", "geographicalContext", "geolocation", "lon"),
            "lat": deep_get(event, "client", "geographicalContext", "geolocation", "lat"),
            "time": event.get("p_event_time")
        }
    )

This methodology can be extended to store highly complex data sets in DynamoDB.

String sets

These Panther-provided helper functions allow detections to cache string sets:

  • add_to_string_set: Add one or more strings to a set

  • remove_from_string_set: Remove one or more strings from a set

  • reset_string_set: Empty the set

Example

The rule below provides a demonstration of caching string sets.

from panther_detection_helpers.caching import add_to_string_set, get_string_set


def rule(event):
    if event['eventName'] != 'AssumeRole':
        return False

    role_arn = event['requestParameters'].get('roleArn')
    if not role_arn:
        return False

    role_arn_key = '{}-UniqueSourceIPs'.format(role_arn)
    ip_addr = event['sourceIPAddress']

    previously_seen_ips = get_string_set(role_arn_key)

    # If this is the first value, trust on first use
    if len(previously_seen_ips) == 0:
        add_to_string_set(role_arn_key, ip_addr)
        return False

    if ip_addr not in previously_seen_ips:
        return True

    return False

Counters

To implement a counter-based rule, use one or more of the following functions:

  • get_counter: Get the latest counter value

  • increment_counter: Add to the counter (default of 1)

  • reset_counter: Reset the counter to 0

  • set_key_expiration: Set the lifetime of the counter

Example

The rule below provides a demonstration of using counters.

import time

from panther_detection_helpers.caching import increment_counter, set_key_expiration, reset_counter


def rule(event):
    # Filter to only analyze AccessDenied calls
    if event.get('errorCode') != 'AccessDenied':
        return False

    # Create our counter key, which should be fairly unique
    key = '{}-AccessDeniedCounter'.format(event['userIdentity'].get('arn'))

    # Increment the counter, and then check the current value
    hourly_error_count = increment_counter(key)
    if hourly_error_count == 1:
        # First error for this key: expire the counter one hour from now
        set_key_expiration(key, time.time() + 3600)
    elif hourly_error_count >= 10:
        # If it exceeds our threshold, reset and then return an alert
        reset_counter(key)
        return True
    return False

Using timestamps to track state

A common use case for a DynamoDB cache is to track groups of events in a given period of time. Since all key-value pairs must be generated in code, the cache does not track timestamps on its own. If a detection needs them, it must store them in the cached values.

Detection writers should consider storing p_event_time when aggregating events.

Timestamps should not be used in keys, since they are very rarely reproducible across an unpredictable series of event logs.
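
The sketch below illustrates keeping the timestamp in the cached value and comparing it against the current event. The helper name seen_within and the "time" field are hypothetical, and the example assumes p_event_time is a string such as "2024-01-01 12:00:00.000000" that datetime.fromisoformat can parse. The previous value is passed in directly here; in a detection it would come from get_dictionary.

```python
from datetime import datetime, timedelta


def seen_within(previous: dict, current_event: dict, window: timedelta) -> bool:
    """Return True if the cached event happened within `window` of the current one."""
    if not previous or "time" not in previous:
        return False
    previous_time = datetime.fromisoformat(previous["time"])
    current_time = datetime.fromisoformat(current_event["p_event_time"])
    return abs(current_time - previous_time) <= window


cached_value = {"time": "2024-01-01 12:00:00.000000", "ipAddress": "203.0.113.7"}
event = {"p_event_time": "2024-01-01 12:30:00.000000", "ipAddress": "198.51.100.4"}

print(seen_within(cached_value, event, timedelta(hours=1)))     # True
print(seen_within(cached_value, event, timedelta(minutes=10)))  # False
```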

Time to Live

The TTL is associated with a single cache key, regardless of the data type of the associated value. For example, if add_to_string_set() is called, then the TTL of the entire string set is reset to the value passed in epoch_seconds (or the default of 90 days if no value is passed).

Setting the TTL

It's possible to override the 90-day default TTL by using either:

  • The epoch_seconds parameter available in caching helper functions that write to the cache, such as put_string_set() and increment_counter()

  • The set_key_expiration() function

If you are not passing in a value for epoch_seconds, be sure to call set_key_expiration() after all functions taking in epoch_seconds. If a function taking in epoch_seconds is called after set_key_expiration() and no value is provided for epoch_seconds, the TTL will be reset to the default, 90 days.
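
The ordering rule can be illustrated with a toy in-memory model. ToyCache below is a hypothetical stand-in written only to mirror the behavior described in this section; it is not Panther's implementation and does not talk to DynamoDB.

```python
DEFAULT_TTL_SECONDS = 90 * 24 * 60 * 60  # the 90-day default described above


class ToyCache:
    """In-memory model of per-key TTL handling (illustration only)."""

    def __init__(self, now: int):
        self.now = now
        self.sets = {}
        self.expires_at = {}

    def add_to_string_set(self, key, value, epoch_seconds=None):
        self.sets.setdefault(key, set()).add(value)
        # Every write sets the TTL: the explicit value, or the default
        if epoch_seconds is None:
            epoch_seconds = self.now + DEFAULT_TTL_SECONDS
        self.expires_at[key] = epoch_seconds

    def set_key_expiration(self, key, epoch_seconds):
        self.expires_at[key] = epoch_seconds


now = 1_700_000_000
one_hour_from_now = now + 3600
cache = ToyCache(now)

# Correct order: write first, then set the expiration
cache.add_to_string_set("demo", "a")
cache.set_key_expiration("demo", one_hour_from_now)
ttl_after_correct_order = cache.expires_at["demo"]

# A later write without epoch_seconds silently restores the default
cache.add_to_string_set("demo", "b")
ttl_after_late_write = cache.expires_at["demo"]

print(ttl_after_correct_order == one_hour_from_now)        # True
print(ttl_after_late_write == now + DEFAULT_TTL_SECONDS)   # True
```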

To generate an expiration timestamp, take the Unix timestamp associated with the event time, via event.event_time_epoch(), and add a given number of seconds. After the resulting timestamp passes, the row is automatically deleted within 48 hours.

It's recommended to use the event time (p_event_time) rather than the processing time (p_parse_time or datetime.datetime.now()) as the base for the TTL to account for any delays in processing events, as well as to ensure that old events, such as those found in unit tests, do not clutter the cache.
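
The arithmetic can be wrapped in a small helper. This is a sketch only: expiration_epoch is a hypothetical name, and the integer passed in stands for the value that event.event_time_epoch() would return inside a detection.

```python
from datetime import timedelta


def expiration_epoch(event_time_epoch: int, lifetime: timedelta) -> int:
    """Return the Unix timestamp at which a cache entry should expire."""
    return int(event_time_epoch + lifetime.total_seconds())


# Stand-in for event.event_time_epoch()
event_time = 1_700_000_000

print(expiration_epoch(event_time, timedelta(days=7)))   # 1700604800
print(expiration_epoch(event_time, timedelta(hours=1)))  # 1700003600
```

Because the result is based on the event time, replaying an old event produces an expiration that is already in the past or close to it, so stale test data does not linger in the cache.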

Examples

Example from Panther's Geographically Improbable Okta Login detection using epoch_seconds:

from datetime import timedelta
from json import dumps

from panther_base_helpers import deep_get
from panther_detection_helpers.caching import put_string_set

# `key` and `event` come from the enclosing rule(event) function
# Expire the entry after a week so the table doesn't fill up with past users
put_string_set(
    key,
    [
        dumps(
            {
                "city": deep_get(event, "client", "geographicalContext", "city"),
                "lon": deep_get(event, "client", "geographicalContext", "geolocation", "lon"),
                "lat": deep_get(event, "client", "geographicalContext", "geolocation", "lat"),
                "time": event.get("p_event_time"),
            }
        )
    ],
    epoch_seconds=event.event_time_epoch() + timedelta(days=7).total_seconds(),
)

The same example using set_key_expiration():

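from datetime import timedelta
from json import dumps

from panther_base_helpers import deep_get
from panther_detection_helpers.caching import put_string_set, set_key_expiration

# `key` and `event` come from the enclosing rule(event) function
# Expire the entry after a week so the table doesn't fill up with past users
put_string_set(
    key,
    [
        dumps(
            {
                "city": deep_get(event, "client", "geographicalContext", "city"),
                "lon": deep_get(event, "client", "geographicalContext", "geolocation", "lon"),
                "lat": deep_get(event, "client", "geographicalContext", "geolocation", "lat"),
                "time": event.get("p_event_time"),
            }
        )
    ]
)
set_key_expiration(key, event.event_time_epoch() + timedelta(days=7).total_seconds())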

Testing

The dependency on DynamoDB for a cache means that special considerations need to be made to test and validate detection code:

Testing in the Panther Console

  • Unit Test invocations will communicate with DynamoDB unless the applicable functions are overridden with mocks.

  • The data sent to and received from DynamoDB can be added to alert_context() for debugging in Unit Test results.

  • It is not possible to browse the raw contents of DynamoDB.

Testing with CLI workflows

  • Detections in Panther leverage an AWS IAM Role to communicate with DynamoDB.

    • It is not possible to interact with the DynamoDB cache outside the context of the Panther Console, and so testing needs to simulate inputs and outputs.

  • To accommodate CI/CD workflows, we recommend mocking any functions that interact with DynamoDB to simulate the expected outputs.
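
As an illustration of simulating cache outputs, the sketch below exercises string-set logic against unittest.mock stand-ins instead of DynamoDB. The make_rule factory and the flat event fields are hypothetical simplifications for a self-contained example; in Panther, mocks are normally declared alongside a detection's unit tests rather than injected like this.

```python
from unittest.mock import Mock


def make_rule(get_string_set, add_to_string_set):
    """Build a rule function around injectable cache functions (test sketch)."""

    def rule(event):
        if event.get("eventName") != "AssumeRole":
            return False
        key = "{}-UniqueSourceIPs".format(event["roleArn"])
        seen_ips = get_string_set(key)
        ip_addr = event["sourceIPAddress"]
        if not seen_ips:
            # Trust on first use
            add_to_string_set(key, ip_addr)
            return False
        return ip_addr not in seen_ips

    return rule


# Simulate a cache that already holds one known IP for the role
get_mock = Mock(return_value={"198.51.100.1"})
add_mock = Mock()
rule = make_rule(get_mock, add_mock)

event = {
    "eventName": "AssumeRole",
    "roleArn": "arn:aws:iam::123456789012:role/demo",
    "sourceIPAddress": "203.0.113.9",
}

result = rule(event)
print(result)  # True: the source IP has not been seen for this role

get_mock.assert_called_once_with("arn:aws:iam::123456789012:role/demo-UniqueSourceIPs")
add_mock.assert_not_called()
```

Changing return_value on the mock covers the other branches, such as an empty set for the first-use path.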

Common pitfalls while using the cache

Pitfall: Using the cache before it is necessary

When writing detections, it is important to only call the kv-store when you need to, and not before. For example, consider the following detection that checks to see if we see a bad actor twice:

BAD EXAMPLE
from panther_detection_helpers.caching import reset_string_set, get_string_set, add_to_string_set

def rule(event):
    bad_guys = get_string_set('BadGuys') # <-- #1
    bad_guy = event.get('BadGuyName')
    
    if event.get('eventType') == 'BadGuyDetected':
        add_to_string_set('BadGuys', bad_guy) # <-- #2
        
        if bad_guy in bad_guys:
            # repeat bad guy, alert
            reset_string_set('BadGuys')
            return True

    return False

There are two places where this detection can be significantly improved:

  1. It gets the BadGuys string set before checking if this is a BadGuyDetected event. If it is not a bad guy event, there is no need to fetch the string set. This call adds latency to every detection run even though it is not always needed.

  2. It adds the new bad_guy to the string set before checking if this is a repeat bad guy. If it is a repeat bad guy, we will alert and reset the set, so there is no need to add to the string set.

After making these changes, the detection looks like:

from panther_detection_helpers.caching import reset_string_set, get_string_set, add_to_string_set

def rule(event):   
    if event.get('eventType') == 'BadGuyDetected':
        bad_guy = event.get('BadGuyName')
        bad_guys = get_string_set('BadGuys') # <-- #1

        if bad_guy in bad_guys:
            # repeat bad guy, alert
            reset_string_set('BadGuys')
            return True
            
        add_to_string_set('BadGuys', bad_guy) # <-- #2
        
    return False

While caching can be used to count events and generate an alert after some event threshold is met, it's recommended to accomplish this using the built-in deduplication feature instead.

We recommend storing key generator functions in a Global Helper to implement the same keys across multiple detections.

When intending to use the same cache across multiple detections and Log Sources, you may need to leverage Data Models to create a common field value taxonomy.

Panther maintains panther_detection_helpers, a pip package you can use in your detections.

The following helper functions are also available for string sets:

  • get_string_set: Get the string set's current value

  • put_string_set: Overwrite a string set

  • set_key_expiration: Set the lifetime of the string set

Time to Live (TTL) lets you set an expiration timestamp on items in your cache. This automatic deletion can be useful as a deduplication strategy, as well as for efficient data cleanup. The default TTL for all cache entries is 90 days, but it's possible to configure your own TTL value.

Both epoch_seconds and set_key_expiration() define a timestamp at which an entry should be deleted. These functions are available in panther_detection_helpers.

When using the panther_analysis_tool to execute Unit Tests locally or as part of a CI/CD workflow, the AWS IAM Role that detections use to communicate with DynamoDB is not accessible.

See Panther's documentation for more information about Mocks.