Scheduled Search Examples

This page contains common use cases and example searches you may want to use while investigating suspicious activity in your logs.

The examples below will require some customization to the local environment to be effective. Note that all queries should control the result size. This can be done with a LIMIT or GROUP BY clause.

Your company will incur costs on your data platform every time a Scheduled Search runs. Please make sure that your queries can complete inside the specified timeout period.

All examples in this section use Snowflake style SQL.

Athena limitations and considerations
  • All Athena queries should be qualified with partition columns (year, month, day, hour) for performance reasons.

  • Athena (Presto) has issues with returning query results as proper JSON for complex objects. This can lead to unparsable results in Scheduled Rules where the Python rules engine will read the result as a JSON object.

    • To work around this limitation of Athena you should return scalar objects in your Scheduled Queries NOT complex objects. If multiple elements in a complex object are needed in the result then extract them in the SQL as separate scalar columns.

Streaming rules

Panther enables you to run Scheduled Searches (Saved Searches on an interval) and, in concert with a Scheduled Rule, allows you to create detections that work over large time spans and aggregate data from multiples sources.

When possible, try to implement detections using Streaming Rules. Streaming Rules are low latency and less expensive compared to Scheduled Searches. However, there are cases when more context is required for a detection and then a Scheduled Search is the appropriate solution.

Data latency and timing considerations

In order to write effective Scheduled Rules, you need to understand the latency of the data from the time the event is recorded until it reaches Panther. Use this information to adjust the scheduled and window of time used accordingly.

For example, AWS CloudTrail data has a latency of about 10 minutes. This is due to AWS limitations and not due to Panther functionality. If you want to analyze the last hour of data, as a best practice we recommend that you schedule the search to run 15 minutes past the hour.

This is not a consideration for Panther streaming rules, since they are applied when the data is first processed as it comes into the system. Since Scheduled Searches periodically look back at accumulated data, timing considerations are important.

Examples

Restrict AWS console access to specific IP addresses

Let's start with a very simple end to end example to better understand the process of creating a detection using a Scheduled Search and a Scheduled Rule. Let's assume you very carefully restrict access to the AWS console to IP addresses from with company IP space. In order to verify this control you want to check that all AWS console logins have a sourceIPAddress from within this IP space. You keep a table of these IP blocks in the datalake. NOTE: this example uses a simple equality join operation and assumes that the table of IP blocks are all /32 addresses. In a realistic implementation the IP block table would have CIDR blocks and the check would be to find those not inside the CIDR blocks. This is left as an exercise for the reader.

Let's assume we schedule a rule to run every 15 minutes, checking the previous 30 minutes (we are using long window handle inherent delay in data associated with CloudTrail).

Full disclosure: you could implement this detection using a Streaming Rule if you managed the IP whitelist table in Dynamodb or S3. For very high volume log sources it will be more efficient to do a periodic batch join as below.

The SQL to detect these events is:

SELECT
    ct.*
FROM
    panther_logs.public.aws_cloudtrail ct LEFT OUTER JOIN infrastructure.networking.egress_blocks egress
        ON (ct.sourceIPAddress = egress.ip)
WHERE
    p_occurs_since('30 minutes') 
    AND
    ct.eventtype = 'AwsConsoleSignIn'
    AND 
    egress.ip IS NULL -- NOT matching!
LIMIT 1000 -- we don't expect many of these BUT we have this here for safety!

Note the p_occurs_since() is a Panther SQL macro to make creating Scheduled Queries easier.

Since the output of a Scheduled Search flows through a Scheduled Rule (in Python) it is important to keep the number of rows returned carefully controlled. It is recommended to always provide a LIMIT clause or use GROUP BY aggregations that return limited number of rows (less than a few thousand maximum).

To implement this:

  1. Create a Scheduled Search by following these instructions.

  2. Make sure the Scheduled Search is set to active.

A Scheduled Rule has all the capability of a streaming rule, allowing you to customize alerts and direct the destinations. The deduping in Panther prevents alert storms, in the above rule we use the sourceIPAddress dedupe which will only create 1 alert per 30 minutes.

This pattern of joining to a list can also be used for IOC detections (keeping a table of IOCs such as TOR Exit Nodes, malware hashes ,etc).

Command and Control (C2) Beacon Detection

