Panther Developer Workflows: Detections
Workflows outside of the Panther Console that you can use to interact with your Panther account

Overview

Panther Developer Workflows are workflows that you can use outside of the Panther Console to interact with your Panther account, such as the Panther API, Panther Analysis Tool, and CI/CD workflows.
This page describes how to use the Panther Analysis Tool (PAT) to test and upload locally managed Detections, and how to optionally integrate with a CI/CD setup.
For information on using the Panther API, please see the Panther API documentation.

Using the Panther Analysis Tool

The panther_analysis_tool (PAT) is an open source utility for testing, packaging, and deploying Panther detections from source code. It's designed for developer-centric workflows such as managing your Panther analysis packs programmatically or within CI/CD pipelines.

Installing PAT

Installing with pip

First, ensure you have Python 3 installed.
To install PAT, run this command:
pip3 install panther_analysis_tool

Building from source

If you'd prefer to run from source for development purposes, first set up your environment:

$ make install
$ pipenv run -- pip3 install -e .

Updating the version

If you are creating a new release, you can use the following utility script to update the version number in the relevant files:
cd bin/
./version_bump.py 0.10.9 # replace with the new version you are releasing
If you would rather use the panther_analysis_tool outside of the virtual environment, install it directly:
$ make deps
$ pip3 install -e .

PAT configuration file

PAT will read options from a configuration file called .panther_settings.yml located in your working directory. An example configuration file is included in this repo: example_panther_config.yml. It contains example syntax for supported options.
Note that options in the configuration file override options passed on the command line. For example, if you set minimum_tests: 2 in the configuration file and --minimum-tests 1 on the command line, the minimum number of tests will be 2.

PAT commands and usage

Uploading packages to Panther directly

Panther SaaS customers: Please file a support ticket to gain upload access to your Panther environment.
Make sure to configure your environment with valid AWS credentials prior to running the command below. This command will upload based on the exported value of AWS_REGION.
To upload your analysis packs to your Panther Console, run the following command:
panther_analysis_tool upload --path <path-to-your-rules> --out tmp
Analyses with the same ID are overwritten. Additionally, locally deleted rules/policies will not automatically be deleted in the database and must be removed manually. For CLI-driven workflows, we recommend setting the Enabled property to false instead of deleting policies or rules.

Creating a package to upload to the Panther Console

To create a package for uploading manually to the Panther Console, run the following command:
$ panther_analysis_tool zip --path tests/fixtures/valid_policies/ --out tmp
[INFO]: Testing analysis packs in tests/fixtures/valid_policies/

AWS.IAM.MFAEnabled
	[PASS] Root MFA not enabled fails compliance
	[PASS] User MFA not enabled fails compliance

[INFO]: Zipping analysis packs in tests/fixtures/valid_policies/ to tmp
[INFO]: <current working directory>/tmp/panther-analysis-2020-03-23T12-48-18.zip

Deleting Rules, Policies, or Saved Queries with PAT

While panther_analysis_tool upload --path <directory> will upload everything from <directory>, it will not delete anything in your Panther instance if you simply remove a local file from <directory>. Instead, you can use the panther_analysis_tool delete command to explicitly delete detections from your Panther instance. To delete a specific detection, you can run the following command:
panther_analysis_tool delete --analysis-id MyRuleId
This will interactively ask you for a confirmation before it deletes the detection. If you would like to delete without confirming, you can use the following command:
panther_analysis_tool delete --analysis-id MyRuleId --no-confirm

Running Tests with PAT

Use the Panther Analysis Tool to load the defined specification files and evaluate unit tests locally:
panther_analysis_tool test --path <folder-name>
To filter rules or policies based on certain attributes:
panther_analysis_tool test --path <folder-name> --filter RuleID=Category.Behavior.MoreInfo

Available commands

$ panther_analysis_tool -h

usage: panther_analysis_tool [-h] [--version] [--debug] {release,test,upload,delete,test-lookup-table,zip} ...

Panther Analysis Tool: A command line tool for managing Panther policies and rules.

positional arguments:
  {release,test,upload,delete,test-lookup-table,zip}
    release             Create release assets for repository containing panther detections. Generates a file called panther-analysis-all.zip and optionally generates panther-analysis-all.sig
    test                Validate analysis specifications and run policy and rule tests.
    upload              Upload specified policies and rules to a Panther deployment.
    delete              Delete policies, rules, or saved queries from a Panther deployment
    test-lookup-table   Validate a Lookup Table spec file.
    zip                 Create an archive of local policies and rules for uploading to Panther.

optional arguments:
  -h, --help            show this help message and exit
  --version             show program's version number and exit
  --debug

Filtering PAT commands

The test, zip, and upload commands all support filtering. Filtering works by passing the --filter argument with a list of filters specified in the format KEY=VALUE1,VALUE2. The keys can be any valid field in a policy or rule. When using a filter, only analysis items that match every specified filter will be considered.
For example, the following command will test only items with the AnalysisType of policy AND the severity of High:
panther_analysis_tool test --path tests/fixtures/valid_policies --filter AnalysisType=policy Severity=High
[INFO]: Testing analysis packs in tests/fixtures/valid_policies

AWS.IAM.BetaTest
	[PASS] Root MFA not enabled fails compliance
	[PASS] User MFA not enabled fails compliance
The following command will test items with the AnalysisType policy OR rule, AND the severity High:
panther_analysis_tool test --path tests/fixtures/valid_policies --filter AnalysisType=policy,rule Severity=High
[INFO]: Testing analysis packs in tests/fixtures/valid_policies

AWS.IAM.BetaTest
	[PASS] Root MFA not enabled fails compliance
	[PASS] User MFA not enabled fails compliance

AWS.CloudTrail.MFAEnabled
	[PASS] Root MFA not enabled fails compliance
	[PASS] User MFA not enabled fails compliance
When writing policies or rules that refer to global analysis types, be sure to include them in your filter. You can include an empty string as a value in a filter, which means the filter is only applied when the field exists.
The following command will return an error, because the policy in question imports a global but the global does not have a severity so it is excluded by the filter:
panther_analysis_tool test --path tests/fixtures/valid_policies --filter AnalysisType=policy,global Severity=Critical
[INFO]: Testing analysis packs in tests/fixtures/valid_policies

AWS.IAM.MFAEnabled
	[ERROR] Error loading module, skipping

Invalid: tests/fixtures/valid_policies/example_policy.yml
	No module named 'panther'

[ERROR]: [('tests/fixtures/valid_policies/example_policy.yml', ModuleNotFoundError("No module named 'panther'"))]
For this command to work as expected, you need to allow the Severity field to be absent:
panther_analysis_tool test --path tests/fixtures/valid_policies --filter AnalysisType=policy,global Severity=Critical,""
[INFO]: Testing analysis packs in tests/fixtures/valid_policies

AWS.IAM.MFAEnabled
	[PASS] Root MFA not enabled fails compliance
	[PASS] User MFA not enabled fails compliance
Filters work for the zip and upload commands in the exact same way they work for the test command.
In addition to filtering, you can set a minimum number of unit tests with the --minimum-tests flag. Detections that don't have the minimum number of tests will be considered failing, and if --minimum-tests is set to 2 or greater it will also enforce that at least one test must return True and one must return False.
In the example below, even though the rules passed all their tests, they're still considered failing because they do not have the correct test coverage:
panther_analysis_tool test --path okta_rules --minimum-tests 2
[INFO]: Testing analysis packs in okta_rules

Okta.AdminRoleAssigned
	[PASS] Admin Access Assigned

Okta.BruteForceLogins
	[PASS] Failed login

Okta.GeographicallyImprobableAccess
	[PASS] Non Login
	[PASS] Failed Login

--------------------------
Panther CLI Test Summary
	Path: okta_rules
	Passed: 0
	Failed: 3
	Invalid: 0

--------------------------
Failed Tests Summary
	Okta.AdminRoleAssigned
		['Insufficient test coverage, 2 tests required but only 1 found.', 'Insufficient test coverage: expected at least one passing and one failing test.']

	Okta.BruteForceLogins
		['Insufficient test coverage, 2 tests required but only 1 found.', 'Insufficient test coverage: expected at least one passing and one failing test.']

	Okta.GeographicallyImprobableAccess
		['Insufficient test coverage: expected at least one passing and one failing test.']

Writing Detections locally

Writing Detections locally means creating Python and metadata files that define a Panther Detection on your own machine. After writing Detections locally, you upload the files to your Panther Console (typically via the Panther Analysis Tool) to control your Detection content.
In Panther, there are three core Detection types:
  • Real-Time Rules that analyze data as soon as it's sent to Panther
  • Scheduled Rules that run after a SQL query has been executed
  • Policies that detect insecure cloud resources

File setup

Each detection consists of:
  • A Python file (a file with a .py extension) containing your detection/audit logic
  • A YAML or JSON specification file (a file with a .yml or .json extension) containing metadata attributes of the detection.
    • By convention, we give this file the same name as the Python file.
We recommend creating folders based on log/resource type to group your detections, such as suricata_rules or aws_s3_policies. You can use the open source Panther Analysis repo as a reference.
We also recommend managing these files in a version control system (VCS), typically a managed Git provider such as GitHub or GitLab.
It's best practice to create a fork of Panther's open source analysis repository, but you can also create your own repo from scratch.

Writing real-time and scheduled rules locally

Rules are Python functions to detect suspicious behaviors. Returning a value of True indicates suspicious activity, which triggers an alert.
  1. Write your rule and save it (in your folder of choice) as my_new_rule.py:

     def rule(event):
         return 'prod' in event.get('hostName', '')
  2. Create a metadata file using the template below:

     AnalysisType: rule
     DedupPeriodMinutes: 60 # 1 hour
     DisplayName: Example Rule to Check the Format of the Spec
     Enabled: true
     Filename: my_new_rule.py
     RuleID: Type.Behavior.MoreContext
     Severity: High
     LogTypes:
       - LogType.GoesHere
     Reports:
       ReportName (like CIS, MITRE ATT&CK):
         - The specific report section relevant to this rule
     Tags:
       - Tags
       - Go
       - Here
     Description: >
       This rule exists to validate the CLI workflows of the Panther CLI
     Runbook: >
       First, find out who wrote this spec format, then notify them with feedback.
     Reference: https://www.a-clickable-link-to-more-info.com
When this rule is uploaded, each of the fields you would normally populate in the UI will be auto-filled. See Rule Specification Reference below for a complete list of required and optional fields.
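The Python file can also define optional helper functions, such as title(), alongside rule(). The sketch below is illustrative only: it extends the my_new_rule.py example above with an assumed title() function to customize the alert title; adapt the field names to your own log data.

def rule(event):
    # Alert when the host name indicates a production system
    return 'prod' in event.get('hostName', '')


def title(event):
    # Assumed example: build a custom alert title from the triggering event
    return 'Suspicious activity on host [{}]'.format(event.get('hostName', '<unknown>'))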

Rule Tests

Tests help validate that your rule will behave as intended and detect the early signs of a breach. In your spec file, add the Tests key with sample cases:
Tests:
  -
    Name: Name to describe our first test
    LogType: LogType.GoesHere
    ExpectedResult: true or false
    Log:
      {
        "hostName": "test-01.prod.acme.io",
        "user": "martin_smith",
        "eventTime": "June 22 5:50:52 PM"
      }
We recommend running as many test cases as possible, including both true and false positives.

Writing Policies locally

Policies are Python functions to detect misconfigured cloud infrastructure. Returning a value of True indicates this resource is valid and properly configured. Returning False indicates a policy failure, which triggers an alert.
  1. Write your policy and save it (in your folder of choice) as my_new_policy.py:

     def policy(resource):
         return resource['Region'] != 'us-east-1'
  2. Create a specification file using the template below:

     AnalysisType: policy
     Enabled: true
     Filename: my_new_policy.py
     PolicyID: Category.Type.MoreInfo
     ResourceTypes:
       - Resource.Type.Here
     Severity: Info|Low|Medium|High|Critical
     DisplayName: Example Policy to Check the Format of the Spec
     Tags:
       - Tags
       - Go
       - Here
     Runbook: Find out who changed the spec format.
     Reference: https://www.link-to-info.io
See the Policy Specification Reference below for a complete list of required and optional fields.

Policy Tests

In the spec file, add the following Tests key:
Tests:
  -
    Name: Name to describe our first test.
    ResourceType: AWS.S3.Bucket
    ExpectedResult: true
    Resource:
      {
        "PublicAccessBlockConfiguration": null,
        "Region": "us-east-1",
        "Policy": null,
        "AccountId": "123456789012",
        "LoggingPolicy": {
          "TargetBucket": "access-logs-us-east-1-100",
          "TargetGrants": null,
          "TargetPrefix": "acmecorp-fiancial-data/"
        },
        "EncryptionRules": [
          {
            "ApplyServerSideEncryptionByDefault": {
              "SSEAlgorithm": "AES256",
              "KMSMasterKeyID": null
            }
          }
        ],
        "Arn": "arn:aws:s3:::acmecorp-fiancial-data",
        "Name": "acmecorp-fiancial-data",
        "LifecycleRules": null,
        "ResourceType": "AWS.S3.Bucket",
        "Grants": [
          {
            "Permission": "FULL_CONTROL",
            "Grantee": {
              "URI": null,
              "EmailAddress": null,
              "DisplayName": "admins",
              "Type": "CanonicalUser",
              "ID": "013ae1034i130431431"
            }
          }
        ],
        "Versioning": "Enabled",
        "ResourceId": "arn:aws:s3:::acmecorp-fiancial-data",
        "Tags": {
          "aws:cloudformation:logical-id": "FinancialDataBucket"
        },
        "Owner": {
          "ID": "013ae1034i130431431",
          "DisplayName": "admins"
        },
        "TimeCreated": "2020-06-13T17:16:36.000Z",
        "ObjectLockConfiguration": null,
        "MFADelete": null
      }
The value of Resource can be a JSON object copied directly from the Policies > Resources explorer.
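As an illustrative sketch (not part of the template above), the following policy body would pass against the sample resource, since its Versioning attribute is "Enabled" and therefore matches the test's ExpectedResult of true:

def policy(resource):
    # Compliant (True) when S3 bucket versioning is enabled; the sample
    # resource above sets "Versioning": "Enabled", so this test passes.
    return resource.get('Versioning') == 'Enabled'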

Policy and rule unit test mocking

Both policy and rule tests support unit test mocking. In order to configure mocks for a particular test case, add the Mocks key to your test case. The Mocks key is used to define a list of functions you want to mock, and the value that should be returned when that function is called. Multiple functions can be mocked in a single test. For example, if we have a rule test and want to mock the function get_counter to always return a 1 and the function geoinfo_from_ip to always return a specific set of geo IP info, we could write our unit test like this:
Tests:
  -
    Name: Test With Mock
    LogType: LogType.Custom
    ExpectedResult: true
    Mocks:
      - objectName: get_counter
        returnValue: 1
      - objectName: geoinfo_from_ip
        returnValue: >-
          {
            "region": "UnitTestRegion",
            "city": "UnitTestCityNew",
            "country": "UnitTestCountry"
          }
    Log:
      {
        "hostName": "test-01.prod.acme.io",
        "user": "martin_smith",
        "eventTime": "June 22 5:50:52 PM"
      }
Mocking allows us to emulate network calls without requiring API keys or network access in our CI/CD pipeline, and without muddying the state of external tracking systems (such as the Panther KV store).
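To make the example concrete, here is a minimal sketch of a rule body that calls the two mocked functions. The module name my_global_helpers and the helper signatures are placeholders rather than real Panther helpers; substitute the global helpers that exist in your own repository. During the test run, the mocks above replace these names in the rule's namespace:

# my_global_helpers, get_counter, and geoinfo_from_ip are placeholders for
# helpers defined in your own global_helpers folder.
from my_global_helpers import get_counter, geoinfo_from_ip


def rule(event):
    # With the mocks applied, get_counter() returns 1 and geoinfo_from_ip()
    # returns the unit-test geo info defined in the test case above.
    failed_logins = get_counter(event.get('user', '<unknown>'))
    geo = geoinfo_from_ip(event.get('sourceIp', ''))
    return failed_logins >= 1 and geo is not None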

Customizing Detections

To manage custom detections, you can create a private fork of the Panther Analysis GitHub repo. When tagged releases are published upstream, you can pull those changes from the public repo into your fork.
For instructions on forking a repo, see GitHub's documentation.

Getting Updates

When you want to pull in the latest changes from the repository, perform the following steps from your private repo:
# Add the public repository as a remote
git remote add panther-upstream git@github.com:panther-labs/panther-analysis.git

# Pull in the latest changes
# Note: You may need to use the `--allow-unrelated-histories`
# flag if you did not maintain the history originally
git pull panther-upstream master

# Push the latest changes up to your forked repo and merge them
git push

Data Models

To add a new data model using the panther_analysis_tool:
  1. Create your DataModel specification file (e.g. data_models/aws_cloudtrail_datamodel.yml):

     AnalysisType: datamodel
     LogTypes:
       - AWS.CloudTrail
     DataModelID: AWS.CloudTrail
     Filename: aws_cloudtrail_data_model.py
     Enabled: true
     Mappings:
       - Name: actor_user
         Path: $.userIdentity.userName
       - Name: event_type
         Method: get_event_type
       - Name: source_ip
         Path: sourceIPAddress
       - Name: user_agent
         Path: userAgent
  2. If any Methods are defined, create an associated Python file (e.g. data_models/aws_cloudtrail_data_model.py). Note: The Filename specification field is required if a Method is defined in a mapping. If Method is not used in any Mappings, no Python file is required.

     from panther_base_helpers import deep_get


     def get_event_type(event):
         if event.get('eventName') == 'ConsoleLogin' and deep_get(event, 'userIdentity', 'type') == 'IAMUser':
             if event.get('responseElements', {}).get('ConsoleLogin') == 'Failure':
                 return "failed_login"
             if event.get('responseElements', {}).get('ConsoleLogin') == 'Success':
                 return "successful_login"
         return None
  3. Use this data model in a rule by:
    1. Adding the LogType under the Rule specification LogTypes field
    2. Adding the LogType to all of the Rule's test cases, in the p_log_type field
    3. Leveraging the event.udm() method in the Rule's Python logic:
AnalysisType: rule
DedupPeriodMinutes: 60
DisplayName: DataModel Example Rule
Enabled: true
Filename: my_new_rule.py
RuleID: DataModel.Example.Rule
Severity: High
LogTypes:
  # Add LogTypes where this rule is applicable
  # and a Data Model exists for that LogType
  - AWS.CloudTrail
Tags:
  - Tags
Description: >
  This rule exists to validate the CLI workflows of the Panther CLI
Runbook: >
  First, find out who wrote this spec format, then notify them with feedback.
Tests:
  - Name: test rule
    ExpectedResult: true
    # Add the LogType to the test specification in the 'p_log_type' field
    Log: {
      "p_log_type": "AWS.CloudTrail"
    }
def rule(event):
    # filter events on unified data model field
    return event.udm('event_type') == 'failed_login'


def title(event):
    # use unified data model field in title
    return '{}: User [{}] from IP [{}] has exceeded the failed logins threshold'.format(
        event.get('p_log_type'), event.udm('actor_user'),
        event.udm('source_ip'))

Globals locally

Global functions allow common logic to be shared across either rules or policies. To declare them as code, add them into the global_helpers folder with a similar pattern to rules and policies.
Globals defined outside of the global_helpers folder will not be loaded.
First, create your Python file (global_helpers/acmecorp.py):
from fnmatch import fnmatch

RESOURCE_PATTERN = 'acme-corp-*-[0-9]'


def matches_internal_naming(resource_name):
    return fnmatch(resource_name, RESOURCE_PATTERN)
Then, create your specification file:
AnalysisType: global
GlobalID: acmecorp
Filename: acmecorp.py
Description: A set of helpers internal to acme-corp
Finally, use this helper in a policy (or a rule):
import acmecorp


def policy(resource):
    return acmecorp.matches_internal_naming(resource['Name'])

Specification Reference

Required fields are in bold.

Rule Specification Reference

A complete list of rule specification fields:
| Field Name | Description | Expected Value |
| --- | --- | --- |
| AnalysisType | Indicates whether this analysis is a rule, policy, or global | rule |
| Enabled | Whether this rule is enabled | Boolean |
| Filename | The path (with file extension) to the Python rule body | String |
| RuleID | The unique identifier of the rule | String |
| LogTypes | The list of log types to apply this rule to | List of strings |
| Severity | What severity this rule is | One of the following strings: Info, Low, Medium, High, or Critical |
| Description | A brief description of the rule | String |
| DedupPeriodMinutes | The time period (in minutes) during which similar events of an alert will be grouped together | 15, 30, 60, 180 (3 hours), 720 (12 hours), or 1440 (24 hours) |
| DisplayName | A friendly name to show in the UI and alerts. The RuleID will be displayed if this field is not set. | String |
| OutputIds | Static destination overrides. These will be used to determine how alerts from this rule are routed, taking priority over default routing based on severity. | List of strings |
| Reference | The reason this rule exists, often a link to documentation | String |
| Reports | A mapping of framework or report names to values this rule covers for that framework | Map of strings to list of strings |
| Runbook | The actions to be carried out if this rule returns an alert, often a link to documentation | String |
| SummaryAttributes | A list of fields that alerts should summarize. | List of strings |
| Threshold | How many events need to trigger this rule before an alert will be sent. | Integer |
| Tags | Tags used to categorize this rule | List of strings |
| Tests | Unit tests for this rule. | List of maps |

Policy Specification Reference

Required fields are in bold.
A complete list of policy specification fields:
| Field Name | Description | Expected Value |
| --- | --- | --- |
| AnalysisType | Indicates whether this specification is defining a policy or a rule | policy |
| Enabled | Whether this policy is enabled | Boolean |
| Filename | The path (with file extension) to the Python policy body | String |
| PolicyID | The unique identifier of the policy | String |
| ResourceTypes | What resource types this policy will apply to | List of strings |
| Severity | What severity this policy is | One of the following strings: Info, Low, Medium, High, or Critical |
| ActionDelaySeconds | How long (in seconds) to delay auto-remediations and alerts, if configured | Integer |
| AutoRemediationID | The unique identifier of the auto-remediation to execute in case of policy failure | String |
| AutoRemediationParameters | What parameters to pass to the auto-remediation, if one is configured | Map |
| Description | A brief description of the policy | String |
| DisplayName | What name to display in the UI and alerts. The PolicyID will be displayed if this field is not set. | String |
| Reference | The reason this policy exists, often a link to documentation | String |
| Reports | A mapping of framework or report names to values this policy covers for that framework | Map of strings to list of strings |
| Runbook | The actions to be carried out if this policy fails, often a link to documentation | String |
| Tags | Tags used to categorize this policy | List of strings |
| Tests | Unit tests for this policy. | List of maps |

Scheduled Query Specification Reference

Required fields are in bold.
A complete list of scheduled query specification fields:
| Field Name | Description | Expected Value |
| --- | --- | --- |
| AnalysisType | Indicates whether this analysis is a rule, policy, scheduled query, or global. | scheduled_query |
| QueryName | A friendly name to show in the UI. | String |
| Enabled | Whether this query is enabled. | Boolean |
| Tags | Tags used to categorize this query. | List of strings |
| Description | A brief description of the query. | String |
| Query | A query that can run on any backend. If this field is specified, you should not specify a SnowflakeQuery or an AthenaQuery. | String |
| SnowflakeQuery | A query specifically for a Snowflake backend. | String |
| AthenaQuery | A query specifically for Athena. | String |
| Schedule | The schedule on which this query should run. Can be expressed as a cron expression or in rate minutes. Note that cron and rate minutes are mutually exclusive. | Map |

Data Model Specification Reference

Required fields are in bold.
A complete list of DataModel specification fields:
| Field Name | Description | Expected Value |
| --- | --- | --- |
| AnalysisType | Indicates whether this specification is defining a rule, policy, data model, or global | datamodel |
| DataModelID | The unique identifier of the data model | String |
| DisplayName | What name to display in the UI and alerts. The DataModelID will be displayed if this field is not set. | String |
| Enabled | Whether this data model is enabled | Boolean |
| Filename | The path (with file extension) to the Python DataModel body | String |
| LogTypes | What log types this data model will apply to | Singleton list of strings |
| Mappings | Mapping from source field name or method to unified data model field name | List of maps |