# 데이터 레이크 쿼리

## 개요

Panther API는 다음과 같은 데이터 레이크 작업을 지원합니다:

* 데이터 레이크의 데이터베이스, 테이블 및 열 목록 조회
* SQL을 사용하여 데이터 레이크(Data Explorer) 쿼리 실행
* 검색(Search) 쿼리 실행
* 현재 실행 중인 쿼리 취소
* 이전에 실행된 쿼리의 세부 정보 가져오기
* 선택적 필터와 함께 현재 실행 중이거나 이전에 실행된 모든 쿼리 나열

콘솔의 API 플레이그라운드 또는 GraphQL-over-HTTP API를 사용하여 Panther의 API를 호출할 수 있습니다. 이러한 방법에 대한 자세한 내용은 [Panther API](https://docs.panther.com/ko/panther/api/..#step-1-choose-a-method-for-invoking-the-api).

아래 섹션에서 GraphQL 쿼리, 뮤테이션 및 핵심 데이터 레이크 쿼리 작업에 관한 엔드투엔드 워크플로 예제를 참조하십시오.

{% hint style="warning" %}
API로 관리되는 쿼리는 SQL로 작성되어야 합니다; 그것들은 사용할 수 없습니다 [PantherFlow](https://docs.panther.com/ko/pantherflow).
{% endhint %}

## 일반적인 데이터 레이크 쿼리 작업

아래는 Panther에서 가장 일반적인 GraphQL 데이터 레이크 쿼리 작업의 예입니다. 이 예제들은 GraphQL 클라이언트(또는 `curl`)을 사용하여 Panther의 GraphQL API를 호출하기 위해 전송해야 하는 문서를 보여줍니다.

#### 데이터베이스 엔티티

{% tabs %}
{% tab title="모든 데이터베이스 엔티티 나열" %}

```graphql
# `AllDatabaseEntities`는 이 작업의 별칭입니다
query AllDatabaseEntities {
  dataLakeDatabases {
     이름
     description
     tables {
       이름
       description
       columns {
         이름
         description
         type
       }
     }
   }
 }
```

{% endtab %}

{% tab title="특정 데이터베이스의 엔티티 가져오기" %}

```graphql
# `DatabaseEntities`는 이 작업의 별칭입니다
query DatabaseEntities {
  dataLakeDatabase(name: "panther_logs.public") {
     이름
     description
     tables {
       이름
       description
       columns {
         이름
         description
         type
       }
     }
  }
}
```

{% endtab %}
{% endtabs %}

#### 쿼리 실행

{% tabs %}
{% tab title="데이터 레이크(Data Explorer) 쿼리 실행" %}

```graphql
# `IssueDataLakeQuery`는 이 작업의 별칭입니다
mutation IssueDataLakeQuery {
  executeDataLakeQuery(input: {
    sql: "select * from panther_logs.public.aws_alb limit 50"
  }) {
     id # 쿼리의 고유 ID
  }
}
```

{% endtab %}

{% tab title="인디케이터 검색(Indicator Search) 쿼리 실행" %}

```graphql
# `IssueIndicatorSearchQuery`는 이 작업의 별칭입니다
mutation IssueIndicatorSearchQuery {
  executeIndicatorSearchQuery(input: {
    indicators: ["286103014039", "126103014049"]
    startTime: "2022-04-01T00:00:00.000Z",
    endTime: "2022-04-30T23:59:59.000Z"
    indicatorName: p_any_aws_account_ids # 또는 자동 감지를 위해 비워둘 수 있음
  }) {
     id # 쿼리의 고유 ID
  }
}
```

{% endtab %}

{% tab title="쿼리 취소" %}

```graphql
# `AbandonQuery`는 이 작업의 별칭입니다
mutation AbandonQuery {
  cancelDataLakeQuery(input: { id: "1234-5678" }) {
     id # 취소된 ID 반환
  }
}
```

{% endtab %}
{% endtabs %}

#### 데이터 레이크 또는 검색 쿼리 결과 가져오기

데이터 레이크 또는 검색 쿼리를 실행하면 결과가 반환되기까지 몇 초에서 몇 분이 걸릴 수 있습니다. 쿼리가 완료되었는지 확인하려면 쿼리의 상태를 확인(폴링)해야 합니다.

다음 쿼리를 사용하여 쿼리 상태를 확인하고 가능한 경우 결과도 가져올 수 있습니다:

{% tabs %}
{% tab title="결과의 첫 페이지 가져오기" %}

```graphql
# `QueryResults`는 이 작업의 별칭입니다
query QueryResults {
  dataLakeQuery(id: "1234-1234-1234-1234") { # 쿼리의 고유 ID
    message
    status
    results {
      edges {
        node
      }
    }
  }
}
```

{% endtab %}

{% tab title="이후 결과 페이지 가져오기" %}

```graphql
# `QueryResults`는 이 작업의 별칭입니다
query QueryResults {
  dataLakeQuery(id: "1234-1234-1234-1234") { # 쿼리의 고유 ID
    message
    status
    results(input: { cursor: "5678-5678-5678-5678" }) { # `endCursor`의 값
      edges {
        node
      }
      pageInfo
        endCursor
        hasNextPage
      }
    }
  }
```

{% endtab %}
{% endtabs %}

의 예상 값 `status` 와 `results` 는 쿼리 상태에 따라 달라집니다:

* 쿼리가 아직 실행 중인 경우:
  * `status` 값을 갖습니다 `running`
  * `results` 값을 갖습니다 `null`
* 쿼리가 실패한 경우:
  * `status` 값을 갖습니다 `failed`
  * `results` 값을 갖습니다 `null` 오류 메시지는 에서 확인할 수 있습니다 `message` 키
* 쿼리가 완료된 경우
  * `status` 값을 갖습니다 `succeeded`
  * `results` 가 채워집니다

위의 모든 항목(및 의 가능한 값)과 함께 요청할 수 있는 추가 필드가 있습니다. `status`에 대해 알아보세요 [Panther API 스키마를 탐색하는 다양한 방법은 여기에서 확인하세요](https://docs.panther.com/ko/panther/api/..#step-4-discover-the-schema).

#### 데이터 레이크 또는 검색 쿼리의 메타데이터 가져오기

위의 예에서는 Panther 쿼리의 `results` 를 요청했습니다. 쿼리에 대한 추가 메타데이터를 요청하는 것도 가능합니다.

다음 예제에서는 첫 페이지 결과와 함께 이러한 메타데이터를 요청합니다:

```graphql
# `QueryMetadata`는 이 작업의 별칭입니다
query QueryMetadata {
  dataLakeQuery(id: "1234-1234-1234-1234") { # 쿼리의 고유 ID
    이름
    isScheduled
    issuedBy {
      ... on User {
        email
      }
      ... on APIToken {
        이름
      } 
    }
    sql
    message
    status
    startedAt
    completedAt
    results {
      edges {
        node
      }
    }
  }
}
```

#### 데이터 레이크 및 검색 쿼리 나열

{% tabs %}
{% tab title="첫 페이지 가져오기" %}

```graphql
# `ListDataLakeQueries`는 이 작업의 별칭입니다
query ListDataLakeQueries {
  dataLakeQueries {
    이름
    isScheduled
    issuedBy {
      ... on User {
        email
      }
      ... on APIToken {
        이름
    } 
    }
    sql
    message
    status
    startedAt
    completedAt
    results { # 각 쿼리에 대해 첫 페이지 결과만 가져옴
      edges {
        node
      }
    }
  }
```

{% endtab %}

{% tab title="이후 페이지 가져오기" %}

```graphql
# `ListDataLakeQueries`는 이 작업의 별칭입니다
query ListDataLakeQueries {
  dataLakeQueries(input: { cursor: "5678-5678-5678-5678" }) { # `endCursor`의 값
    이름
    isScheduled
    issuedBy {
      ... on User {
        email
      }
      ... on APIToken {
        이름
    } 
    }
    sql
    message
    status
    startedAt
    completedAt
    results { # 각 쿼리에 대해 첫 페이지 결과만 가져옴
      edges {
        node
      }
    }
    pageInfo {
      endCursor
      hasNextPage
    }
  }
```

{% endtab %}

{% tab title="필터된 집합 가져오기" %}

```graphql
# `ListDataLakeQueries`는 이 작업의 별칭입니다
query ListDataLakeQueries {
  dataLakeQueries(input: { contains: "aws_alb", isScheduled: true }) {
    이름
    isScheduled
    issuedBy {
      ... on User {
        email
      }
      ... on APIToken {
        이름
    } 
    }
    sql
    message
    status
    startedAt
    completedAt
    results { # 각 쿼리에 대해 첫 페이지 결과만 가져옴
      edges {
        node
      }
    }
  }
```

{% endtab %}
{% endtabs %}

## 종단 간 예제

아래에서는 [일반 작업](#common-operations) 예제를 바탕으로 엔드투엔드 흐름을 소개합니다.

#### 데이터 레이크(Data Explorer) 쿼리 실행

{% tabs %}
{% tab title="NodeJS" %}

```javascript
// npm install graphql graphql-request

import { GraphQLClient, gql } from 'graphql-request';

const client = new GraphQLClient(
  'YOUR_PANTHER_API_URL', 
  { headers: { 'X-API-Key': 'YOUR_API_KEY' } 
});

// `IssueQuery`는 쿼리의 별칭입니다. 완전히 생략할 수 있습니다.
const issueQuery = gql`
  mutation IssueQuery($sql: String!) {
    executeDataLakeQuery(input: { sql: $sql }) {
      id
    }
  }
`;

// `GetQueryResults`는 쿼리의 별칭입니다. 완전히 생략할 수 있습니다.
const getQueryResults = gql`
  query GetQueryResults($id: ID!, $cursor: String) {
    dataLakeQuery(id: $id) {
      message
      status
      results(input: { cursor: $cursor }) {
        edges {
          node
        }
        pageInfo {
          endCursor
          hasNextPage
        }
      }
    }
  }
`;

(async () => {
  try {
    // 우리가 가져오는 모든 결과 노드를 보관하는 누적기
    let allResults = [];
    // 루프를 종료할 시점을 알기 위한 헬퍼
    let hasMore = true;
    // 페이징 커서
    let cursor = null;

    // 쿼리 발행
    const mutationData = await client.request(issueQuery, {
      sql: 'select * from panther_logs.public.aws_alb limit 5',
    });

    // 쿼리가 결과를 반환할 때까지 폴링을 시작합니다. 그 이후로,
    // 더 이상 페이지가 없을 때까지 페이지를 계속 가져옵니다
    do {
      const queryData = await client.request(getQueryResults, {
        id: mutationData.executeDataLakeQuery.id,
        cursor,
      });

      // 여전히 실행 중이면 메시지를 출력하고 계속 폴링합니다
      if (queryData.dataLakeQuery.status === 'running') {
        console.log(queryData.dataLakeQuery.message);
        continue;
      }

      // 실행 중이 아니고 완료되지 않았다면,
      // 취소되었거나 오류가 발생한 경우입니다. 이 경우,
      // 예외를 던집니다
      if (queryData.dataLakeQuery.status !== 'succeeded') {
        throw new Error(queryData.dataLakeQuery.message);
      }

      allResults = [...allResults, ...queryData.dataLakeQuery.results.edges.map(edge => edge.node)];

      hasMore = queryData.dataLakeQuery.results.pageInfo.hasNextPage;
      cursor = queryData.dataLakeQuery.results.pageInfo.endCursor;
    } while (hasMore);

    console.log(`Your query returned ${allResults.length} result(s)!`);
  } catch (err) {
    console.error(err.response);
  }
})();
```

{% endtab %}

{% tab title="Python" %}

```python
# pip install gql aiohttp

from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport

transport = AIOHTTPTransport(
  url="YOUR_PANTHER_API_URL",
  headers={"X-API-Key": "YOUR_API_KEY"}
)

client = Client(transport=transport, fetch_schema_from_transport=True)

# `IssueQuery`는 쿼리의 별칭입니다. 완전히 생략할 수 있습니다.
issue_query = gql(
    """
    mutation IssueQuery($sql: String!) {
        executeDataLakeQuery(input: { sql: $sql }) {
            id
        }
    }
    """
)

# `GetQueryResults`는 쿼리의 별칭입니다. 완전히 생략할 수 있습니다.
get_query_results = gql(
    """
    query GetQueryResults($id: ID!, $cursor: String) {
        dataLakeQuery(id: $id) {
            message
            status
            results(input: { cursor: $cursor }) {
                edges {
                    node
                }
                pageInfo {
                    endCursor
                    hasNextPage
                }
            }
        }
    }
    """
)

# 우리가 모든 페이지에서 가져오는 모든 결과를 보관하는 누적기
all_results = []
# 루프를 종료할 시점을 알기 위한 헬퍼.
has_more = True
# 페이징 커서
cursor = None

# 데이터 레이크(Data Explorer) 쿼리 발행
mutation_data = client.execute(
    issue_query,
    variable_values={
        "sql": "select * from panther_logs.public.aws_alb limit 5"
    }
)

# 쿼리가 결과를 반환할 때까지 폴링을 시작합니다. 그 이후로,
# 더 이상 페이지가 없을 때까지 페이지를 계속 가져옵니다
while has_more:
    query_data = client.execute(
        get_query_results,
        variable_values = {
            "id": mutation_data["executeDataLakeQuery"]["id"],
            "cursor": cursor
        }
    )
    
    # 여전히 실행 중이면 메시지를 출력하고 계속 폴링합니다
    if query_data["dataLakeQuery"]["status"] == "running":
        print(query_data["dataLakeQuery"]["message"])
        continue
    
    # 실행 중이 아니고 완료되지 않았다면,
    # 취소되었거나 오류가 발생한 경우입니다. 이 경우,
    # 예외를 던집니다
    if query_data["dataLakeQuery"]["status"] != "succeeded":
        raise Exception(query_data["dataLakeQuery"]["message"])

    all_results.extend([edge["node"] for edge in query_data["dataLakeQuery"]["results"]["edges"]])
    has_more = query_data["dataLakeQuery"]["results"]["pageInfo"]["hasNextPage"]
    cursor = query_data["dataLakeQuery"]["results"]["pageInfo"]["endCursor"]

print(f'Query returned {len(all_results)} results(s)!')
```

{% endtab %}
{% endtabs %}

#### 검색(Search) 쿼리 실행

{% tabs %}
{% tab title="NodeJS" %}

```javascript
// npm install graphql graphql-request

import { GraphQLClient, gql } from 'graphql-request';

const client = new GraphQLClient(
  'YOUR_PANTHER_API_URL', 
  { headers: { 'X-API-Key': 'YOUR_API_KEY' } 
});

// `IssueQuery`는 쿼리의 별칭입니다. 완전히 생략할 수 있습니다.
const issueQuery = gql`
  mutation IssueQuery($input: ExecuteIndicatorSearchQueryInput!) {
    executeIndicatorSearchQuery(input: $input) {
      id
    }
  }
`;

// `GetQueryResults`는 쿼리의 별칭입니다. 완전히 생략할 수 있습니다.
const getQueryResults = gql`
  query GetQueryResults($id: ID!, $cursor: String) {
    dataLakeQuery(id: $id) {
      message
      status
      results(input: { cursor: $cursor }) {
        edges {
          node
        }
        pageInfo {
          endCursor
          hasNextPage
        }
      }
    }
  }
`;

(async () => {
  try {
    // 우리가 가져오는 모든 결과 노드를 보관하는 누적기
    let allResults = [];
    // 루프를 종료할 시점을 알기 위한 헬퍼
    let hasMore = true;
    // 페이징 커서
    let cursor = null;

    // 쿼리 발행
    const mutationData = await client.request(issueQuery, {
      input: {         
        indicators: ["226103014039"],
        startTime: "2022-03-29T00:00:00.001Z",
        endTime: "2022-03-30T00:00:00.001Z",
        indicatorName: "p_any_aws_account_ids"
      }
    });

    // 더 이상 페이지가 없을 때까지 계속 페이지를 가져옵니다
    do {
      const queryData = await client.request(getQueryResults, {
        id: mutationData.executeIndicatorSearchQuery.id,
        cursor,
      });

      // 여전히 실행 중이면 메시지를 출력하고 계속 폴링합니다
      if (queryData.dataLakeQuery.status === 'running') {
        console.log(queryData.dataLakeQuery.message);
        continue;
      }

      // 실행 중이 아니고 완료되지 않았다면,
      // 취소되었거나 오류가 발생한 경우입니다. 이 경우,
      // 예외를 던집니다
      if (queryData.dataLakeQuery.status !== 'succeeded') {
        throw new Error(queryData.dataLakeQuery.message);
      }

      allResults = [...allResults, ...queryData.dataLakeQuery.results.edges.map(edge => edge.node)];

      hasMore = queryData.dataLakeQuery.results.pageInfo.hasNextPage;
      cursor = queryData.dataLakeQuery.results.pageInfo.endCursor;
    } while (hasMore);

    console.log(`Your query returned ${allResults.length} result(s)!`);
  } catch (err) {
    console.error(err.response);
  }
})();
```

{% endtab %}

{% tab title="Python" %}

```python
# pip install gql aiohttp

from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport

transport = AIOHTTPTransport(
  url="YOUR_PANTHER_API_URL",
  headers={"X-API-Key": "YOUR_API_KEY"}
)

client = Client(transport=transport, fetch_schema_from_transport=True)

# `IssueQuery`는 쿼리의 별칭입니다. 완전히 생략할 수 있습니다.
issue_query = gql(
    """
    mutation IssueQuery($input: ExecuteIndicatorSearchQueryInput!) {
        executeIndicatorSearchQuery(input: $input) {
            id
        }
    }
    """
)

# `GetQueryResults`는 쿼리의 별칭입니다. 완전히 생략할 수 있습니다.
get_query_results = gql(
    """
    query GetQueryResults($id: ID!, $cursor: String) {
        dataLakeQuery(id: $id) {
            message
            status
            results(input: { cursor: $cursor }) {
                edges {
                    node
                }
                pageInfo {
                    endCursor
                    hasNextPage
                }
            }
        }
    }
    """
)

# 우리가 모든 페이지에서 가져오는 모든 결과를 보관하는 누적기
all_results = []
# 루프를 종료할 시점을 알기 위한 헬퍼
has_more = True
# 페이징 커서
cursor = None

# 인디케이터 검색(Indicator Search) 쿼리 실행
mutation_data = client.execute(
    issue_query,
    variable_values={
        "input": {         
            "indicators": ["226103014039"],
            "startTime": "2022-03-29T00:00:00.001Z",
            "endTime": "2022-03-30T00:00:00.001Z",
            "indicatorName": "p_any_aws_account_ids"
        }
    }
)

# 쿼리가 결과를 반환할 때까지 폴링을 시작합니다. 그 이후로,
# 더 이상 페이지가 없을 때까지 페이지를 계속 가져옵니다
while has_more:    
    query_data = client.execute(
        get_query_results,
        variable_values = {
            "id": mutation_data["executeIndicatorSearchQuery"]["id"],
            "cursor": cursor
        }
    )
    
    # 여전히 실행 중이면 메시지를 출력하고 계속 폴링합니다
    if query_data["dataLakeQuery"]["status"] == "running":
        print(query_data["dataLakeQuery"]["message"])
        continue
    
    # 실행 중이 아니고 완료되지 않았다면,
    # 취소되었거나 오류가 발생한 경우입니다. 이 경우,
    # 예외를 던집니다
    if query_data["dataLakeQuery"]["status"] != "succeeded":
        raise Exception(query_data["dataLakeQuery"]["message"])

    all_results.extend([edge["node"] for edge in query_data["dataLakeQuery"]["results"]["edges"]])
    has_more = query_data["dataLakeQuery"]["results"]["pageInfo"]["hasNextPage"]
    cursor = query_data["dataLakeQuery"]["results"]["pageInfo"]["endCursor"]

print(f'Query returned {len(all_results)} results(s)!')
```

{% endtab %}
{% endtabs %}