In this example we will create a very simple but effective behavioral detection that uses aggregation to find C2 beaconing. NOTE: this is an oversimplified detection for illustration purposes only. Using this without refinements such as whitelisting and tuning thresholds can cause excessive false positives (there are many non-malicious processes that "beacon"). That said, on well understood networks and using the appropriate whitelisting, this technique can be very effective.

We will define a C2 beacon as any IP activity that happens AT MOST 5 times day and repeats for more than 3 days. To implement that in SQL:

WITH low_and_slow_daily_ip_activity AS (
SELECT
    date(p_event_time) as day,
    srcAddr as beacon_ip
FROM
    panther_logs.public.aws_vpcflow
WHERE
    p_occurs_since('1 week')
GROUP BY 1,2
HAVING count(1) <= 5 -- only those will little activity, this requires tuning
)
SELECT
 beacon_ip,
 count(1) as days
FROM
 low_and_slow_daily_ip_activity
GROUP BY 
 1
HAVING days >= 3 -- only those that have activity at least this many days
LIMIT 20 -- avoid alert storms!

To implement this:

  1. Create a Scheduled Search by following these instructions.

  2. Make sure the Scheduled Search is set to active.

How Well is My Endpoint Monitoring Working?

For this hypothetical example, we will assume you are using CrowdStrike as your endpoint monitoring software. Panther is configured to ingest your logs and you have a CMDB populated that maps the deployed agents to their internal associated user(s).

There are many interesting questions that can be asked of this data but for this example we will specifically ask the question: "Which endpoints have not reported ANY data in the last 24 hours?"

In CrowdStrike logs the unique id for the deployed agent is called an aid . The CMDB has a mapping of aid to reference data. For this example we will assume it has the attributes employee_name, employee_group and last_seen. The employee related attributes help identify who currently uses the endpoint and the last_seen is a timestamp we assume is updated by a backend process that tracks network activity (e.g, VPN access, DHCP leases, Authentication, liveliness detections, etc).

To answer this question we want to know which agents in the CMDB that DO have network activity in the last 24 hours but do NOT have any CrowdStrike activity, which may indicate the agent is not running or has been disabled (indicating a coverage gap). The query below will compute a report by employee group that includes the specific suspect endpoints:

WITH active_aids AS (
SELECT
     DISTINCT aid -- just match the unique agent ids
FROM panther_logs.public.crowdstrike_aidmaster
WHERE p_occurs_since('1 day') -- in last 24 hours of log data
)
SELECT
    cmdb.employee_group,
    count(1) AS num_inactive_endpoints,
    ARRAY_AGG(
        DISTINCT cmdb.aid || ' ' || cmdb.employee_name || ' ' || cmdb.employee_group
    ) as endpoints -- here we collect the specific endpoints
FROM 
  infrastructure.inventory.cmdb AS cmdb LEFT OUTER JOIN active_aids cs USING(aid)
WHERE
  cs.aid IS NULL -- NOT matching any log data
AND 
  cmdb.last_seen BETWEEN current_date - 1 AND current_date -- is likely active
GROUP BY 1
ORDER BY 2 desc

To implement this:

  1. Create a Scheduled Search by following these instructions.

  2. Make sure the Scheduled Search is set to active.

  3. Make a Scheduled Rule targeted at the output of the Scheduled Search:

The events associated with the alert can be reviewed by an analyst which will be at most one per employee group. The "hits" are accumulated in endpoints using the employee info for easy vetting. As with all Panther rules, you have the flexibility to customize destinations of alert. For example, if the employee_group is C-Suite then perhaps that generates a page to the oncall, while the default alerts simply go to a work queue for vetting the next day.

Unusual Okta Logins

The Okta logs provide the answers to "who", "with what device" and the "where" questions associated with an event. This information can be used to identify suspicious behavior, for example an attacker using stolen credentials.

The challenge is defining "suspicious". One way to define suspicious is deviation from normal. If we can construct a baseline for each user, then we could alert when there is a significant change.

That sounds good, but now we have to define "significant change" in a way that generates useful security findings (and not many false positives). For this example we will target significant changes to the client information that might indicate stolen credentials. NOTE: the Okta data is very rich in context, this is just one simple example of how to make use of this data.

