# 데이터 레이크 쿼리

## 개요

Panther API는 다음 데이터 레이크 작업을 지원합니다:

* 데이터 레이크의 데이터베이스, 테이블, 열 나열
* SQL을 사용하여 데이터 레이크(Data Explorer) 쿼리 실행
* Search 쿼리 실행
* 현재 실행 중인 모든 쿼리 취소
* 이전에 실행된 모든 쿼리의 세부 정보 가져오기
* 선택적 필터와 함께 현재 실행 중이거나 이전에 실행된 모든 쿼리 나열

콘솔의 API Playground 또는 GraphQL-over-HTTP API를 사용하여 Panther의 API를 호출할 수 있습니다. 이러한 방법에 대해 자세히 알아보려면 [Panther API](https://docs.panther.com/ko/panther/api/..#step-1-choose-a-method-for-invoking-the-api).

핵심 데이터 레이크 쿼리 작업에 대한 GraphQL 쿼리, mutation, 그리고 엔드투엔드 워크플로 예시는 아래 섹션을 참조하세요.

{% hint style="warning" %}
API를 통해 관리되는 쿼리는 SQL로 작성해야 하며, 사용할 수 없습니다 [PantherFlow](https://docs.panther.com/ko/pantherflow).
{% endhint %}

## 일반적인 데이터 레이크 쿼리 작업

아래는 Panther에서 가장 일반적인 GraphQL 데이터 레이크 쿼리 작업 중 일부입니다. 이 예시들은 GraphQL 클라이언트(또는 `curl`)를 사용하여 Panther의 GraphQL API를 호출할 때 전송해야 하는 문서를 보여줍니다.

#### 데이터베이스 엔터티

{% tabs %}
{% tab title="모든 데이터베이스 엔터티 나열" %}

```graphql
# `AllDatabaseEntities`는 작업의 별칭입니다
query AllDatabaseEntities {
  dataLakeDatabases {
     name
     description
     tables {
       name
       description
       columns {
         name
         description
         유형
       }
     }
   }
 }
```

{% endtab %}

{% tab title="특정 데이터베이스의 엔터티 가져오기" %}

```graphql
# `DatabaseEntities`는 작업의 별칭입니다
query DatabaseEntities {
  dataLakeDatabase(name: "panther_logs.public") {
     name
     description
     tables {
       name
       description
       columns {
         name
         description
         유형
       }
     }
  }
}
```

{% endtab %}
{% endtabs %}

#### 쿼리 실행

{% tabs %}
{% tab title="데이터 레이크(Data Explorer) 쿼리 실행" %}

```graphql
# `IssueDataLakeQuery`는 작업의 별칭입니다
mutation IssueDataLakeQuery {
  executeDataLakeQuery(input: {
    sql: "select * from panther_logs.public.aws_alb limit 50"
  }) {
     id # 쿼리의 고유 ID
  }
}
```

{% endtab %}

{% tab title="Indicator Search 쿼리 실행" %}

```graphql
# `IssueIndicatorSearchQuery`는 작업의 별칭입니다
mutation IssueIndicatorSearchQuery {
  executeIndicatorSearchQuery(input: {
    indicators: ["286103014039", "126103014049"]
    startTime: "2022-04-01T00:00:00.000Z",
    endTime: "2022-04-30T23:59:59.000Z"
    indicatorName: p_any_aws_account_ids # 또는 자동 감지를 위해 비워 두세요
  }) {
     id # 쿼리의 고유 ID
  }
}
```

{% endtab %}

{% tab title="쿼리 취소" %}

```graphql
# `AbandonQuery`는 작업의 별칭입니다
mutation AbandonQuery {
  cancelDataLakeQuery(input: { id: "1234-5678" }) {
     id # 취소된 ID를 반환
  }
}
```

{% endtab %}
{% endtabs %}

#### 데이터 레이크 또는 Search 쿼리의 결과 가져오기

데이터 레이크 또는 Search 쿼리를 실행하면 결과가 반환되기까지 몇 초에서 몇 분이 걸릴 수 있습니다. 쿼리가 완료되었는지 확인하려면 쿼리 상태를 확인(폴링)해야 합니다.

다음 쿼리를 사용하여 쿼리 상태를 확인하는 동시에, 가능하면 결과도 가져올 수 있습니다:

{% tabs %}
{% tab title="결과의 첫 페이지 가져오기" %}

```graphql
# `QueryResults`는 작업의 별칭입니다
query QueryResults {
  dataLakeQuery(id: "1234-1234-1234-1234") { # 쿼리의 고유 ID
    message
    status
    results {
      edges {
        node
      }
    }
  }
}
```

{% endtab %}

{% tab title="결과의 다음 페이지 가져오기" %}

```graphql
# `QueryResults`는 작업의 별칭입니다
query QueryResults {
  dataLakeQuery(id: "1234-1234-1234-1234") { # 쿼리의 고유 ID
    message
    status
    results(input: { cursor: "5678-5678-5678-5678" }) { # `endCursor`의 값
      edges {
        node
      }
      pageInfo
        endCursor
        hasNextPage
      }
    }
  }
```

{% endtab %}
{% endtabs %}

예상되는 값은 `status` 및 `results` 쿼리 상태에 따라 달라집니다:

* 쿼리가 아직 실행 중이면:
  * `status` 값이 `running`
  * `results` 값이 `null`
* 쿼리가 실패하면:
  * `status` 값이 `failed`
  * `results` 값이 `null` 그리고 오류 메시지는 `message` 키
* 쿼리가 완료되면
  * `status` 값이 `succeeded`
  * `results` 가 채워집니다

위의 모든 내용(및 `status`)의 가능한 값과, 요청할 수 있는 추가 필드를 포함합니다. Panther API 스키마를 탐색하는 다양한 방법에 대해 알아보세요 [여기에서](https://docs.panther.com/ko/panther/api/..#step-4-discover-the-schema).

#### 데이터 레이크 또는 Search 쿼리의 메타데이터 가져오기

위 예시에서는 Panther 쿼리의 `results` 를 요청했습니다. 쿼리에 대한 추가 메타데이터를 요청하는 것도 가능합니다.

다음 예시에서는 결과의 첫 페이지와 함께 이러한 메타데이터를 요청합니다:

```graphql
# `QueryMetadata`는 작업의 별칭입니다
query QueryMetadata {
  dataLakeQuery(id: "1234-1234-1234-1234") { # 쿼리의 고유 ID
    name
    isScheduled
    issuedBy {
      ... on User {
        email
      }
      ... on APIToken {
        name
      } 
    }
    sql
    message
    status
    startedAt
    completedAt
    results {
      edges {
        node
      }
    }
  }
}
```

#### 데이터 레이크 및 Search 쿼리 나열

{% tabs %}
{% tab title="첫 페이지 가져오기" %}

```graphql
# `ListDataLakeQueries`는 작업의 별칭입니다
query ListDataLakeQueries {
  dataLakeQueries {
    name
    isScheduled
    issuedBy {
      ... on User {
        email
      }
      ... on APIToken {
        name
    } 
    }
    sql
    message
    status
    startedAt
    completedAt
    results { # 각 쿼리의 첫 번째 결과 페이지만 가져옵니다
      edges {
        node
      }
    }
  }
```

{% endtab %}

{% tab title="다음 페이지 가져오기" %}

```graphql
# `ListDataLakeQueries`는 작업의 별칭입니다
query ListDataLakeQueries {
  dataLakeQueries(input: { cursor: "5678-5678-5678-5678" }) { # `endCursor`의 값
    name
    isScheduled
    issuedBy {
      ... on User {
        email
      }
      ... on APIToken {
        name
    } 
    }
    sql
    message
    status
    startedAt
    completedAt
    results { # 각 쿼리의 첫 번째 결과 페이지만 가져옵니다
      edges {
        node
      }
    }
    pageInfo {
      endCursor
      hasNextPage
    }
  }
```

{% endtab %}

{% tab title="필터링된 집합 가져오기" %}

```graphql
# `ListDataLakeQueries`는 작업의 별칭입니다
query ListDataLakeQueries {
  dataLakeQueries(input: { contains: "aws_alb", isScheduled: true }) {
    name
    isScheduled
    issuedBy {
      ... on User {
        email
      }
      ... on APIToken {
        name
    } 
    }
    sql
    message
    status
    startedAt
    completedAt
    results { # 각 쿼리의 첫 번째 결과 페이지만 가져옵니다
      edges {
        node
      }
    }
  }
```

{% endtab %}
{% endtabs %}

## 엔드투엔드 예제

아래에서는 [일반적인 작업](#common-operations) 예제를 바탕으로 엔드투엔드 흐름을 보여드리겠습니다.

#### 데이터 레이크(Data Explorer) 쿼리 실행

{% tabs %}
{% tab title="NodeJS" %}

```javascript
// npm install graphql graphql-request

import { GraphQLClient, gql } from 'graphql-request';

const client = new GraphQLClient(
  'YOUR_PANTHER_API_URL', 
  { headers: { 'X-API-Key': 'YOUR_API_KEY' } 
});

// `IssueQuery`는 쿼리의 별칭입니다. 완전히 생략할 수도 있습니다.
const issueQuery = gql`
  mutation IssueQuery($sql: String!) {
    executeDataLakeQuery(input: { sql: $sql }) {
      id
    }
  }
`;

// `GetQueryResults`는 쿼리의 별칭입니다. 완전히 생략할 수도 있습니다.
const getQueryResults = gql`
  query GetQueryResults($id: ID!, $cursor: String) {
    dataLakeQuery(id: $id) {
      message
      status
      results(input: { cursor: $cursor }) {
        edges {
          node
        }
        pageInfo {
          endCursor
          hasNextPage
        }
      }
    }
  }
`;

(async () => {
  try {
    // 가져오는 모든 결과 노드를 보관하는 누적 변수
    let allResults = [];
    // 루프를 언제 종료할지 알기 위한 헬퍼
    let hasMore = true;
    // 페이지네이션 커서
    let cursor = null;

    // 쿼리 실행
    const mutationData = await client.request(issueQuery, {
      sql: 'select * from panther_logs.public.aws_alb limit 5',
    });

    // 결과가 반환될 때까지 쿼리 폴링을 시작합니다. 그 다음에는,
    // 더 이상 남은 페이지가 없을 때까지 계속 가져옵니다
    do {
      const queryData = await client.request(getQueryResults, {
        id: mutationData.executeDataLakeQuery.id,
        cursor,
      });

      // 아직 실행 중이면 메시지를 출력하고 계속 폴링합니다
      if (queryData.dataLakeQuery.status === 'running') {
        console.log(queryData.dataLakeQuery.message);
        continue;
      }

      // 실행 중도 아니고 완료도 아니라면,
      // 취소되었거나 오류가 발생한 것입니다. 이 경우,
      // 예외를 발생시킵니다
      if (queryData.dataLakeQuery.status !== 'succeeded') {
        throw new Error(queryData.dataLakeQuery.message);
      }

      allResults = [...allResults, ...queryData.dataLakeQuery.results.edges.map(edge => edge.node)];

      hasMore = queryData.dataLakeQuery.results.pageInfo.hasNextPage;
      cursor = queryData.dataLakeQuery.results.pageInfo.endCursor;
    } while (hasMore);

    console.log(`쿼리에서 ${allResults.length}개의 결과를 반환했습니다!`);
  } catch (err) {
    console.error(err.response);
  }
})();
```

{% endtab %}

{% tab title="Python" %}

```python
# pip install gql aiohttp

from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport

transport = AIOHTTPTransport(
  url="YOUR_PANTHER_API_URL",
  headers={"X-API-Key": "YOUR_API_KEY"}
)

client = Client(transport=transport, fetch_schema_from_transport=True)

# `IssueQuery`는 쿼리의 별칭입니다. 완전히 생략할 수도 있습니다.
issue_query = gql(
    """
    mutation IssueQuery($sql: String!) {
        executeDataLakeQuery(input: { sql: $sql }) {
            id
        }
    }
    """
)

# `GetQueryResults`는 쿼리의 별칭입니다. 완전히 생략할 수도 있습니다.
get_query_results = gql(
    """
    query GetQueryResults($id: ID!, $cursor: String) {
        dataLakeQuery(id: $id) {
            message
            status
            results(input: { cursor: $cursor }) {
                edges {
                    node
                }
                pageInfo {
                    endCursor
                    hasNextPage
                }
            }
        }
    }
    """
)

# 모든 페이지에서 가져오는 모든 결과를 보관하는 누적 변수
all_results = []
# 루프를 언제 종료할지 알기 위한 도우미.
has_more = True
# 페이지네이션 커서
cursor = None

# 데이터 레이크(Data Explorer) 쿼리 실행
mutation_data = client.execute(
    issue_query,
    variable_values={
        "sql": "select * from panther_logs.public.aws_alb limit 5"
    }
)

# 결과가 반환될 때까지 쿼리를 폴링하기 시작합니다. 그 다음에는,
# 더 이상 남은 페이지가 없을 때까지 계속 가져옵니다
while has_more:
    query_data = client.execute(
        get_query_results,
        variable_values = {
            "id": mutation_data["executeDataLakeQuery"]["id"],
            "cursor": cursor
        }
    )
    
    # 아직 실행 중이면 메시지를 출력하고 계속 폴링합니다
    if query_data["dataLakeQuery"]["status"] == "running":
        print(query_data["dataLakeQuery"]["message"])
        continue
    
    # 실행 중도 아니고 완료도 아니라면,
    # 취소되었거나 오류가 발생한 것입니다. 이 경우,
    # 예외를 발생시킵니다
    if query_data["dataLakeQuery"]["status"] != "succeeded":
        raise Exception(query_data["dataLakeQuery"]["message"])

    all_results.extend([edge["node"] for edge in query_data["dataLakeQuery"]["results"]["edges"]])
    has_more = query_data["dataLakeQuery"]["results"]["pageInfo"]["hasNextPage"]
    cursor = query_data["dataLakeQuery"]["results"]["pageInfo"]["endCursor"]

print(f'쿼리에서 {len(all_results)}개의 결과를 반환했습니다!')
```

{% endtab %}
{% endtabs %}

#### Search 쿼리 실행

{% tabs %}
{% tab title="NodeJS" %}

```javascript
// npm install graphql graphql-request

import { GraphQLClient, gql } from 'graphql-request';

const client = new GraphQLClient(
  'YOUR_PANTHER_API_URL', 
  { headers: { 'X-API-Key': 'YOUR_API_KEY' } 
});

// `IssueQuery`는 쿼리의 별칭입니다. 완전히 생략할 수도 있습니다.
const issueQuery = gql`
  mutation IssueQuery($input: ExecuteIndicatorSearchQueryInput!) {
    executeIndicatorSearchQuery(input: $input) {
      id
    }
  }
`;

// `GetQueryResults`는 쿼리의 별칭입니다. 완전히 생략할 수도 있습니다.
const getQueryResults = gql`
  query GetQueryResults($id: ID!, $cursor: String) {
    dataLakeQuery(id: $id) {
      message
      status
      results(input: { cursor: $cursor }) {
        edges {
          node
        }
        pageInfo {
          endCursor
          hasNextPage
        }
      }
    }
  }
`;

(async () => {
  try {
    // 가져오는 모든 결과 노드를 보관하는 누적 변수
    let allResults = [];
    // 루프를 언제 종료할지 알기 위한 헬퍼
    let hasMore = true;
    // 페이지네이션 커서
    let cursor = null;

    // 쿼리 실행
    const mutationData = await client.request(issueQuery, {
      input: {         
        indicators: ["226103014039"],
        startTime: "2022-03-29T00:00:00.001Z",
        endTime: "2022-03-30T00:00:00.001Z",
        indicatorName: "p_any_aws_account_ids"
      }
    });

    // 더 이상 남아 있는 페이지가 없을 때까지 페이지를 계속 가져옵니다
    do {
      const queryData = await client.request(getQueryResults, {
        id: mutationData.executeIndicatorSearchQuery.id,
        cursor,
      });

      // 아직 실행 중이면 메시지를 출력하고 계속 폴링합니다
      if (queryData.dataLakeQuery.status === 'running') {
        console.log(queryData.dataLakeQuery.message);
        continue;
      }

      // 실행 중도 아니고 완료도 아니라면,
      // 취소되었거나 오류가 발생한 것입니다. 이 경우,
      // 예외를 발생시킵니다
      if (queryData.dataLakeQuery.status !== 'succeeded') {
        throw new Error(queryData.dataLakeQuery.message);
      }

      allResults = [...allResults, ...queryData.dataLakeQuery.results.edges.map(edge => edge.node)];

      hasMore = queryData.dataLakeQuery.results.pageInfo.hasNextPage;
      cursor = queryData.dataLakeQuery.results.pageInfo.endCursor;
    } while (hasMore);

    console.log(`쿼리에서 ${allResults.length}개의 결과를 반환했습니다!`);
  } catch (err) {
    console.error(err.response);
  }
})();
```

{% endtab %}

{% tab title="Python" %}

```python
# pip install gql aiohttp

from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport

transport = AIOHTTPTransport(
  url="YOUR_PANTHER_API_URL",
  headers={"X-API-Key": "YOUR_API_KEY"}
)

client = Client(transport=transport, fetch_schema_from_transport=True)

# `IssueQuery`는 쿼리의 별칭입니다. 완전히 생략할 수도 있습니다.
issue_query = gql(
    """
    mutation IssueQuery($input: ExecuteIndicatorSearchQueryInput!) {
        executeIndicatorSearchQuery(input: $input) {
            id
        }
    }
    """
)

# `GetQueryResults`는 쿼리의 별칭입니다. 완전히 생략할 수도 있습니다.
get_query_results = gql(
    """
    query GetQueryResults($id: ID!, $cursor: String) {
        dataLakeQuery(id: $id) {
            message
            status
            results(input: { cursor: $cursor }) {
                edges {
                    node
                }
                pageInfo {
                    endCursor
                    hasNextPage
                }
            }
        }
    }
    """
)

# 모든 페이지에서 가져오는 모든 결과를 보관하는 누적 변수
all_results = []
# 루프를 종료해야 할 때를 알기 위한 도우미
has_more = True
# 페이지네이션 커서
cursor = None

# Indicator Search 쿼리 실행
mutation_data = client.execute(
    issue_query,
    variable_values={
        "input": {         
            "indicators": ["226103014039"],
            "startTime": "2022-03-29T00:00:00.001Z",
            "endTime": "2022-03-30T00:00:00.001Z",
            "indicatorName": "p_any_aws_account_ids"
        }
    }
)

# 결과가 반환될 때까지 쿼리를 폴링하기 시작합니다. 그 다음에는,
# 더 이상 남은 페이지가 없을 때까지 계속 가져옵니다
while has_more:    
    query_data = client.execute(
        get_query_results,
        variable_values = {
            "id": mutation_data["executeIndicatorSearchQuery"]["id"],
            "cursor": cursor
        }
    )
    
    # 아직 실행 중이면 메시지를 출력하고 계속 폴링합니다
    if query_data["dataLakeQuery"]["status"] == "running":
        print(query_data["dataLakeQuery"]["message"])
        continue
    
    # 실행 중도 아니고 완료도 아니라면,
    # 취소되었거나 오류가 발생한 것입니다. 이 경우,
    # 예외를 발생시킵니다
    if query_data["dataLakeQuery"]["status"] != "succeeded":
        raise Exception(query_data["dataLakeQuery"]["message"])

    all_results.extend([edge["node"] for edge in query_data["dataLakeQuery"]["results"]["edges"]])
    has_more = query_data["dataLakeQuery"]["results"]["pageInfo"]["hasNextPage"]
    cursor = query_data["dataLakeQuery"]["results"]["pageInfo"]["endCursor"]

print(f'쿼리에서 {len(all_results)}개의 결과를 반환했습니다!')
```

{% endtab %}
{% endtabs %}
