Writing Python Detections
Construct Python detections in the Console or CLI workflow
You can write your own Python detections in the Panther Console or locally, following the CLI workflow. When writing Python detections, try to follow these best practices, and remember that certain alert fields can be set dynamically.
You can alternatively use the no-code detection builder in the Console to create rules, or write them locally in YAML. If you aren't sure whether to write detections locally in YAML or Python, see the Using Python vs. YAML section.
Before you write a new Python detection, see if there's a Panther-managed detection that meets your needs (or almost meets your needs—Panther-managed rules can be tuned with Inline Filters). Leveraging a Panther-managed detection not only saves you from the effort of writing one yourself, but also provides the ongoing benefit of continuous updates to core detection logic, as Panther releases new versions.
You can write a Python rule in both the Panther Console and CLI workflow.
This consolidated user interface for viewing and editing detections is in open beta starting with Panther version 1.74, and is available to all customers. Please share any bug reports and feature requests with your Panther support team.
- 1.In the left-hand navigation bar of your Panther Console, click Build > Detections.
- 2.Click Create New.
- 3.On the New Detection page, select Rule for the detection type.
- 4.In the Basic Info section, provide values for the following fields:
- Name: Enter a descriptive name for the rule.
- ID (optional): Click the pen icon and enter a unique ID for your rule.
- 5.In the upper-right corner, click Continue.
- 6.On the next page, configure your rule:
- In the upper-right corner, the Enabled toggle will be set to
ON
by default. If you'd like to disable the rule, flip the toggle toOFF
. - In the For the Following Source section:
- Log Types: Select the log types this rule should apply to.
- In the Detect section:
- In the Rule Function text editor, write a Python
rule
function to define your detection.
- In the Set Alert Fields section:
- In the Optional Fields section, optionally provide values for the following fields:
- Description: Enter additional context about the rule.
- Runbook: Enter the procedures and operations relating to this rule.
- Reference: Enter an external link to more information relating to this rule.
- Destination Overrides: Choose destinations to receive alerts for this detection, regardless of severity. Note that destinations can also be set dynamically, in the rule function. See Routing Order Precedence to learn more about routing precedence.
- Deduplication Period and Events Threshold: Enter the deduplication period and threshold for rule matches. To learn how deduplication works, see Deduplication.
- Summary Attributes: Enter the attributes you want to showcase in the alerts that are triggered by this detection.
- To use a nested field as a summary attribute, use the Snowflake dot notation in the Summary Attribute field to traverse a path in a JSON object:
<column>:<level1_element>.<level2_element>.<level3_element>
The alert summary will then be generated for the referenced object in the alert. Learn more about traversing semi-structured data in Snowflake here.
- Custom Tags: Enter custom tags to help you understand the rule at a glance (e.g.,
HIPAA
.) - In the Framework Mapping section:
- 1.Click Add New to enter a report.
- 2.Provide values for the following fields:
- Report Key: Enter a key relevant to your report.
- Report Values: Enter values for that report.
- In the Test section:
- In the Unit Test section, click Add New to create a test for the rule you defined in the previous step.
- 7.In the upper-right corner, click Save.
If you're writing detections locally (instead of in the Panther Console), we recommend managing your local detection files in a version control system like GitHub or GitLab.
We advise that you start your custom detection content by creating either a public fork or a private cloned repo from Panther's open-source panther-analysis repository.
If you group your rules into folders, each folder name must contain
rules
in order for them to be found during upload (using either PAT or the bulk uploader in the Console).We recommend grouping rules into folders based on log/resource type, e.g.,
suricata_rules
or aws_s3_policies
. You can use the panther-analysis repo as a reference.Each rule and scheduled rule consists of:
- A Python file (a file with a
.py
extension) containing your detection logic. - A YAML specification file (a file with a
.yml
extension) containing metadata attributes of the detection.- By convention, we give this file the same name as the Python file.
Rules are Python functions to detect suspicious behaviors. Returning a value of
True
indicates suspicious activity, which triggers an alert.- 1.Write your rule and save it (in your folder of choice) as
my_new_rule.py
:def rule(event):return 'prod' in event.get('hostName') - 2.Create a metadata file using the template below:AnalysisType: ruleDedupPeriodMinutes: 60 # 1 hourDisplayName: Example Rule to Check the Format of the SpecEnabled: trueFilename: my_new_rule.pyRuleID: Type.Behavior.MoreContextSeverity: HighLogTypes:- LogType.GoesHereReports:ReportName (like CIS, MITRE ATT&CK):- The specific report section relevant to this ruleTags:- Tags- Go- HereDescription: >This rule exists to validate the CLI workflows of the Panther CLIRunbook: >First, find out who wrote this the spec format, then notify them with feedback.Reference: https://www.a-clickable-link-to-more-info.com
When this rule is uploaded, each of the fields you would normally populate in the Panther Console will be auto-filled. See Rule specification reference for a complete list of required and optional fields.
You can write a Python scheduled rule in both the Panther Console and CLI workflow.
This consolidated user interface for viewing and editing detections is in open beta starting with Panther version 1.74, and is available to all customers. Please share any bug reports and feature requests with your Panther support team.
- 1.In the left-hand navigation bar of your Panther Console, click Build > Detections.
- 2.Click Create New.
- 3.On the New Detection page, select Scheduled Rule for the detection type.
- 4.In the Basic Info section, provide values for the following fields:
- Name: Enter a descriptive name for the scheduled rule.
- ID (optional): Click the pen icon and enter a unique ID for your scheduled rule.
- 5.In the upper-right corner, click Continue.
- 6.On the next page, configure your scheduled rule:
- In the upper-right corner, the Enabled toggle will be set to
ON
by default. If you'd like to disable the scheduled rule, flip the toggle toOFF
. - In the For the Following Scheduled Queries section:
- In the Detect section:
- In the Rule Function text editor, write a Python
rule
function to define your detection.- If all your filtering logic is already taken care of in the SQL of the associated scheduled query, you can configure the
rule
function to simply returntrue
for each row:def rule(event):return True
- In the Set Alert Fields section:
- In the Optional Fields section, optionally provide values for the following fields:
- Description: Enter additional context about the rule.
- Runbook: Enter the procedures and operations relating to this rule.
- Reference: Enter an external link to more information relating to this rule.
- Destination Overrides: Choose destinations to receive alerts for this detection, regardless of severity. Note that destinations can also be set dynamically, in the rule function. See Routing Order Precedence to learn more about routing precedence.
- Deduplication Period and Events Threshold: Enter the deduplication period and threshold for rule matches. To learn how deduplication works, see Deduplication.
- Summary Attributes: Enter the attributes you want to showcase in the alerts that are triggered by this detection.
- To use a nested field as a summary attribute, use the Snowflake dot notation in the Summary Attribute field to traverse a path in a JSON object:
<column>:<level1_element>.<level2_element>.<level3_element>
The alert summary will then be generated for the referenced object in the alert. Learn more about traversing semi-structured data in Snowflake here.
- Custom Tags: Enter custom tags to help you understand the rule at a glance (e.g.,
HIPAA
.) - In the Framework Mapping section:
- 1.Click Add New to enter a report.
- 2.Provide values for the following fields:
- Report Key: Enter a key relevant to your report.
- Report Values: Enter values for that report.
- In the Test section:
- In the Unit Test section, click Add New to create a test for the rule you defined in the previous step.
- 7.In the upper-right corner, click Save.
- Once you've clicked Save, the scheduled rule will become active. The SQL returned from the associated scheduled query (at the interval defined in the query) will be run through the scheduled rule (if, that is, any rows are returned).
If you're writing detections locally (instead of in the Panther Console), we recommend managing your local detection files in a version control system like GitHub or GitLab.
We advise that you start your custom detection content by creating either a public fork or a private cloned repo from Panther's open-source panther-analysis repository.
If you group your rules into folders, each folder name must contain the string
rules
in order for them to be found during upload (using either PAT or the bulk uploader in the Console).We recommend grouping rules into folders based on log/resource type, e.g.,
suricata_rules
or aws_s3_policies
. You can use the panther-analysis repo as a reference.Each scheduled rule consists of:
- A Python file (a file with a
.py
extension) containing your detection logic. - A YAML specification file (a file with a
.yml
extension) containing metadata attributes of the detection.- By convention, we give this file the same name as the Python file.
Scheduled rules allow you to analyze the output of a scheduled query with Python. Returning a value of
True
indicates suspicious activity, which triggers an alert.- 1.Write your query and save it as
my_new_scheduled_query.yml
:AnalysisType: scheduled_queryQueryName: My New Scheduled Query NameEnabled: trueTags:- Optional- TagsDescription: >An optional DescriptionQuery: 'SELECT * FROM panther_logs.aws_cloudtrail LIMIT 10'SnowflakeQuery: 'SELECT * FROM panther_logs.public.aws_cloudtrail LIMIT 10'AthenaQuery: 'SELECT * FROM panther_logs.aws_cloudtrail LIMIT 10'Schedule:# Note: CronExpression and RateMinutes are mutually exclusive, only# configure one or the otherCronExpression: '0 * * * *'RateMinutes: 1TimeoutMinutes: 1 - 2.Write your rule and save it as
my_new_rule.py
:# Note: See an example rule for more options# https://github.com/panther-labs/panther-analysis/blob/master/templates/example_rule.pydef rule(_):# Note: You may add additional logic herereturn True - 3.Create a metadata file and save it as
my_new_schedule_rule.yml
:AnalysisType: scheduled_ruleFilename: my_new_rule.pyRuleID: My.New.RuleDisplayName: A More Friendly NameEnabled: trueScheduledQueries:- My New Scheduled Query NameTags:- TagSeverity: MediumDescription: >An optional DescriptionRunbook: >An optional RunbookReference: An optional reference.linkTests:-Name: NameExpectedResult: trueLog:{"JSON": "string"}
When this scheduled rule is uploaded, each of the files will connect a scheduled query with a rule, and fill in the fields you would normally populate in the Panther Console will be auto-filled. See Rule specification reference below for a complete list of required and optional fields.
A local Python detection is made up of two files: a Python file and a YAML file. When a Python detection is created in the Panther Console, there is only a Python text editor (not a YAML one). The keys listed in the YAML column, below, are set in fields in the user interface.
The Python file can contain: | The YAML file can contain: |
---|---|
|
|
Only a
rule()
function and the YAML keys shown below are required for a Python rule. Additional Python alert functions, however, can make your alerts more dynamic. Additional YAML keys are available, too—see Python rule specification reference.rule.py | rule.yml |
---|---|
def rule(event): if event.get("Something"): return True return False | AnalysisType: rule Enabled: true Filename: rule.py RuleID: my.rule LogTypes: - Some.Schema Severity: INFO |
Panther's detection auxiliary functions are Python functions that control analysis logic, generated alert title, event grouping, routing of alerts, and metadata overrides. Rules are customizable and can import from standard Python libraries or global helpers.
Applicable to both rules and policies, each function below takes a single argument of
event
(rules) or resource
(policies). Advanced users may define functions, variables, or classes outside of the functions defined below.Each of the below alert functions are optional, but can add dynamic context to your alerts.
Detection alert function name | Description | Overrides | Return Value |
---|---|---|---|
The level of urgency of the alert | In YAML: Severity keyIn Console: Severity field | INFO, LOW, MEDIUM, HIGH, or CRITICAL | |
The generated alert title | In YAML: DisplayName > RuleID or PolicyID In Console: Name field > ID field | String | |
The string to group related events with, limited to 1000 characters | In Python/YAML: title() > DisplayName > RuleID or PolicyID In Console: title() > Name field > ID field | String | |
Additional context to pass to the alert destination(s) | Does not override a YAML nor Console field | Dict[String: Any] | |
An explanation about why the rule exists | In YAML: Description keyIn Console: Description field | String | |
A reference URL to an internal document or online resource about the rule | In YAML: Reference keyIn Console: Reference field | String | |
A list of instructions to follow once the alert is generated | In YAML: Runbook keyIn Console: Runbook field | String | |
The label or ID of the destinations to specifically send alerts to. An empty list will suppress all alerts. | In YAML: OutputIds keyIn Console: Destination Overrides field | List[Destination Name/ID] |
In some scenarios, you may need to upgrade or downgrade the severity level of an alert. The severity levels of an alert can be mapped to INFO, LOW, MEDIUM, HIGH, CRITICAL, or DEFAULT. Return DEFAULT to fall back to the statically defined rule severity.
In all cases, the severity string returned is case insensitive, meaning you can return, for example,
Critical
or default
, depending on your style preferences.Example where a HIGH severity alert is returned if an API token is created - otherwise we create an INFO level alert:
def severity(event):
if event.get('eventType') == 'system.api_token.create':
return "HIGH"
return "INFO"
Example using
DEFAULT
:def severity(event):
if event.get('eventType') == 'system.api_token.create':
return "HIGH"
return "DEFAULT"
The title function is optional, but it is recommended to include it to provide additional context. In the example below, the log type, relevant username, and a static string are returned to the destination. The function checks to see if the event is related the AWS.CloudTrail log type and return the AWS Account Name if that is true. Learn more about how an alert title is set on Rules and Scheduled Rules.
Example:
def title(event):
# use unified data model field in title
log_type = event.get("p_log_type")
title_str = (
f"{log_type}: User [{event.udm('actor_user')}] has exceeded the failed logins threshold"
)
if log_type == "AWS.CloudTrail":
title_str += f" in [{lookup_aws_account_name(event.get('recipientAccountId'))}]"
return title_str
Deduplication is the process of grouping related events into a single alert to prevent receiving duplicate alerts. Events triggering the same detection that also share a deduplication string, within the deduplication period, are grouped together in a single alert. The
dedup
function is one way to define a deduplication string. It is limited to 1000 characters. Learn more about deduplication on Rules and Scheduled Rules. Example:
def dedup(event):
user_identity = event.get("userIdentity", {})
if user_identity.get("type") == "AssumedRole":
return helper_strip_role_session_id(user_identity.get("arn", ""))
return user_identity.get("arn")
By default, Alerts are sent to specific destinations based on severity level or log type event. Each Detection has the ability to override their default destination and send the Alert to one or more specific destination(s). In some scenarios, a destination override is required, providing more advance criteria based on the logic of the Rule.
Example:
A rule used for multiple log types utilizes the destinations function to reroute the Alert to another destination if the log type is "AWS.CloudTrail". The Alert is suppressed to this destination using
return ["SKIP"]
if the log type is not CloudTrail.def destinations(event):
if event.get("p_log_type") == "AWS.CloudTrail":
return ["slack-security-alerts"] ### Name or UUID of destination
# Do not send alert to an external destination
return ["SKIP"]
This function allows the detection to pass any event details as additional context, such as usernames, IP addresses, or success/failure, to the Alert destination(s).
Example:
The code below returns all event data in the alert context.
def rule(event):
return (
event.get("actionName") == "UPDATE_SAML_SETTINGS"
and event.get("actionResult") == "SUCCEEDED"
)
def alert_context(event):
return {
"user": event.udm("actor_user"),
"ip": event.udm("source_ip")
}
These functions can provide additional context around why an alert was triggered and how to resolve the related issue. Depending on what conditions are met, a string can be overridden and returned to the specified field in the alert.
The example below dynamically provides a link within the
runbook
field in an alert.def runbook(event):
log_type = event.get("p_log_type")
if log_type == "OnePassword.SignInAttempt":
return: f"<https://link/to/resource>"
elif log_type == "Okta.SystemLog":
return: f"<https://link/to/resource/2>"
else:
return: f"<https://default/link>"
Python Enhancement Proposals publishes resources on how to cleanly and effectively write and style your Python code. For example, you can use autopep8 to automatically ensure that your written detections all follow a consistent style.
The following Python libraries are available to be used in Panther in addition to
boto3
, provided by AWS Lambda:Package | Version | Description | License |
jsonpath-ng | 1.5.2 | JSONPath Implementation | Apache v2 |
policyuniverse | 1.3.3.20210223 | Parse AWS ARNs and Policies | Apache v2 |
requests | 2.23.0 | Easy HTTP Requests | Apache v2 |
Before enabling new detections, it is recommended to write tests that define scenarios where alerts should or should not be generated. Best practice dictates at least one positive and one negative to ensure the most reliability.
Lookups for event fields are not case sensitive.
event.get("Event_Type")
or event.get("event_type")
will return the same result.Top-level fields represent the parent fields in a nested data structure. For example, a record may contain a field called
user
under which there are other fields such as ip_address
. In this case, user
is the top-level field, and ip_address
is a nested field underneath it.Nesting can occur many layers deep, and so it is valuable to understand the schema structure and know how to access a given field for a detection.
Basic Rules match a field’s value in the event, and a best practice to avoid errors is to leverage Python’s built-in
get()
function.The example below is a best practice because it leverages a
get()
function. get()
will look for a field, and if the field doesn't exist, it will return None
instead of an error, which will result in the detection returning False
.def rule(event):
return event.get('field') == 'value'
In the example below, if the field exists, the value of the field will be returned. Otherwise,
False
will be returned:def rule(event):
if event.get('field')
return event.get('field')
return False
Bad practice example
The rule definition below is bad practice because the code is explicit about the field name. If the field doesn't exist, Python will throw a
KeyError
:def rule(event):
return event['field'] == 'value'
Once many detections are written, a set of patterns and repeated code will begin to emerge. This is a great use case for Global Helper functions, which provide a centralized location for this logic to exist across all detections. For example, see the
deep_get()
function referenced in the next section.If the field is nested deep within the event, use a Panther-supplied function called
deep_get()
to safely access the fields value. deep_get()
must be imported by the panther_base_helpers
library.deep_get()
takes two or more arguments:- The event object itself (required)
- The top-level field name (required)
- Any nested fields, in order (as many nested fields as needed)
Example:
AWS CloudTrail logs nest the type of user accessing the console underneath
userIdentity
.JSON CloudTrail root activity:
{
"eventVersion": "1.05",
"userIdentity": {
"type": "Root",
"principalId": "1111",
"arn": "arn:aws:iam::123456789012:root",
"accountId": "123456789012",
"userName": "root"
},
...
}
Here is how you could check that value safely with
deep_get
:from panther_base_helpers import deep_get
def rule(event):
return deep_get(event, "userIdentity", "type") == "Root"
You may want to know when a specific event has occurred. If it did occur, then the detection should trigger an alert. Since Panther stores everything as normalized JSON, you can check the value of a field against the criteria you specify.
For example, to detect the action of granting Box technical support access to your Box account, the Python below would be used to match events where the
event_type
equals ACCESS_GRANTED
:def rule(event):
return event.get("event_type") == "ACCESS_GRANTED"
If the field is
event_type
and the value is equal to ACCESS_GRANTED
then the rule function will return true
and an Alert will be created.You may need to compare the value of a field against integers. This allows you to use any of Python’s built-in comparisons against your events.
For example, you can create an alert based on HTTP response status codes:
# returns True if 'status_code' equals 404
def rule(event):
if event.get("status_code"):
return event.get("status_code") == 404
else:
return False
# returns True if 'status_code' greater than 400
def rule(event):
if event.get("status_code"):
return event.get("status_code") > 404
else:
return False
Reference:
Data Models provide a way to configure a set of unified fields across all log types. By default, Panther comes with built-in Data Models for several log types. Custom Data Models can be added in the Panther Console or via the Panther Analysis Tool.
event.udm()
can only be used with log types that have an existing Data Model in your Panther environment.Example:
import panther_event_type_helpers as event_type
def rule(event):
# filter events on unified data model field ‘event_type’
return event.udm("event_type") == event_type.FAILED_LOGIN
References:
The
and
keyword is a logical operator and is used to combine conditional statements. It is often required to match multiple fields in an event using the and
keyword. When using and
, all statements must be true:
"string_a" == "this"
and
"string_b" == "that"
Example:
To track down successful root user access to the AWS console you need to look at several fields:
from panther_base_helpers import deep_get
def rule(event):
return (event.get("eventName") == "ConsoleLogin" and
deep_get(event, "userIdentity", "type") == "Root" and
deep_get(event, "responseElements", "ConsoleLogin") == "Success")
The
or
keyword is a logical operator and is used to combine conditional statements. When using or
, either of the statements may be true:
"string_a" == "this"
or
"string_b" == "that"
Example:
This example detects if the field contains either Port 80 or Port 22:
# returns True if 'port_number' is 80 or 22
def rule(event):
return event.get("port_number") == 80 or event.get("port_number") == 22
Comparing and matching events against a list of IP addresses, domains, users etc. is very quick and easy in Python. This is often used in conjunction with choosing not to alert on an event if the field being checked also exists in the list. This helps with reducing false positives for known behavior in your environment.
Example: If you have a list of IP addresses that you would like to add to your allow list, but you want to know if an IP address comes through outside of that list, we recommend using a Python set. Sets are similar to Python lists and tuples, but are more memory efficient.
# Set - Recommended over tuples or lists for performance
ALLOW_IP = {'192.0.0.1', '192.0.0.2', '192.0.0.3'}
def rule(event):
return event.get("ip_address") not in ALLOW_IP
In the example below, we use the Panther helper
pattern_match_list
:from panther_base_helpers import pattern_match_list