Because of VPNs and proxies, it is often not practical to just use specific IP addresses or related geographic information to identity suspect activity. Similarly, users may change their device because they are using a new one, or they may make use of multiple devices. We expect significant variation between legitimate users. However, for any particular user we expect there to be more consistency over time.

For this example, we will characterize "normal" by computing for each actor, for up to 30 days in the past:

  • unique auth clients used

  • unique os versions used

  • unique devices used

  • unique locations used (defined as: country, state, city)

We will define events that do NOT match ANY the four dimensions as "suspicious". This means:

  • We will not get alerts if they get a new device.

  • We will not get alerts when they change location.

  • We will get alerts when all of the attributes change at once,

    and we are assuming this is both anomalous and interesting from the security point of view.

We will also NOT consider actors unless they have at least 5 days of history, to avoid false positives from new employees.

Assume we schedule this to run once a day for the previous day.

This is just an example, and requires tuning like any other heuristic but has the benefit of being self calibrating per actor.

The SQL to compute the above is:

WITH actor_baseline AS (
  SELECT
    actor:id as actor_id,
    ARRAY_AGG (DISTINCT client:id) as client_id_list,
    ARRAY_AGG (DISTINCT client:userAgent.os)  as client_os_list,
    ARRAY_AGG (DISTINCT client:device) as client_devices_list,
    ARRAY_AGG (DISTINCT 
       client:geographicalContext:country || client:geographicalContext:state || client:geographicalContext:city)
      as client_locations_list
  FROM
    panther_logs.public.okta_systemlog
  WHERE
    p_occurs_since('30 days') 
  GROUP BY 1
  HAVING
    COUNT(DISTINCT date(p_event_time)) > 5 -- at least 5 days of history
)
-- compare current day to baseline, returning events that do not match ANY of the baseline attributes
SELECT
  logs.*
FROM
  panther_logs.public.okta_systemlog logs JOIN actor_baseline bl ON (actor:id = bl.actor_id)
WHERE
  p_occurs_since('1 day') 
  AND
  NOT ARRAY_CONTAINS(logs.client:id::variant, bl.client_id_list)
  AND
  NOT ARRAY_CONTAINS(logs.client:userAgent:os::variant, bl.client_os_list)
  AND
  NOT ARRAY_CONTAINS(logs.client:device::variant, bl.client_devices_list)
  AND
  NOT ARRAY_CONTAINS(
          (client:geographicalContext:country || 
           client:geographicalContext:state || 
           client:geographicalContext:city)::variant, bl.client_locations_list)

Recomputing these baselines each time a search is run is not very efficient. In the future Panther will be supporting the ability to create summary tables so that methods such as described above can be made more efficient.

Detecting Password Spraying

Password spraying is an attack that attempts to access numerous accounts (usernames) with a few commonly used passwords. Traditional brute-force attacks attempt to gain unauthorized access to a single account by guessing the password. This can quickly result in the targeted account getting locked-out, as commonly used account-lockout policies allow for a limited number of failed attempts (typically three to five) during a set period of time. During a password-spray attack (also known as the “low-and-slow” method), the malicious actor attempts a single commonly used password (such as ‘password123’ or ‘winter2017’) against many accounts before moving on to attempt a second password, and so on. This technique allows the actor to remain undetected by avoiding rapid or frequent account lockouts.

The key to detecting this behavior is to aggregate over time and look at the diversity of usernames with failed logins. The example below uses CloudTrail but a similar technique and be used with any authentication log. The thresholds chosen will need to be tuned to the target network.

SELECT
  -- this information will be in the alert events
  awsRegion as region,
  recipientAccountId as accountid,
  COUNT(DISTINCT useridentity:userName) as distinctUserNames,
  COUNT(1) as failures,
  MIN(p_event_time) as first_attempt,
  MAX(p_event_time) as last_attempt
FROM
  panther_logs.public.aws_cloudtrail
WHERE
  -- this is Panther macro that looks back 3600 seconds (1 hour)
  p_occurs_since('1 hour') 
  AND
  eventtype = 'AwsConsoleSignIn'
  AND
  responseElements:ConsoleLogin = 'Failure'
GROUP BY
  region, accountid
HAVING
  distinctUserNames > 5
   AND
  failures > 10

Detecting DNS Tunnels

