PyPanther Detections are in closed beta starting with Panther version 1.108. Please share any bug reports and feature requests with your Panther support team.
Import Panther-managed rules (whether or not you plan to override them) using get_panther_rules
If you defined apply_overrides functions, call pypanther's apply_overrides() to apply all of your changes. Learn more in Call apply_overrides(), below.
Import custom rules using get_rules
# Example main.py file
from pypanther import get_panther_rules, get_rules, register, apply_overrides
from content import rules, overrides
from content.helpers.custom_log_types import CustomLogType
# Load base rules
base_rules = get_panther_rules(
    # log_types=[
    #     LogType.AWS_CLOUDTRAIL,
    #     LogType.AWS_GUARDDUTY,
    #     LogType.PANTHER_AUDIT,
    # ],
    # default_severity=[
    #     Severity.CRITICAL,
    #     Severity.HIGH,
    # ],
)
# Load all local custom rules
custom_rules = get_rules(module=rules)
# Omit rules with custom log types, since they must be present in the Panther instance for upload to work
custom_rules = [rule for rule in custom_rules if not any(custom in rule.log_types for custom in CustomLogType)]
# Apply overrides
apply_overrides(module=overrides, rules=base_rules)
# Register all rules
register(base_rules + custom_rules)
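The list comprehension that drops rules using custom log types can be tried in isolation. The sketch below does not import pypanther: `CustomLogType` here is a stand-in that is assumed to be a string-valued `Enum`, and `FakeRule` is a hypothetical placeholder for any rule class with a `log_types` attribute. The log type and rule names are made up for illustration.

```python
from enum import Enum


class CustomLogType(str, Enum):
    """Stand-in for content.helpers.custom_log_types.CustomLogType (assumed to be a str Enum)."""

    HOST_IDS = "Custom.HostIDS"
    VPN = "Custom.VPN"


class FakeRule:
    """Hypothetical placeholder for a rule class; only log_types matters here."""

    def __init__(self, rule_id, log_types):
        self.id = rule_id
        self.log_types = log_types


custom_rules = [
    FakeRule("Custom.VPN.Login", [CustomLogType.VPN]),
    FakeRule("AWS.Console.Login", ["AWS.CloudTrail"]),
    FakeRule("Mixed.Rule", ["AWS.CloudTrail", "Custom.HostIDS"]),
]

# Keep only rules whose log_types contain no member of CustomLogType.
uploadable = [
    rule
    for rule in custom_rules
    if not any(custom in rule.log_types for custom in CustomLogType)
]

kept_ids = [rule.id for rule in uploadable]
```

Because the stand-in enum mixes in `str`, a rule that lists a custom log type as a plain string is filtered out as well as one that lists the enum member.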
Call apply_overrides()
The pypanther apply_overrides() convenience function lets you, in main.py, efficiently apply all detection overrides made in a separate file or folder.
apply_overrides() takes an imported package (folder) or module (file) name and an optional list of rules, and runs all functions named apply_overrides() from that package or module against the list of rules. (This is similar to how get_rules() works.) It's recommended to name this package or module overrides.
Each folder containing rules imported with apply_overrides() must contain an __init__.py file.
In the following example, the apply_overrides() functions in general.py and aws_cloudtrail.py are applied when apply_overrides(overrides, all_rules) is called in main.py:
# general.py in /overrides
def apply_overrides(rules):
    for rule in rules:
        rule.override(enabled=True)
# aws_cloudtrail.py in /overrides
from pypanther import Severity
from pypanther.rules.aws_cloudtrail import AWSCloudTrailStopped
def apply_overrides(rules):
    AWSCloudTrailStopped.override(
        default_severity=Severity.LOW,
        default_runbook=(
            "If the account is in production, investigate why CloudTrail was stopped. "
            "If it was intentional, ensure that the account is monitored by another CloudTrail. "
            "If it was not intentional, investigate the account for unauthorized access."
        ),
    )
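To make the dispatch behavior concrete, here is a minimal sketch of the idea: find every function named apply_overrides() in a set of modules and call each one with the rule list. This is not pypanther's implementation. `apply_overrides_sketch`, `make_module`, and `FakeRule` are hypothetical names, and in-memory modules stand in for the files in the overrides folder.

```python
import types


class FakeRule:
    """Hypothetical stand-in for a rule class that supports override()."""

    enabled = False
    default_severity = "HIGH"

    @classmethod
    def override(cls, **kwargs):
        # Set each keyword argument as a class attribute.
        for name, value in kwargs.items():
            setattr(cls, name, value)


def make_module(name, func):
    """Build an in-memory module exposing apply_overrides (stands in for a file)."""
    module = types.ModuleType(name)
    module.apply_overrides = func
    return module


def enable_all(rules):
    for rule in rules:
        rule.override(enabled=True)


def lower_severity(rules):
    FakeRule.override(default_severity="LOW")


def apply_overrides_sketch(modules, rules):
    """Call apply_overrides(rules) on every module that defines it."""
    applied = []
    for module in modules:
        func = getattr(module, "apply_overrides", None)
        if callable(func):
            func(rules)
            applied.append(module.__name__)
    return applied


overrides = [
    make_module("overrides.general", enable_all),
    make_module("overrides.aws_cloudtrail", lower_severity),
    types.ModuleType("overrides.helpers"),  # defines no apply_overrides, so it is skipped
]
applied = apply_overrides_sketch(overrides, [FakeRule])
```

Modules without an apply_overrides() function are ignored in this sketch, which is why helper files can sit alongside override files.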
Best practices for PyPanther Detection writing
Use filters instead of overriding rule()
If you would like to alter the logic of a Panther-managed PyPanther Detection, it's recommended to use include/exclude filters instead of overriding the rule's rule() function. Filters are designed for this purpose: they are applied on top of existing rule logic and run against each incoming event before rule(), determining whether the rule should process the event at all.
If you are significantly altering the rule logic, you might also consider writing a custom rule instead.
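The ordering described above (filters first, then rule()) can be illustrated without pypanther. In the sketch below, `LoginRule`, `ProdLoginRule`, and the `run()` method are hypothetical stand-ins, and the semantics are an assumption: every include filter must return True, and any exclude filter returning True drops the event.

```python
class LoginRule:
    """Hypothetical stand-in for a managed rule; not a pypanther class."""

    include_filters = []
    exclude_filters = []

    def rule(self, event):
        return event.get("action") == "login_failed"

    def run(self, event):
        # Filters run first; rule() only sees events that pass them.
        if not all(f(event) for f in self.include_filters):
            return False
        if any(f(event) for f in self.exclude_filters):
            return False
        return self.rule(event)


class ProdLoginRule(LoginRule):
    # Narrow the managed rule without touching its rule() logic.
    include_filters = [lambda e: e.get("env") == "prod"]
    exclude_filters = [lambda e: e.get("user") == "svc-healthcheck"]


rule = ProdLoginRule()
results = [
    rule.run({"action": "login_failed", "env": "prod", "user": "alice"}),
    rule.run({"action": "login_failed", "env": "dev", "user": "alice"}),
    rule.run({"action": "login_failed", "env": "prod", "user": "svc-healthcheck"}),
]
```

The original rule() stays untouched, so later updates to the managed logic still apply to the filtered rule.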
Use upgrade() or downgrade() in severity()
While each PyPanther rule must define a default_severity, it can also define a severity() function, whose value overrides default_severity to set the severity level of resulting alerts.
Within severity(), it's common practice to dynamically set the severity based on an event field value. One way to do this is to add a condition which, if satisfied, returns a hard-coded Severity value. In the example below, if the actor associated with the event has an admin role, Severity.MEDIUM is returned, regardless of the value of self.default_severity:
# Example NOT using upgrade() or downgrade()
class MyRule(Rule):
    ...
    default_severity = Severity.LOW
    ...
    def severity(self, event):
        if event.deep_get("actor", "role") == "admin":
            return Severity.MEDIUM  # returns MEDIUM
        return self.default_severity  # returns LOW
Note that you can reference the default_severity value with self.default_severity.
In the example above, if MyRule's default_severity was ever changed from Severity.LOW to, say, Severity.MEDIUM, you would also need to remember to update the Severity.MEDIUM in severity() (presumably to Severity.HIGH), to preserve the escalation.
Using upgrade() instead of hard-coding a Severity
Instead of setting the return value of severity() as a hard-coded Severity value (as is shown in the example above), it's recommended to call the Severity class's upgrade() and downgrade() functions on self.default_severity.
In this model, if your detection's default_severity value ever changes, you won't also need to make changes in the severity() function. In the example below, default_severity has been changed to Severity.MEDIUM, and the return value within the condition automatically adjusts to returning Severity.HIGH because it uses upgrade():
# Example using upgrade()
class MyRule(Rule):
    ...
    default_severity = Severity.MEDIUM
    ...
    def severity(self, event):
        if event.deep_get("actor", "role") == "admin":
            return self.default_severity.upgrade()  # returns HIGH
        return self.default_severity  # returns MEDIUM
In order to use self.default_severity.upgrade() or self.default_severity.downgrade(), the detection's default_severity value must be a Severity object, not a string literal.
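To show why a Severity object is required, the sketch below defines a stand-in `Severity` enum with upgrade() and downgrade() methods. It is not pypanther's class: the `shifted()` helper is hypothetical, and clamping at INFO and CRITICAL is an assumption about how the ends of the scale behave.

```python
from enum import Enum


class Severity(str, Enum):
    """Stand-in for pypanther's Severity; clamping at the ends is an assumption."""

    INFO = "INFO"
    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"
    CRITICAL = "CRITICAL"

    def shifted(self, step):
        # Members iterate in definition order, which is lowest to highest here.
        levels = list(Severity)
        index = levels.index(self) + step
        return levels[max(0, min(index, len(levels) - 1))]

    def upgrade(self):
        return self.shifted(1)

    def downgrade(self):
        return self.shifted(-1)


upgraded = Severity.MEDIUM.upgrade()
downgraded = Severity.MEDIUM.downgrade()
```

A plain string such as "MEDIUM" has no upgrade() method, so calling it on a string literal would raise an AttributeError.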
There may be rare instances when using a static value within severity() is preferable. For example, you may always want to return Severity.INFO when an event originates in a dev account, even if the default_severity later changes. This might look like:
def severity(self, event):
    if is_dev_env(event.get("accountId")):
        return Severity.INFO
    return self.default_severity
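The is_dev_env() helper is not defined in this section. One possible shape for it, shown purely as an assumption, is a lookup against a set of known dev account IDs. The account IDs and the `MyRule` stand-in below are hypothetical, and plain strings replace Severity values so the sketch runs without pypanther.

```python
DEV_ACCOUNT_IDS = {"111111111111", "222222222222"}  # hypothetical dev account IDs


def is_dev_env(account_id):
    """Return True when the account ID belongs to a known dev account."""
    return account_id in DEV_ACCOUNT_IDS


class MyRule:
    """Stand-in rule; strings are used in place of Severity values."""

    default_severity = "MEDIUM"

    def severity(self, event):
        if is_dev_env(event.get("accountId")):
            return "INFO"  # static value, independent of default_severity
        return self.default_severity


dev_result = MyRule().severity({"accountId": "111111111111"})
prod_result = MyRule().severity({"accountId": "999999999999"})
```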
Optionally use explicit typing
The pypanther base Rule class is typed, so when you create a custom detection (i.e., inherit from Rule), explicit typing is optional. Still, if you prefer to use explicit types, you can do so.
Example of a custom rule with explicit typing
from time import strptime
from typing import Dict, List
from panther_core.enriched_event import PantherEvent
from pypanther import LogType, Rule, RuleTest, Severity
from pypanther.base import SeverityType
from pypanther.helpers.aws import aws_guardduty_context
from pypanther.severity import SEVERITY_DEFAULT
class MyTypedRule(Rule):
    log_types: List[LogType | str] = [LogType.AWS_GUARDDUTY]
    id: str = "AWS.GuardDuty.HighVolFindings"
    create_alert: bool = True
    dedup_period_minutes = 45
    display_name: str = "High volume of GuardDuty findings"
    enabled: bool = True
    threshold = 100
    tags: List[str] = ["GuardDuty", "Security"]
    reports: Dict[str, List[str]] = {"MITRE ATT&CK": ["TA0010:T1499"]}
    default_severity: Severity | str = Severity.HIGH
    default_destinations: List[str] = ["slack:my-channel"]
    default_description: str = "This rule tracks high volumes of GuardDuty findings"

    def rule(self, event: PantherEvent) -> bool:
        if event.deep_get("service", "additionalInfo", "sample"):
            # in case of sample data
            # https://docs.aws.amazon.com/guardduty/latest/ug/sample_findings.html
            return False
        return 7.0 <= float(event.get("severity", 0)) <= 8.9

    def title(self, event: PantherEvent) -> str:
        return event.get("title", "GuardDuty finding")

    def severity(self, event: PantherEvent) -> SeverityType:
        # Parse timestamp: "createdAt": "2020-02-14T18:12:22.316Z"
        # The fallback includes fractional seconds so that it matches the format string
        timestamp = strptime(event.get("createdAt", "1970-01-01T00:00:00.000Z"), "%Y-%m-%dT%H:%M:%S.%fZ")
        # Increase severity if it's the weekend (tm_wday: Monday is 0, Sunday is 6)
        if timestamp.tm_wday in (5, 6):
            return Severity.CRITICAL
        return SEVERITY_DEFAULT

    def alert_context(self, event: PantherEvent) -> dict:
        return aws_guardduty_context(event)

    tests: List[RuleTest] = [
        RuleTest(
            name="High Sev Finding",
            expected_result=True,
            log={
                "schemaVersion": "2.0",
                "accountId": "123456789012",
                "region": "us-east-1",
                "partition": "aws",
                "arn": "arn:aws:guardduty:us-west-2:123456789012:detector/111111bbbbbbbbbb5555555551111111/finding/90b82273685661b9318f078d0851fe9a",
                "type": "PrivilegeEscalation:IAMUser/AdministrativePermissions",
                "service": {
                    "serviceName": "guardduty",
                    "detectorId": "111111bbbbbbbbbb5555555551111111",
                    "action": {
                        "actionType": "AWS_API_CALL",
                        "awsApiCallAction": {
                            "api": "PutRolePolicy",
                            "serviceName": "iam.amazonaws.com",
                            "callerType": "Domain",
                            "domainDetails": {"domain": "cloudformation.amazonaws.com"},
                            "affectedResources": {"AWS::IAM::Role": "arn:aws:iam::123456789012:role/IAMRole"},
                        },
                    },
                    "resourceRole": "TARGET",
                    "additionalInfo": {},
                    "evidence": None,
                    "eventFirstSeen": "2020-02-14T17:59:17Z",
                    "eventLastSeen": "2020-02-14T17:59:17Z",
                    "archived": False,
                    "count": 1,
                },
                "severity": 8,
                "id": "eeb88ab56556eb7771b266670dddee5a",
                "createdAt": "2020-02-14T18:12:22.316Z",
                "updatedAt": "2020-02-14T18:12:22.316Z",
                "title": "Principal AssumedRole:IAMRole attempted to add a policy to themselves that is highly permissive.",
                "description": "Principal AssumedRole:IAMRole attempted to add a highly permissive policy to themselves.",
            },
        ),
        RuleTest(
            name="High Sev Finding As Sample Data",
            expected_result=False,
            log={
                "schemaVersion": "2.0",
                "accountId": "123456789012",
                "region": "us-east-1",
                "partition": "aws",
                "arn": "arn:aws:guardduty:us-west-2:123456789012:detector/111111bbbbbbbbbb5555555551111111/finding/90b82273685661b9318f078d0851fe9a",
                "type": "PrivilegeEscalation:IAMUser/AdministrativePermissions",
                "service": {
                    "serviceName": "guardduty",
                    "detectorId": "111111bbbbbbbbbb5555555551111111",
                    "action": {
                        "actionType": "AWS_API_CALL",
                        "awsApiCallAction": {
                            "api": "PutRolePolicy",
                            "serviceName": "iam.amazonaws.com",
                            "callerType": "Domain",
                            "domainDetails": {"domain": "cloudformation.amazonaws.com"},
                            "affectedResources": {"AWS::IAM::Role": "arn:aws:iam::123456789012:role/IAMRole"},
                        },
                    },
                    "resourceRole": "TARGET",
                    "additionalInfo": {"sample": True},
                    "evidence": None,
                    "eventFirstSeen": "2020-02-14T17:59:17Z",
                    "eventLastSeen": "2020-02-14T17:59:17Z",
                    "archived": False,
                    "count": 1,
                },
                "severity": 8,
                "id": "eeb88ab56556eb7771b266670dddee5a",
                "createdAt": "2020-02-14T18:12:22.316Z",
                "updatedAt": "2020-02-14T18:12:22.316Z",
                "title": "Principal AssumedRole:IAMRole attempted to add a policy to themselves that is highly permissive.",
                "description": "Principal AssumedRole:IAMRole attempted to add a highly permissive policy to themselves.",
            },
        ),
    ]
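The weekend check in severity() depends on how `time.strptime` parses the createdAt value, so it can be useful to exercise that logic on its own. The sketch below uses only the standard library. `is_weekend` and `parses` are hypothetical helper names, and the format string is the one used in the rule above.

```python
from time import strptime

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def is_weekend(created_at):
    """Return True if the timestamp falls on a Saturday or Sunday."""
    parsed = strptime(created_at, TIMESTAMP_FORMAT)
    return parsed.tm_wday in (5, 6)  # Monday is 0, so 5 and 6 are Saturday and Sunday


def parses(value):
    """Return True if the value matches TIMESTAMP_FORMAT, False otherwise."""
    try:
        strptime(value, TIMESTAMP_FORMAT)
        return True
    except ValueError:
        return False


friday = is_weekend("2020-02-14T18:12:22.316Z")  # the test log's createdAt, a Friday
saturday = is_weekend("2020-02-15T09:00:00.000Z")
```

Because the format string contains `.%f`, a timestamp without fractional seconds does not match it and raises a ValueError, which matters when choosing a fallback value for a missing createdAt field.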