Creating PyPanther Detections
Overview
You can import Panther-managed PyPanther Detections and make your own modifications, as well as create your own custom ones. After you've created detections, you can register, test, and upload them.
This page describes how to create PyPanther Detections in the CLI workflow. If you'd like to create PyPanther Detections in the Console instead, see Managing PyPanther Detections in the Panther Console.
High-level guidelines for creating PyPanther Detections
When working with PyPanther Detections in the CLI workflow:
- A main.py file controls your entire detection configuration, outlining which rules to register, override configurations, and custom rule definitions. You can either:
  - (Recommended) Define your detections in various other files/folders, then import them into main.py (learn more in the PyPanther Detections Style Guide)
  - Define your detections in main.py
- A PyPanther rule is defined in a single Python file. Within it, you can import Panther-managed (or your own custom) PyPanther rules and specify overrides. A single Python file can define multiple detections.
- All PyPanther rules subclass the pypanther Rule class or a parent class of type Rule.
- Rules must be registered to be tested and uploaded to your Panther instance.
- All event object functions currently available in v1 detections are available in PyPanther Detections. These include get(), deep_get(), deep_walk(), and udm().
- All alert functions available in Python (v1) detections are available in PyPanther Detections, such as title() and severity(). See Rule auxiliary/alerting function reference.
- Use the pypanther type-ahead hints in your IDE, like searching for available rules or viewing properties of classes.
Writing a custom PyPanther Detection
A "custom" PyPanther rule is one that you write completely from scratch (i.e., one that isn't built from a Panther-managed rule). Custom PyPanther rules are defined in a Python class that subclasses the pypanther Rule class. In this class, you must:
- Define a rule() function
- (Optional) Define any of the other alert functions, like title() or destinations()
- Define certain attributes, such as log_types
- (Optional) Define additional attributes, such as threshold or dedup_period_minutes
See the Rule property reference section for a full list of required and optional fields.
    from pypanther import Rule, Severity, LogType

    # Custom rule for a Panther-supported log type
    class MyCloudTrailRule(Rule):
        id = "MyCloudTrailRule"
        tests = True
        log_types = [LogType.AWS_CLOUDTRAIL]
        default_severity = Severity.MEDIUM
        threshold = 50
        dedup_period_minutes = 1

        def rule(self, event) -> bool:
            return (
                event.get("eventType") == "AssumeRole" and
                400 <= int(event.get("errorCode", 0)) <= 413
            )
Importing Panther-managed rules
Panther-managed rules can be imported directly or using the get_panther_rules() function.
You may want to import Panther-managed rules (into main.py or another file) to either register them individually as-is, set overrides on them, or subclass them. You can import Panther-managed rules directly (from the pypanther.rules module) or by using the get_panther_rules() function.
To import a Panther-managed rule directly using the rules module, you would use a statement like:
from pypanther.rules.github import GitHubAdvancedSecurityChange
The get_panther_rules() function can filter on any Rule class attribute, such as default_severity, log_types, or tags. When filtering, keys use AND logic, and values use OR logic.
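The AND/OR behavior described above can be modelled in plain Python. The sketch below is not pypanther's implementation: the `matches` helper and the dict-based sample rules are hypothetical stand-ins, used only to show how "every key must match, any listed value may match" plays out.

```python
# Plain-Python model of the filter semantics; NOT pypanther code.
# `matches` and `sample_rules` are hypothetical names for illustration.

def matches(rule: dict, **filters) -> bool:
    """Return True if `rule` satisfies every filter key (AND),
    where each key accepts any one of the listed values (OR)."""
    for key, wanted in filters.items():
        wanted_values = wanted if isinstance(wanted, list) else [wanted]
        actual = rule.get(key)
        actual_values = actual if isinstance(actual, list) else [actual]
        # OR across values: at least one wanted value must be present
        if not any(value in actual_values for value in wanted_values):
            return False
    return True


sample_rules = [
    {"id": "A", "default_severity": "HIGH", "tags": ["Configuration Required"]},
    {"id": "B", "default_severity": "LOW", "tags": ["Configuration Required"]},
    {"id": "C", "default_severity": "CRITICAL", "tags": ["Box"]},
]

selected = [
    r["id"]
    for r in sample_rules
    if matches(r, default_severity=["HIGH", "CRITICAL"], tags=["Configuration Required"])
]
print(selected)  # ['A']
```

Rule B fails the severity key and rule C fails the tags key, so only rule A satisfies both keys at once.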
Get all Panther-managed rules using get_panther_rules():
from pypanther import get_panther_rules
all_rules = get_panther_rules()
Get Panther-managed rules with certain severities using get_panther_rules():

    from pypanther import get_panther_rules, Severity

    important_rules = get_panther_rules(
        default_severity=[
            Severity.HIGH,
            Severity.CRITICAL,
        ]
    )

Get Panther-managed rules for certain log types using get_panther_rules():

    from pypanther import get_panther_rules, LogType

    cloudtrail_okta_rules = get_panther_rules(
        log_types=[
            LogType.AWS_CLOUDTRAIL,
            LogType.OKTA_SYSTEM_LOG
        ]
    )

Get Panther-managed rules that meet multiple criteria using get_panther_rules():

    from pypanther import get_panther_rules, Severity

    rules_i_care_about = get_panther_rules(
        enabled=True,
        default_severity=[Severity.CRITICAL, Severity.HIGH],
        tags=["Configuration Required"],
    )
Once you’ve imported a Panther-managed rule, you can modify it using overrides or inheritance.
Applying overrides on existing rules
When making overrides on Panther-managed detections, it's recommended to:
- Outside of main.py, store all of your overrides in apply_overrides() functions.
- In main.py, call pypanther's apply_overrides() to apply each of your apply_overrides() functions.
Learn more about apply_overrides() in the PyPanther Detections Style Guide.
Overriding single attributes
You can override a single rule attribute in a one-line statement using the override() function:
from pypanther import Severity
from pypanther.rules.aws_cloudtrail import AWSCloudTrailCreated
# Set rule severity to High
AWSCloudTrailCreated.override(default_severity=Severity.HIGH)
Overriding multiple attributes with the override function
It’s also possible to make multi-attribute overrides with the override() function:

    from pypanther import Severity
    from pypanther.rules.box import BoxContentWorkflowPolicyViolation

    BoxContentWorkflowPolicyViolation.override(
        default_severity=Severity.HIGH,
        default_runbook="Check if other internal users hit this same violation"
    )
Applying overrides on multiple PyPanther rules
To apply overrides on multiple rules at once, iterate over the collection using a for loop.
This could be useful, for example, when updating a certain attribute for all rules associated with a certain LogType.

    from pypanther import get_panther_rules, Severity, LogType

    rules = get_panther_rules(
        enabled=True,
        default_severity=[Severity.CRITICAL, Severity.HIGH],
        tags=["Configuration Required"],
    )

    # Define the destinations override
    # (see https://docs.panther.com/detections/rules/python#destinations)
    def aws_cloudtrail_destinations(self, event):
        # recipientAccountId is a string in CloudTrail events, so compare against a string
        if event.get('recipientAccountId') == '112233445566':
            # Name or UUID of a configured Slack Destination
            return ['AWS Notifications']
        # Suppress the alert, doesn't deliver
        return ['SKIP']

    # Appending to the Panther-managed rules' destinations and tags using extend()
    for rule in rules:
        rule.extend(
            default_destinations=["Slack #security"],
            tags=["Production"],
        )

        # Updating the destinations method for CloudTrail rules
        if LogType.AWS_CLOUDTRAIL in rule.log_types:
            rule.destinations = aws_cloudtrail_destinations
Extending list attributes on existing rules
When making modifications to an existing rule, you might want to add items to a list-type rule attribute (like tags, tests, include_filters, or exclude_filters) while preserving the existing list.
Instead of overriding the attribute (using one of the methods in Applying overrides on existing rules), which would replace the existing list value, use the pypanther extend() function to append new values to the list attribute.

    from pypanther.rules.box import BoxContentWorkflowPolicyViolation

    BoxContentWorkflowPolicyViolation.extend(
        tags=["Production"],
        include_filters=[lambda event: event.get("env") == "prod"]
    )

    # Because BoxContentWorkflowPolicyViolation already has a "Box" tag, it will now
    # have two tags: "Production" and "Box"

    # Because BoxContentWorkflowPolicyViolation does not already define include_filters,
    # it will now have the single filter above
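To make the difference between replacing and appending concrete, here is a minimal plain-Python sketch. `FakeRule` and its `override()`/`extend()` methods are hypothetical stand-ins that only mimic the behavior described above; they are not pypanther's implementation, and the resulting list order in the real library may differ.

```python
# Hypothetical stand-in for a rule class; this is NOT pypanther's Rule.
class FakeRule:
    tags = ["Box"]

    @classmethod
    def override(cls, **kwargs):
        # Replace each named attribute outright
        for name, value in kwargs.items():
            setattr(cls, name, value)

    @classmethod
    def extend(cls, **kwargs):
        # Build a new list: existing values first, then the new ones
        for name, value in kwargs.items():
            setattr(cls, name, list(getattr(cls, name, [])) + list(value))


class Overridden(FakeRule):
    tags = ["Box"]


class Extended(FakeRule):
    tags = ["Box"]


Overridden.override(tags=["Production"])
Extended.extend(tags=["Production"])

print(Overridden.tags)  # ['Production']
print(Extended.tags)    # ['Box', 'Production']
```

In this model, override() loses the original "Box" tag, while extend() keeps it and adds "Production".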
Creating include or exclude filters
PyPanther Detection filters let you exclude certain events from being evaluated by a rule. Filters are designed to be applied on top of existing rule logic (likely for Panther-managed PyPanther Detections you are importing).
Each filter defines logic that is run before the rule() function, and the outcome of the filter determines whether or not the event should go on to be evaluated by the rule.
There are two types of filters:
- include_filters: If the filter returns True for an event, the event is evaluated by rule()
- exclude_filters: If the filter returns True for an event, the event is dismissed (i.e., not evaluated by rule())
The Rule base class has include_filters and exclude_filters attributes, which each contain a list of functions that will be evaluated against the log event.
Examples as standalone functions:

    from pypanther.rules.github import GitHubAdvancedSecurityChange

    # Include only repos that are production
    prod_repos = ["prod_repo1", "prod_repo2"]
    GitHubAdvancedSecurityChange.extend(
        include_filters=[lambda e: e.get("repo") in prod_repos]
    )

    from pypanther.rules.github import GitHubAdvancedSecurityChange

    # Exclude repos that are for development
    dev_repos = ["dev_repo1", "dev_repo2"]
    GitHubAdvancedSecurityChange.extend(
        exclude_filters=[lambda e: e.get("repo") in dev_repos]
    )
Example as part of an inherited rule definition:

    from pypanther.rules.github import GitHubAdvancedSecurityChange

    # prod_repos and dev_repos are the lists defined in the examples above

    class GitHubAdvancedSecurityChangeOverride(GitHubAdvancedSecurityChange):
        id = "GitHubAdvancedSecurityChangeOverride"

        # In a real-world scenario, likely only one of the below would be necessary

        # Extend parent rule's include_filters. super() cannot be called in a class
        # body, so reference the parent class directly and build a new list.
        include_filters = GitHubAdvancedSecurityChange.include_filters + [
            lambda e: e.get("repo") in prod_repos
        ]

        # Override parent rule's exclude_filters
        exclude_filters = [lambda e: e.get("repo") in dev_repos]
        ...
Filters can also be reused with a for loop to be applied to multiple rules:

    rules = [
        # Panther-managed and custom rules
        ...
    ]

    def prod_filter(event):
        return event.get("repo") in prod_repos

    for rule in rules:
        rule.extend(include_filters=[prod_filter])
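The order of evaluation described in this section (filters first, then rule()) can be sketched in plain Python. The `should_evaluate` helper below is hypothetical and is not part of pypanther. It also assumes that any exclude filter returning True dismisses the event and that every include filter must return True, which this page does not state explicitly for rules with several filters.

```python
# Plain-Python model of filter evaluation; NOT pypanther code.
# Assumptions (not confirmed by the source): exclude filters are combined
# with "any", include filters with "all".

def should_evaluate(event: dict, include_filters: list, exclude_filters: list) -> bool:
    """Decide whether an event should go on to be evaluated by rule()."""
    # An event matched by an exclude filter is dismissed
    if any(f(event) for f in exclude_filters):
        return False
    # With no include filters, all([]) is True, so the event passes through
    return all(f(event) for f in include_filters)


prod_repos = ["prod_repo1", "prod_repo2"]
dev_repos = ["dev_repo1", "dev_repo2"]

include_filters = [lambda e: e.get("repo") in prod_repos]
exclude_filters = [lambda e: e.get("repo") in dev_repos]

print(should_evaluate({"repo": "prod_repo1"}, include_filters, exclude_filters))  # True
print(should_evaluate({"repo": "dev_repo1"}, include_filters, exclude_filters))   # False
print(should_evaluate({"repo": "other"}, [], exclude_filters))                    # True
```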
Ensuring necessary fields are set on configuration-required rules
Panther-managed rules that require some customer configuration before they are uploaded into a Panther environment may include a validate_config() function, which defines one or more conditions that must be met for the rule to pass the test command (and function properly).
Most commonly, validate_config() verifies that some class variable, such as an allowlist or denylist, has been assigned a value. If the requirements included in validate_config() are not met, an exception will be raised when the pypanther test command is run (if the rule is registered).
Example:
    from pypanther import LogType, Rule, Severity

    class ValidateMyRule(Rule):
        id = "Validate.MyRule"
        log_types = [LogType.PANTHER_AUDIT]
        default_severity = Severity.HIGH
        allowed_domains: list[str] = []

        def rule(self, event):
            return event.get("domain") not in self.allowed_domains

        @classmethod
        def validate_config(cls):
            assert (
                len(cls.allowed_domains) > 0
            ), "The allowed_domains field on your rule must be populated"
In this example, if allowed_domains is not assigned a non-empty list, an assertion error will be raised during pypanther test.
To set this value, you can use a statement like:
ValidateMyRule.allowed_domains = ["example.com"]
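The before-and-after effect of setting the value can be tried without a Panther environment. In the sketch below, `FakeRule`, `DomainRule`, and `config_error` are hypothetical stand-ins and not part of pypanther. The sketch only shows that a validate_config() classmethod of this shape fails until the class variable is populated.

```python
# Hypothetical stand-in for pypanther's Rule base class.
class FakeRule:
    @classmethod
    def validate_config(cls):
        pass


class DomainRule(FakeRule):
    allowed_domains: list[str] = []

    def rule(self, event) -> bool:
        return event.get("domain") not in self.allowed_domains

    @classmethod
    def validate_config(cls):
        assert len(cls.allowed_domains) > 0, "allowed_domains must be populated"


def config_error(rule_cls):
    """Return the validation message, or None if the config is valid."""
    try:
        rule_cls.validate_config()
    except AssertionError as err:
        return str(err)
    return None


before = config_error(DomainRule)             # config not yet set
DomainRule.allowed_domains = ["example.com"]  # same style of assignment as above
after = config_error(DomainRule)

print(before)  # allowed_domains must be populated
print(after)   # None
```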
Creating PyPanther rules with inheritance
You can use inheritance to create rules that are subclasses of other rules (that are Panther-managed or custom).
It’s recommended to use inheritance when you’re creating a collection of rules that all share a number of characteristics (for example, rule() function logic, property values, class variables, or helper functions). In this case, it’s useful to create a base rule that all related rules inherit from.
For example, it may be useful to create a base rule for each LogType, from which all rules for that LogType are extended.
If you don’t plan to register a base rule, it’s not required to provide it an id property.
Example:
    from pypanther import Rule, Severity

    # Custom rule for a custom log type (the parent rule)
    class HostIDSBaseRule(Rule):
        # id not necessary since we're not uploading this parent rule
        log_types = ['Custom.HostIDS']
        default_severity = Severity.HIGH
        threshold = 1
        dedup_period_minutes = 6 * 60  # 6 hours

        def rule(self, event) -> bool:
            return event.get('event_name') == 'confirmed_compromise'

        def host_user_lookup(self, hostname):
            return 'groot'

        def title(self, event) -> str:
            return f"Confirmed compromise from hostname {event.get('hostname')}"

        def alert_context(self, event):
            user = self.host_user_lookup(event.get('hostname'))
            return {
                'hostname': event['hostname'],
                'time': event['p_event_time'],
                'user': user,
            }

    # Inherited rule #1
    class IDSCommandAndControl(HostIDSBaseRule):
        id = 'IDSCommandAndControl'
        threshold = 19

        # Filter on event_type (in addition to base rule function)
        include_filters = [lambda e: e.get('event_type') == 'c2']

        def title(self, event):
            return f"Confirmed c2 on host {event.get('hostname')}"

        # From parent rule, inherits rule(), alert_context(), log_types, default_severity, and dedup_period_minutes
        # From pypanther Rule, inherits other fields (like enabled)

    # Inherited rule #2
    class HostIDSMalware(HostIDSBaseRule):
        id = 'HostIDSMalware'
        threshold = 2
        default_severity = Severity.CRITICAL

        # Filter on event_type (in addition to base rule function)
        include_filters = [lambda e: e.get('event_type') == 'malware']

        def title(self, event):
            return f"Confirmed malware on host {event.get('hostname')}"

        # From parent rule, inherits rule(), alert_context(), log_types, and dedup_period_minutes
        # From pypanther Rule, inherits other fields (like enabled)
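Which values a child rule ends up with follows ordinary Python attribute resolution. The sketch below checks this with hypothetical stand-in classes (`BaseRule`, `HostBase`, `Malware`) rather than pypanther's Rule, so it runs anywhere. The attribute names mirror the example above, but the default values are invented for illustration.

```python
# Hypothetical stand-in base class; this is NOT pypanther's Rule.
class BaseRule:
    enabled = True
    threshold = 1
    dedup_period_minutes = 60


class HostBase(BaseRule):
    dedup_period_minutes = 6 * 60  # overrides the base default

    def rule(self, event) -> bool:
        return event.get("event_name") == "confirmed_compromise"

    def title(self, event) -> str:
        return f"Confirmed compromise from hostname {event.get('hostname')}"


class Malware(HostBase):
    threshold = 2  # overrides only this attribute

    def title(self, event) -> str:
        return f"Confirmed malware on host {event.get('hostname')}"


event = {"event_name": "confirmed_compromise", "hostname": "web-01"}
child = Malware()

print(child.rule(event))   # True (inherited from HostBase)
print(child.title(event))  # Confirmed malware on host web-01
print(child.threshold, child.dedup_period_minutes, child.enabled)  # 2 360 True
```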
Using advanced Python
Because PyPanther rules are fully defined in Python, you can use its full expressiveness when customizing your detections.
Calling super()
For more advanced use cases, you can supplement the logic in functions defined by the parent rule.
    from pypanther import Severity

    # Creating an inherited rule (AnExistingRule is a placeholder for any rule you have imported)
    class MyRule(AnExistingRule):
        def alert_context(self, event):
            # Preserve the parent rule's alert context and extend with a new field
            context = super().alert_context(event)
            context["new field"] = "new_value"
            return context

        def severity(self, event):
            # Escalate to Critical for prod events; otherwise keep the parent rule's severity
            if event.get("env") == "prod":
                return Severity.CRITICAL
            return super().severity(event)
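The same pattern can be exercised end to end with a stand-in parent. In this sketch, `ParentRule` and `ChildRule` are hypothetical classes, and severities are plain strings rather than pypanther's Severity values, so the example is self-contained.

```python
# Hypothetical parent rule; NOT a pypanther class.
class ParentRule:
    def alert_context(self, event) -> dict:
        return {"user": event.get("user")}

    def severity(self, event) -> str:
        return "MEDIUM"


class ChildRule(ParentRule):
    def alert_context(self, event) -> dict:
        # Start from the parent's context, then add a field
        context = super().alert_context(event)
        context["env"] = event.get("env")
        return context

    def severity(self, event) -> str:
        # Escalate for prod, otherwise defer to the parent
        if event.get("env") == "prod":
            return "CRITICAL"
        return super().severity(event)


rule = ChildRule()
print(rule.alert_context({"user": "groot", "env": "prod"}))  # {'user': 'groot', 'env': 'prod'}
print(rule.severity({"env": "prod"}))     # CRITICAL
print(rule.severity({"env": "staging"}))  # MEDIUM
```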
Using list comprehension
You can use Python’s list comprehension functionality to create a new list based on an existing list with condensed syntax. This may be particularly useful when you want to filter a list of detections fetched using get_panther_rules().
# Collection of rules that have more than one LogType
rules = [rule for rule in get_panther_rules() if len(rule.log_types) > 1]
# Collection of rules that do not require configuration
rules = [rule for rule in get_panther_rules() if "Configuration Required" not in rule.tags]
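To try these comprehensions without a Panther environment, the sketch below swaps in a hypothetical `fake_get_rules()` function and three stand-in rule classes. None of these names exist in pypanther, and the log type strings are illustrative. Only the list comprehension pattern itself carries over.

```python
# Hypothetical stand-in rule classes with the two attributes being filtered on.
class RuleA:
    log_types = ["AWS.CloudTrail", "Okta.SystemLog"]
    tags = ["Configuration Required"]


class RuleB:
    log_types = ["AWS.CloudTrail"]
    tags = ["Production"]


class RuleC:
    log_types = ["Box.Event", "GitHub.Audit"]
    tags = []


def fake_get_rules() -> list:
    """Stand-in for get_panther_rules(); returns rule classes."""
    return [RuleA, RuleB, RuleC]


# Rules that have more than one log type
multi_log = [r for r in fake_get_rules() if len(r.log_types) > 1]

# Rules that do not require configuration
no_config = [r for r in fake_get_rules() if "Configuration Required" not in r.tags]

print([r.__name__ for r in multi_log])  # ['RuleA', 'RuleC']
print([r.__name__ for r in no_config])  # ['RuleB', 'RuleC']
```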