Since DNS cannot generally be blocked on most networks, DNS based data exfiltration and C2 can be extremely effective. There are many tools available to create DNS-based tunnels. Not all DNS tunnels are malicious, ironically, many anti-virus tools use DNS tunnels to send telemetry back "home". Most security-minded people find DNS tunnels un-nerving, so detecting them on your network is useful. Simple traffic analysis can easily find these tunnels but because of legitimate tunnels, this below example will require some tuning to the local environment for both thresholds and whitelisting.

We will define a potential DNS tunnel as a DNS server (port 53) that moves enough data to be interesting with an hour's time to only a few UNIQUE domains.

Assume we run this search every hour, looking back 1 hour to identify these tunnels:

SELECT
  account_id,
  region,
  vpc_id,
  srcAddr, -- outside
  srcIds:instance, -- inside

  COUNT(1) as message_count,
  ARRAY_AGG(DISTINCT query_name) as query_names
FROM
  panther_logs.public.aws_vpcdns
WHERE
  p_occurs_since('1 hour')
  AND
  -- simple whitelisting
  query_name NOT LIKE '%amazonaws.com'
GROUP BY
  1,2,3,4,5
HAVING
  message_count >= 1000   -- decent amount of activity in an hour
   AND
  ARRAY_SIZE(query_names) <= 2 -- only a small number of distinct domains (not likely a real dns server!)

Monthly Reporting of Cloud Infrastructure

Given Panther Cloud Security can report on your AWS infrastructure, you can use the resource_history table to compute activity statistics that may be of interest to operations as well a security.

A simple example is the below report that can be scheduled run on the first of the month for the previous month to show the activity in the monitored accounts.

WITH monthly_report AS (
  SELECT
    accountId,
    changeType,
    ARRAY_AGG(DISTINCT resourceType) as resourceTypes,
    count(1) as objects
  FROM
    panther_cloudsecurity.public.resource_history
  WHERE
      DATE_TRUNC('MONTH', p_event_time) = DATE_TRUNC('MONTH', current_date - 1)  -- all last month cuz we run on the 1st!
    AND
    changeType <> 'SYNC' -- these are not changes but full records
  GROUP BY 1,2
)
-- we want the whole report as a single row of JSON objects we can forward in the Python rule
SELECT
  ARRAY_AGG(OBJECT_CONSTRUCT(*)) as monthly_report
FROM monthly_report

Example output:

{
    "monthly_report": [
        {
            "ACCOUNTID": "34924069XXXX",
            "CHANGETYPE": "DELETED",
            "OBJECTS": 8,
            "RESOURCETYPES": [
                "AWS.IAM.Role"
            ]
        },
        {
            "ACCOUNTID": "34924069XXXX",
            "CHANGETYPE": "MODIFIED",
            "OBJECTS": 2388,
            "RESOURCETYPES": [
                "AWS.CloudTrail.Meta",
                "AWS.S3.Bucket",
                "AWS.CloudFormation.Stack",
                "AWS.CloudTrail",
                "AWS.IAM.Role"
            ]
        },
        {
            "ACCOUNTID": "34924069XXXX",
            "CHANGETYPE": "CREATED",
            "OBJECTS": 11,
            "RESOURCETYPES": [
                "AWS.IAM.Role",
                "AWS.CloudFormation.Stack",
                "AWS.KMS.Key"
            ]
        }
    ]
}

The resource_history table has detail down to the specific resource, so there are variations of the above search that can be more detailed if desired.

Database Monitoring (Snowflake)

Databases holding sensitive data require extensive security monitoring as they are often targets of attack.

These queries require that Panther's read-only role has access to the snowflake.account_usage audit database (this may need to be done by the Snowflake admins).

 USE ROLE accountadmin;
 GRANT IMPORTED PRIVILEGES ON DATABASE snowflake TO ROLE panther_readonly_role;

This query looks for patterns of failed logins by username and should be run on a regular schedule:

  --return users with more than 2 failed logins in the previous 24 hours
  --this was adapted from a SnowAlert query
  SELECT 
    user_name,
    reported_client_type,
    ARRAY_AGG(DISTINCT error_code),
    ARRAY_AGG(DISTINCT error_message),
    COUNT(event_id) AS counts
  FROM snowflake.account_usage.login_history
  WHERE 1=1
    AND DATEDIFF(HOUR, event_timestamp, CURRENT_TIMESTAMP) < 24
    AND error_code IS NOT NULL
  GROUP BY reported_client_type, user_name
  HAVING counts >=3;

