Writing Python Detections
Construct Python detections in the Console or CLI workflow
Overview
You can write your own Python detections in the Panther Console or locally, following the CLI workflow. When writing Python detections, try to follow these best practices, and remember that certain alert fields can be set dynamically. Rules written in Python can be used in detection derivation.
You can alternatively use the no-code detection builder in the Console to create rules, or write them locally as Simple Detections. If you aren't sure whether to write detections locally as Simple Detections or Python detections, see the Using Python vs. Simple Detections YAML section.
Making external API requests from within your detections in Panther is highly discouraged. In general, detections are processed at a very high scale, and making API requests can overload receiving systems and cause your rules to exceed the 15-second runtime limit.
How to create detections in Python
How to create a rule in Python
You can write a Python rule in both the Panther Console and CLI workflow.
How to create a scheduled rule in Python
You can write a Python scheduled rule in both the Panther Console and CLI workflow.
How to create a policy in Python
To learn how to create a policy, see the How to write a policy instructions on Policies.
Python detection syntax
A local Python detection is made up of two files: a Python file and a YAML file. When a Python detection is created in the Panther Console, there is only a Python text editor (not a YAML one). The keys listed in the YAML column, below, are set in fields in the user interface.
Detection logic
def rule(event):  # or def policy(resource):
Alert functions (dynamic)
def severity(event):
def title(event):
def dedup(event):
def destinations(event):
def runbook(event):
def reference(event):
def description(event):
def alert_context(event):
Filter key
InlineFilters:
Metadata keys
AnalysisType:  # rule, scheduled_rule, or policy
Enabled:
FileName:
RuleID:  # or PolicyID:
LogTypes:
Reports:
Tags:
Tests:
ScheduledQueries:  # only applicable to scheduled rules
Suppressions:  # only applicable to policies
CreateAlert:  # not applicable to policies
Alert keys (static)
Severity:
Description:
DedupPeriodMinutes:
Threshold:
DisplayName:
OutputIds:
Reference:
Runbook:
SummaryAttributes:
Basic Python rule structure
Only a rule() function and the YAML keys shown below are required for a Python rule. Additional Python alert functions, however, can make your alerts more dynamic. Additional YAML keys are available, too; see the Python rule specification reference.
def rule(event):
if event.get("Something"):
return True
return False
AnalysisType: rule
Enabled: true
Filename: rule.py
RuleID: my.rule
LogTypes:
- Some.Schema
Severity: INFO
For more templates, see the panther-analysis repo on GitHub.
InlineFilters
Learn more about using InlineFilters in Python rules on Modifying Detections with Inline Filters.
Alert functions in Python detections
Panther's detection auxiliary functions are Python functions that control analysis logic, generated alert title, event grouping, routing of alerts, and metadata overrides. Rules are customizable and can import from standard Python libraries or global helpers.
Applicable to both rules and policies, each function below takes a single argument of event (rules) or resource (policies). Advanced users may define functions, variables, or classes outside of the functions defined below.
If you are using alert deduplication, the first event to match the detection is used as a parameter for these alert functions.
Each of the below alert functions is optional, but can add dynamic context to your alerts.
severity(): The level of urgency of the alert.
Fallback: In YAML: Severity key; in Console: Severity field.
Returns: INFO, LOW, MEDIUM, HIGH, CRITICAL, or DEFAULT.
title(): The generated alert title.
Fallback: In YAML: DisplayName > RuleID or PolicyID; in Console: Name field > ID field.
Returns: String.
dedup(): The string to group related events with, limited to 1000 characters.
Fallback: In Python/YAML: title() > DisplayName > RuleID or PolicyID; in Console: title() > Name field > ID field.
Returns: String.
description(): An explanation about why the rule exists.
Fallback: In YAML: Description key; in Console: Description field.
Returns: String.
reference(): A reference URL to an internal document or online resource about the rule.
Fallback: In YAML: Reference key; in Console: Reference field.
Returns: String.
runbook(): A list of instructions to follow once the alert is generated. It's recommended to provide a descriptive runbook, as Panther AI alert triage will take it into consideration.
Fallback: In YAML: Runbook key; in Console: Runbook field.
Returns: String.
destinations(): The label or ID of the destinations to specifically send alerts to. An empty list will suppress all alerts.
Fallback: In YAML: OutputIds key; in Console: Destination Overrides field.
Returns: List[Destination Name/ID].
severity
In some scenarios, you may need to upgrade or downgrade the severity level of an alert. The severity levels of an alert can be mapped to INFO, LOW, MEDIUM, HIGH, CRITICAL, or DEFAULT. Return DEFAULT to fall back to the statically defined rule severity.
The severity string is case insensitive, meaning you can return, for example, Critical or default, depending on your style preferences.
In the example below, if an API token has been created, a HIGH severity alert is generated; otherwise an INFO level alert is generated:
def severity(event):
if event.get('eventType') == 'system.api_token.create':
return "HIGH"
return "INFO"
Reference: Template Rule
Example using DEFAULT:
def severity(event):
if event.get('eventType') == 'system.api_token.create':
return "HIGH"
return "DEFAULT"
title
The title() function is optional, but it is recommended to include it to provide additional context in an alert.
In the example below, the log type, username, and a static string are sent to the alert destination. The function checks to see if the event is related to the AWS.CloudTrail log type and, if so, returns the AWS Account Name.
Learn more about how an alert title is set on Rules and Scheduled Rules.
Example:
def title(event):
# use unified data model field in title
log_type = event.get("p_log_type")
title_str = (
f"{log_type}: User [{event.udm('actor_user')}] has exceeded the failed logins threshold"
)
if log_type == "AWS.CloudTrail":
title_str += f" in [{lookup_aws_account_name(event.get('recipientAccountId'))}]"
return title_str
Reference: Template Rule
dedup
Deduplication is the process of grouping related events into a single alert to prevent receiving duplicate alerts. Events triggering the same detection that also share a deduplication string, within the deduplication period, are grouped together in a single alert. The dedup function is one way to define a deduplication string. It is limited to 1000 characters.
Learn more about deduplication on Rules and Scheduled Rules.
Example:
def dedup(event):
user_identity = event.get("userIdentity", {})
if user_identity.get("type") == "AssumedRole":
return helper_strip_role_session_id(user_identity.get("arn", ""))
return user_identity.get("arn")
Reference: AWS S3 Bucket Deleted Rule
destinations
By default, alerts are sent to specific destinations based on severity level or log type. Each detection can override its default destinations and send the alert to one or more specific destination(s). In some scenarios, a destination override is useful because it allows more advanced routing criteria based on the logic of the rule.
Example:
A rule used for multiple log types utilizes the destinations function to reroute the alert to another destination if the log type is "AWS.CloudTrail". The alert is suppressed to this destination using return ["SKIP"] if the log type is not CloudTrail.
def destinations(event):
if event.get("p_log_type") == "AWS.CloudTrail":
return ["slack-security-alerts"] ### Name or UUID of destination
# Do not send alert to an external destination
return ["SKIP"]
Reference: Template Rule
alert_context
This function allows the detection to pass any event details as additional context, such as usernames, IP addresses, or success/failure, to the alert destination(s).
Values included in the alert context dictionary must be JSON-compliant. Examples of non-compliant values include Python's nan, inf, and -inf.
Example:
The code below returns all event data in the alert context.
def rule(event):
return (
event.get("actionName") == "UPDATE_SAML_SETTINGS"
and event.get("actionResult") == "SUCCEEDED"
)
def alert_context(event):
return {
"user": event.udm("actor_user"),
"ip": event.udm("source_ip")
}
runbook, reference, and description
These functions can provide additional context around why an alert was triggered and how to resolve the related issue.
It's recommended to provide a descriptive runbook, as Panther AI alert triage will take it into consideration. For example:
def runbook(event):
return f"""
Query CloudTrail activity from the new access key ({event.deep_get("responseElements",
"accessKey", "accessKeyId", default="key not found")}) at least 2 hours after the alert was triggered
and check for data access or other privilege escalation attempts using the aws_cloudtrail table.
"""
This would produce a runbook like the following:
Query CloudTrail activity from the new access key (AKIA5FCD6LZQR7OPYQHF) at least 2 hours after the alert was triggered and check for data access or other privilege escalation attempts using the aws_cloudtrail table.
The example below dynamically provides a link within the reference field in an alert.
def reference(event):
    log_type = event.get("p_log_type")
    if log_type == "OnePassword.SignInAttempt":
        return "https://link/to/resource"
    elif log_type == "Okta.SystemLog":
        return "https://link/to/resource/2"
    return "https://default/link"
Event object functions
In a Python detection, the rule() function and all dynamic alert functions take in a single argument: the event object. This event object has built-in functions to enable simple extraction of event values.
get()
def get(self, key, default=None) -> Any:
Use get() to access a top-level event field. You can provide a default value that will be returned if the key is not found.
It is also possible to access a top-level field using deep_get() and deep_walk(). Learn more about accessing top-level fields safely below.
Example:
{
"key": "value"
}
def rule(event):
return event.get("key") == "value"
# The above would return true
deep_get()
def deep_get(self, *keys: str, default: Any = None) -> Any:
Use deep_get() to return keys that are nested within Python dictionaries.
If the value you need to retrieve lives within a list, use deep_walk() instead.
Example:
Given an event with the following structure:
{
"object": {
"nested": {
"key": "here"
}
}
}
def rule(event):
return event.deep_get("object", "nested", "key") == "here"
# The above would return true
deep_walk()
def deep_walk(
self, *keys: str, default: Optional[str] = None, return_val: str = "all"
) -> Union[Optional[Any], Optional[List[Any]]]:
Use deep_walk() to return values associated with keys that are deeply nested in Python dictionaries, which may contain any number of dictionaries or lists. If multiple event fields match, an array of matches will be returned; if only one match is made, the value of that match will be returned.
Example:
{
"object": {
"nested": {
"list": [
{
"key": "first"
},
{
"key": "second"
}
]
}
}
}
def rule(event):
return "first" in event.deep_walk("object", "nested", "list", "key", default=[])
# The above would return true
lookup()
def lookup(self, lookup_table_name: str, lookup_key: str) -> Any:
The lookup() function lets you dynamically access data from Custom Lookup Tables and Panther-managed Enrichment providers from your detections. The lookup() function may be useful if your incoming logs don't contain an exact match for a value in your Lookup Table's primary key column. You can use Python to modify an event value before passing it into lookup() to fetch enrichment data.
lookup() takes two arguments:
1. The name of the Lookup Table. The name passed to lookup() must be as it appears on the Enrichment Providers or Lookup Tables pages in the Panther Console. This name may differ syntactically from how it appears in a search query; for example, My-Custom-LUT instead of my_custom_lut.
2. A Lookup Table primary key.
If a match is found in the Lookup Table for the provided key, the full Lookup Table row is returned as a Python dictionary. If no match is found, None is returned.
Example using lookup():
# Imagine you have a Lookup Table named user_roles with the following entries:
# row 1: {"id": "alice@email.com", "role": "admin"}
# row 2: {"id": "bob@email.com", "role": "guest"}
# In this rule, we want to return True if the user has a non-admin role
# We want to fetch role from the Lookup Table, but the event doesn't
# contain the Lookup Table's primary key (the email address)
def rule(event):
lookup_table_name = "user_roles"
# On the event, we *do* have access to the username, from which we can
# generate the email address
user_name = event.get("username", "").lower()
lookup_key = f"{user_name}@email.com" # Dynamically compose the lookup key, or "Selector"
lookup_data = event.lookup(lookup_table_name, lookup_key)
# If a match occurs, `lookup_data` will contain the full row of data
# Otherwise it will return None
if (lookup_data and lookup_data.get("role") != "admin") or lookup_data is None:
return True
return False
Unit testing detections that use lookup()
When unit tests are run, lookup() does not retrieve live data. To emulate the lookup functionality, add a _mocked_lookup_data_ field in the event payload of each unit test to mock the Lookup Table data. You cannot use the enrich test data button or CLI command with lookup().
_mocked_lookup_data_ should be structured like the following example:
{
  "_mocked_lookup_data_": {
    "user_roles": { # This key is the name of your Lookup Table
      # The keys in this object should be the Lookup Table key
      # The values in this object should be the Lookup Table data object
      "alice@email.com": {"id": "alice@email.com", "role": "admin"},
      "bob@email.com": {"id": "bob@email.com", "role": "guest"}
    }
  }
}
If you do not specify a _mocked_lookup_data_ field in your unit test, attempts to call lookup() will return None/null.
udm()
The behavior of udm() will change when Panther Core Fields are removed on September 29, 2025. Learn more about this change here.
def udm(self, *key: str, default: Any = None) -> Any:
The udm() function is primarily intended to allow you to access Data Model and Core Field (also known as p_udm field) values, but can also be used to access event fields.
Here is how the udm() function works:
1. The function first checks to see if there is a Data Model key mapping defined for the value passed in to udm(). If so, the value of the Data Model is returned, and the function does not move on to steps 2 or 3, below. This is true even if the event being evaluated does not contain the key path defined in the Data Model mapping; in this case, null is returned.
2. If there is no Data Model key defined for the value passed into udm(), the function then checks to see if there is a Core Field value in the event's p_udm struct with that name. If so, the Core Field value is returned. In order for udm() to not move on to step 3, below, the event being evaluated must contain the value passed in to udm() as a key within its p_udm struct. If a Core Field has simply been defined with the passed-in value as a key, but the event does not contain the mapping's associated field path, the event's p_udm struct will not contain the Core Field, and udm() will move on to step 3, below.
3. If there is no Data Model defined or Core Field present for the value passed into udm(), the function then checks whether there is an event field with that name. If so, its value is returned. In this case, udm() checks all event fields, even nested ones. Its behavior is analogous to deep_get().
# Example usage when operating on data models
def rule(event):
return event.udm('field_on_data_model')
# Example usage when operating on data models with default
def rule(event):
# The default parameter is only respected when using path-based mappings on
# your data model. If your data model maps to a function, whatever value your
# function returns will be respected
return event.udm('field_on_data_model', default='')
# Example usage when operating on p_udm fields
def rule(event):
# Note: When operating on p_udm fields, the udm function operates like our
# deep_get function, allowing you to reference nested fields
#
# If the deep_get syntax is used and the top most field belongs to a mapped
# data model field, udm will look at the p_udm field instead
return event.udm('field_on_udm', 'nested_field', default='')
Example using udm() to access a Data Model value:
Mappings:
  - Name: source_ip
    Path: nested.srcIp
{
"nested": {
"srcIp": "127.0.0.1"
}
}
def rule(event):
return event.udm("source_ip") == "127.0.0.1"
# The above would return true
Python best practices
Python Enhancement Proposals publishes resources on how to cleanly and effectively write and style your Python code. For example, you can use autopep8 to automatically ensure that your written detections all follow a consistent style.
Available Python libraries
The following Python libraries are available to be used in Panther in addition to boto3, provided by AWS Lambda:
Package | Version | Description | License
jsonpath-ng | 1.5.2 | JSONPath Implementation | Apache v2
policyuniverse | 1.3.3.20210223 | Parse AWS ARNs and Policies | Apache v2
requests | 2.23.0 | Easy HTTP Requests | Apache v2
Python detection writing best practices
Writing tests for your detections
Before enabling new detections, it is recommended to write tests that define scenarios where alerts should or should not be generated. Best practice dictates writing at least one positive and one negative test to ensure reliability.
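As a sketch of what such tests might look like, the YAML below declares one positive and one negative test under the Tests key for a hypothetical rule that alerts when event_type equals ACCESS_GRANTED (the test names and log payloads are illustrative):

```yaml
Tests:
  - Name: Access Granted Generates Alert
    ExpectedResult: true
    Log:
      event_type: ACCESS_GRANTED
  - Name: Other Event Does Not Alert
    ExpectedResult: false
    Log:
      event_type: LOGIN
```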
Casing for event fields
Lookups for event fields are not case sensitive. event.get("Event_Type") or event.get("event_type") will return the same result.
Understanding top level fields and nested fields
Top-level fields represent the parent fields in a nested data structure. For example, a record may contain a field called user
under which there are other fields such as ip_address
. In this case, user
is the top-level field, and ip_address
is a nested field underneath it.
Nesting can occur many layers deep, and so it is valuable to understand the schema structure and know how to access a given field for a detection.
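For example, the user/ip_address nesting described above might look like the following in a log event (the field values are illustrative):

```json
{
  "user": {
    "name": "alice",
    "ip_address": "192.0.2.1"
  }
}
```

Here, event.deep_get("user", "ip_address") would safely return the nested IP address.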
Accessing top-level fields safely
Basic rules match a field's value in the event, and a best practice to avoid errors is to leverage Python's built-in get() function.
The example below is a best practice because it leverages the get() function. get() will look for a field, and if the field doesn't exist, it will return None instead of raising an error, which will result in the detection returning False.
def rule(event):
return event.get('field') == 'value'
In the example below, if the field exists, the value of the field will be returned. Otherwise, False will be returned:
def rule(event):
    if event.get('field'):
        return event.get('field')
    return False
Bad practice example
The rule definition below is bad practice because the code is explicit about the field name. If the field doesn't exist, Python will throw a KeyError:
def rule(event):
return event['field'] == 'value'
Using Global Helper functions
Once many detections are written, a set of patterns and repeated code will begin to emerge. This is a great use case for Global Helper functions, which provide a centralized location for this logic to exist across all detections.
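As a sketch of this pattern, a hypothetical global helper (the module and function names below are illustrative, not Panther-provided) could centralize the assumed-role ARN normalization used in the dedup() example earlier:

```python
import re

# Matches an STS assumed-role ARN and captures everything before the
# per-session suffix, e.g. ".../assumed-role/Admin/session-1" -> ".../assumed-role/Admin"
ASSUMED_ROLE_ARN = re.compile(r"^(arn:aws:sts::\d{12}:assumed-role/[^/]+)/.+$")

def strip_role_session_id(arn: str) -> str:
    """Return the ARN without its session ID so related events dedup together."""
    match = ASSUMED_ROLE_ARN.match(arn)
    # Non-assumed-role ARNs are returned unchanged
    return match.group(1) if match else arn
```

A detection could then import this like any other helper (for example, from my_org_helpers import strip_role_session_id, assuming the code is saved as a Global Helper named my_org_helpers).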
Accessing nested fields safely
If you'd like to access a field nested deeply within an event, use the deep_get() and deep_walk() functions available on the event object. These functions are also available as Global Helper functions, but for convenience, it's recommended to use the event object versions instead.
Example:
AWS CloudTrail logs nest the type of user accessing the console underneath userIdentity. Here is a snippet of a JSON CloudTrail root activity log:
{
"eventVersion": "1.05",
"userIdentity": {
"type": "Root",
"principalId": "1111",
"arn": "arn:aws:iam::123456789012:root",
"accountId": "123456789012",
"userName": "root"
},
...
}
See how to check the value of type safely using the event object deep_get() function:
def rule(event):
return event.deep_get("userIdentity", "type") == "Root"
Checking fields for specific values
You may want to know when a specific event has occurred. If it did occur, then the detection should trigger an alert. Since Panther stores everything as normalized JSON, you can check the value of a field against the criteria you specify.
For example, to detect the action of granting Box technical support access to your Box account, the Python below would be used to match events where the event_type equals ACCESS_GRANTED:
def rule(event):
return event.get("event_type") == "ACCESS_GRANTED"
If the field is event_type and the value is equal to ACCESS_GRANTED, then the rule function will return True and an alert will be created.
Checking fields for Integer values
You may need to compare the value of a field against integers. This allows you to use any of Python’s built-in comparisons against your events.
For example, you can create an alert based on HTTP response status codes:
# returns True if 'status_code' equals 404
def rule(event):
    if event.get("status_code"):
        return event.get("status_code") == 404
    return False

# returns True if 'status_code' is greater than 400
def rule(event):
    if event.get("status_code"):
        return event.get("status_code") > 400
    return False
Using the Universal Data Model
Data Models provide a way to configure a set of unified fields across all log types. By default, Panther comes with built-in Data Models for several log types. Custom Data Models can be added in the Panther Console or via the Panther Analysis Tool.
event.udm() can only be used with log types that have an existing Data Model in your Panther environment.
Example:
import panther_event_type_helpers as event_type
def rule(event):
# filter events on unified data model field ‘event_type’
return event.udm("event_type") == event_type.FAILED_LOGIN
Using multiple conditions
The and keyword is a logical operator used to combine conditional statements. It is often required to match multiple fields in an event using the and keyword. When using and, all statements must be true:
"string_a" == "this" and "string_b" == "that"
Example:
To track down successful root user access to the AWS console you need to look at several fields:
from panther_base_helpers import deep_get
def rule(event):
return (event.get("eventName") == "ConsoleLogin" and
deep_get(event, "userIdentity", "type") == "Root" and
deep_get(event, "responseElements", "ConsoleLogin") == "Success")
The or keyword is a logical operator used to combine conditional statements. When using or, either of the statements may be true:
"string_a" == "this" or "string_b" == "that"
Example:
This example detects if the field contains either Port 80 or Port 22:
# returns True if 'port_number' is 80 or 22
def rule(event):
return event.get("port_number") == 80 or event.get("port_number") == 22
Searching values in lists
Comparing event values against a list (containing, for example, IP addresses or users) is quick in Python. It's a common pattern to set your rule logic to not match when an event value also exists in the list. This can help reduce false positives for known behavior in your environment.
When checking whether event values are in some collection, it's recommended to use a Python set: sets offer constant-time membership checks, making them more performant than lists and tuples, which require iterating through each item in the collection to check for inclusion.
If the set against which you're performing the comparison is static, it's recommended to define it at the global level, rather than inside the rule() function. Global variables are initialized only once per Lambda invocation. Because a single Lambda invocation can process multiple events, a global variable is usually more efficient than re-initializing the value each time rule() is invoked.
Example:
# Set - Recommended over tuples and lists for performance
ALLOW_IP = {'192.0.0.1', '192.0.0.2', '192.0.0.3'}
def rule(event):
return event.get("ip_address") not in ALLOW_IP
In the example below, we use the Panther helper pattern_match_list:
from panther_base_helpers import pattern_match_list
USER_CREATE_PATTERNS = [
"chage", # user password expiry
"passwd", # change passwords for users
"user*", # create, modify, and delete users
]
def rule(event):
# Filter the events
if event.get("event") != "session.command":
return False
# Check that the program matches our list above
return pattern_match_list(event.get("program", ""), USER_CREATE_PATTERNS)
Reference: Teleport Create User Accounts
Matching events with regex
If you want to match against events using regular expressions - to match subdomains, file paths, or a prefix/suffix of a general string - you can use regex. In Python, regex can be used by importing the re library and looking for a matching value.
In the example below, the regex pattern will match Administrator or administrator against the nested value of the privilegeGranted field.
import re
from panther_base_helpers import deep_get

# The regex pattern is stored in a global variable
# Note: compiling it once here performs better than compiling it inside the
# rule function, which is evaluated on each event
ADMIN_PATTERN = re.compile(r"[aA]dministrator")

def rule(event):
    # using the deep_get function we can pull out the nested value under the "privilegeGranted" field
    value_to_search = deep_get(event, "debugContext", "debugData", "privilegeGranted", default="")
    # finally we use the regex object we created earlier to check against our value
    # if there is a match, True is returned
    return bool(ADMIN_PATTERN.search(value_to_search))
In the example below, we use the Panther helper pattern_match:
from panther_base_helpers import pattern_match
def rule(event):
return pattern_match(event.get("operation", ""), "REST.*.OBJECT")
Python rule specification reference
Required fields are in bold.
Field Name
Description
Expected Value
AnalysisType
Indicates whether this analysis is a rule, scheduled_rule, policy, or global
Rules: rule
Scheduled Rules: scheduled_rule
Enabled
Whether this rule is enabled
Boolean
FileName
The path (with file extension) to the python rule body
String
RuleID
The unique identifier of the rule
String; cannot include %
LogTypes
The list of logs to apply this rule to
List of strings
Severity
What severity this rule is
One of the following strings: Info, Low, Medium, High, or Critical
ScheduledQueries (field only for Scheduled Rules)
The list of Scheduled Query names to apply this rule to
List of strings
Description
A brief description of the rule
String
DedupPeriodMinutes
The time period (in minutes) during which similar events of an alert will be grouped together
15, 30, 60, 180 (3 hours), 720 (12 hours), or 1440 (24 hours)
DisplayName
A friendly name to show in the UI and alerts. The RuleID will be displayed if this field is not set.
String
OutputIds
Static destination overrides. These will be used to determine how alerts from this rule are routed, taking priority over default routing based on severity.
List of strings
Reference
The reason this rule exists, often a link to documentation
String
Reports
A mapping of framework or report names to values this rule covers for that framework
Map of strings to list of strings
Runbook
The actions to be carried out if this rule returns an alert, often a link to documentation
String
SummaryAttributes
A list of fields that alerts should summarize.
List of strings
Threshold
How many events need to trigger this rule before an alert will be sent.
Integer
Tags
Tags used to categorize this rule
List of strings
Tests
Unit tests for this rule.
List of maps
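Putting several of these keys together, a rule specification file might look like the following sketch (the ID, display name, log type, and tags are illustrative, not a Panther-provided template):

```yaml
AnalysisType: rule
Enabled: true
Filename: my_example_rule.py
RuleID: My.Example.Rule
DisplayName: Example Failed Login Rule
LogTypes:
  - Okta.SystemLog
Severity: Medium
Threshold: 5
DedupPeriodMinutes: 60
Tags:
  - Identity
```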
Python policy specification reference
Required fields are in bold.
A complete list of policy specification fields:
Field Name
Description
Expected Value
AnalysisType
Indicates whether this specification is defining a policy or a rule
policy
Enabled
Whether this policy is enabled
Boolean
FileName
The path (with file extension) to the python policy body
String
PolicyID
The unique identifier of the policy
String; cannot include %
ResourceTypes
What resource types this policy will apply to
List of strings
Severity
What severity this policy is
One of the following strings: Info, Low, Medium, High, or Critical
Description
A brief description of the policy
String
DisplayName
What name to display in the UI and alerts. The PolicyID will be displayed if this field is not set.
String
Reference
The reason this policy exists, often a link to documentation
String
Reports
A mapping of framework or report names to values this policy covers for that framework
Map of strings to list of strings
Runbook
The actions to be carried out if this policy fails, often a link to documentation
String
Suppressions
Patterns to ignore, e.g., aws::s3::*
List of strings
Tags
Tags used to categorize this policy
List of strings
Tests
Unit tests for this policy.
List of maps
Troubleshooting Detections
Visit the Panther Knowledge Base to view articles about detections that answer frequently asked questions and help you resolve common errors and issues.