# pip install gql aiohttp
"""Issue a Panther Indicator Search via the GraphQL API and page through results.

Flow:
  1. Issue an ExecuteIndicatorSearchQuery mutation to start the search.
  2. Poll the resulting data-lake query until its status is "completed".
  3. Follow pagination cursors, accumulating every page of results.

Replace YOUR_PANTHER_API_URL / YOUR_API_KEY with real values before running.
"""
import time

from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport

transport = AIOHTTPTransport(
    url="YOUR_PANTHER_API_URL",
    headers={"X-API-Key": "YOUR_API_KEY"},
)

client = Client(transport=transport, fetch_schema_from_transport=True)

# `IssueQuery` is a nickname for the query. You can fully omit it.
issue_query = gql(
    """
    mutation IssueQuery($input: ExecuteIndicatorSearchQueryInput!) {
        executeIndicatorSearchQuery(input: $input) {
            id
        }
    }
    """
)

# `GetQueryResults` is a nickname for the query. You can fully omit it.
# Selection set mirrors the fields the polling loop below reads:
# message, status, results.edges[].node, and pageInfo cursors.
get_query_results = gql(
    """
    query GetQueryResults($id: ID!, $cursor: String) {
        dataLakeQuery(id: $id) {
            message
            status
            results(input: { cursor: $cursor }) {
                edges {
                    node
                }
                pageInfo {
                    endCursor
                    hasNextPage
                }
            }
        }
    }
    """
)

# an accumulator that holds all results that we fetch from all pages
all_results = []
# a helper to know when to exit the loop
has_more = True
# pagination cursor; None fetches the first page
cursor = None

# Issue an Indicator Search query
mutation_data = client.execute(
    issue_query,
    variable_values={
        "input": {
            "indicators": ["226103014039"],
            "startTime": "2022-03-29T00:00:00.001Z",
            "endTime": "2022-03-30T00:00:00.001Z",
            "indicatorName": "p_any_aws_account_ids",
        }
    },
)

# Start polling the query until it returns results. From there,
# keep fetching pages until there are no more left
while has_more:
    query_data = client.execute(
        get_query_results,
        variable_values={
            "id": mutation_data["executeIndicatorSearchQuery"]["id"],
            "cursor": cursor,
        },
    )

    # if it's still running, print a message and keep polling
    if query_data["dataLakeQuery"]["status"] == "running":
        print(query_data["dataLakeQuery"]["message"])
        time.sleep(2)  # brief back-off so we don't hammer the API
        continue

    # if it's not running & it's not completed, then it's
    # either cancelled or it has errored out. In this case,
    # stop and surface the server-provided message.
    if query_data["dataLakeQuery"]["status"] != "completed":
        raise Exception(query_data["dataLakeQuery"]["message"])

    all_results.extend([edge["node"] for edge in query_data["dataLakeQuery"]["results"]["edges"]])
    has_more = query_data["dataLakeQuery"]["results"]["pageInfo"]["hasNextPage"]
    cursor = query_data["dataLakeQuery"]["results"]["pageInfo"]["endCursor"]

print(f'Query returned {len(all_results)} result(s)!')