Snowflake failed logins by single IP looks at login attempts by IP over 24 hours and returns IPs with more than 2 failed logins. This could be scheduled to run every 24 hours to highlight potentially suspicious activity. The effectiveness of this approach may depend on how your enterprise handles company-internal IP addresses.

  --return IPs with more than 2 failed logins in the previous 24 hours
  --this was adapted from a SnowAlert query
  SELECT 
    client_ip,
    MIN(event_timestamp) event_start,
    MAX(event_timestamp) event_end,
    timediff(second, event_start, event_end) as event_duration,
    reported_client_type,
    ARRAY_AGG(DISTINCT error_code),
    ARRAY_AGG(DISTINCT error_message),
    COUNT(event_id) AS counts
  FROM snowflake.account_usage.login_history
  WHERE 1=1
    AND DATEDIFF(HOUR,  event_timestamp, CURRENT_TIMESTAMP) < 24
    AND error_code IS NOT NULL
  GROUP BY client_ip, reported_client_type
  HAVING counts >= 3;

Grants of admin rights in Snowflake, looking back 7 days. This is not necessarily suspicious, but possibly something the Snowflake admins may want to keep track of.

SELECT
    current_account() AS environment
     , REGEXP_SUBSTR(query_text, '\\s([^\\s]+)\\s+to\\s',1,1,'ie') AS role_granted
     , start_time AS event_time
     , query_text AS event_data
     , user_name AS user_name
     , role_name
     , query_type
     , query_id AS query_id
FROM snowflake.account_usage.query_history
WHERE 1=1
  AND DATEDIFF(DAY,  start_time, CURRENT_TIMESTAMP) <= 7
  AND query_type='GRANT'
  AND execution_status='SUCCESS'
  AND (role_granted ILIKE '%securityadmin%'  OR role_granted ILIKE '%accountadmin%' OR role_granted ILIKE '%admin%')

Querying account usage views

See Snowflake's documentation for additional examples on querying account usage.

Successful Okta Brute Force

In this example, we will observe how you may leverage Snowflake's MATCH_RECOGNIZE() function within Panther to measure a sequence of Okta login events.

Consider this: For any successful login, were there more than n number of failures that occurred beforehand? The following query gathers successful and unsuccessful login events as well as client and user agent information from within the last 60 minutes. Then, leveraging the MATCH_RECOGNIZE() function, it measures the number of successes and failures.

By defining a pattern of SUCCESS after n number of failures, we can take advantage of the HAVING condition to limit the output to events that fit the threshold. These thresholds could be fine-tuned to fit your organization's needs.

WITH
login_attempts AS ( -- filter for what we care about for speed
  SELECT
   p_event_time, 
   outcome:result AS outcome, 
   client:ipAddress AS clientIP, 
   client:userAgent.rawUserAgent AS userAgent, 
   actor
  FROM  panther_logs.public.okta_systemlog
  WHERE 
    outcome:result IN ('SUCCESS','FAIL','ALLOW','DENY')
    AND 
    p_occurs_since('60 minutes')
)

SELECT * from login_attempts
  MATCH_RECOGNIZE(
    PARTITION BY clientIP, userAgent, actor
    ORDER BY p_event_time DESC -- backwards in time
    MEASURES
      match_number() as match_number,
      first(p_event_time) as start_time,
      last(p_event_time) as end_time,
      count(*) as rows_in_sequence,
      count(row_with_success.*) as num_successes,
      count(row_with_fail.*) as num_fails
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO LAST row_with_fail
    -- a success with fails following
    PATTERN(row_with_success row_with_fail+)
    DEFINE
      row_with_success AS outcome IN ('SUCCESS','ALLOW'),
      row_with_fail AS outcome IN ('FAIL','DENY')
  )
HAVING num_fails >= 8 -- how many fails must follow a success to qualify
ORDER BY clientIP, userAgent, actor, match_number

Last updated

#1924: [don't merge until ~Oct] Notion Logs (Beta)

Change request updated