예: ROWTIME을 사용하는 텀블링 윈도우 - Amazon Kinesis Data Analytics for SQL Applications 개발자 안내서

신중한 고려 끝에 두 단계로 Amazon Kinesis Data Analytics for SQL applications를 중단하기로 결정했습니다.

1. 2025년 10월 15일부터 SQL 애플리케이션을 위한 새 Kinesis Data Analytics를 생성할 수 없습니다.

2. 2026년 1월 27일부터 애플리케이션이 삭제됩니다. SQL 애플리케이션용 Amazon Kinesis Data Analytics를 시작하거나 작동할 수 없습니다. 해당 시점부터 에 대한 Amazon Kinesis Data AnalyticsSQL에 대한 지원을 더 이상 사용할 수 없습니다. 자세한 내용은 Amazon Kinesis Data Analytics for SQL Applications 중단 단원을 참조하십시오.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

예: ROWTIME을 사용하는 텀블링 윈도우

윈도우 모드 쿼리가 비중첩 방식으로 각 윈도우를 처리하는 경우 이 윈도우를 텀블링 윈도우라고 합니다. 자세한 설명은 텀블링 윈도우(그룹별 집계) 섹션을 참조하세요. 이 Amazon Kinesis Data Analytics 예는 ROWTIME 열을 사용하여 텀플링 윈도우를 생성합니다. ROWTIME 열은 시간을 나타내며 애플리케이션이 레코드를 읽었습니다.

이 예에서는 다음 레코드를 Kinesis 데이터 스트림에 기록합니다.

{"TICKER": "TBV", "PRICE": 33.11} {"TICKER": "INTC", "PRICE": 62.04} {"TICKER": "MSFT", "PRICE": 40.97} {"TICKER": "AMZN", "PRICE": 27.9} ...

그런 다음 Kinesis 데이터 스트림을 스트리밍 소스로 사용하여 AWS Management Console에서 Kinesis Data Analytics 애플리케이션을 생성합니다. 검색 프로세스는 스트리밍 소스 상의 샘플 레코드를 읽고 다음과 같이 두 개의 열(TICKERPRICE)로 애플리케이션 내 스키마를 유추합니다.

가격 및 티커 열이 포함된 애플리케이션 내 스키마를 보여주는 콘솔 스크린샷

MINMAX 함수가 포함된 애플리케이션 코드를 사용하여 데이터의 윈도우 모드 집계를 생성합니다. 그런 다음 아래 스크린샷과 같이 다른 애플리케이션 내 스트림에 결과 데이터를 삽입합니다.

애플리케이션 내 스트림의 결과 데이터를 보여주는 콘솔 스크린샷

다음 절차에서는 ROWTIME을 기반으로 텀플링 윈도우의 입력 스트림에서 값을 집계하는 Kinesis Data Analytics 애플리케이션을 생성합니다.

1단계: Kinesis 데이터 스트림 생성

Amazon Kinesis 데이터 스트림을 생성하고 다음과 같이 레코드를 채웁니다:

  1. AWS Management Console에 로그인하여 https://console.aws.amazon.com/kinesis에서 Kinesis 콘솔을 엽니다.

  2. 탐색 창에서 Data Streams(데이터 스트림)를 선택합니다.

  3. Create Kinesis stream(Kinesis 스트림 생성)을 선택한 다음 샤드가 하나 있는 스트림을 생성합니다. 자세한 설명은 Amazon Kinesis Data Streams 개발자 가이드스트림 생성을 참조하세요.

  4. 프로덕션 환경에서 Kinesis 데이터 스트림에 레코드를 기록하려면 Kinesis Client Library 또는 Kinesis Data Streams API를 사용하는 것이 좋습니다. 이 예에서는 간단한 설명을 위해 다음 Python 스크립트를 사용하여 레코드를 생성합니다. 코드를 실행하여 샘플 티커 레코드를 채웁니다. 이 단순한 코드는 임의의 티커 레코드를 스트림에 연속적으로 씁니다. 이후 단계에서 애플리케이션 스키마를 생성할 수 있도록 스크립트를 실행 중 상태로 유지합니다.

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

2단계: Kinesis Data Analytics 애플리케이션 생성

다음과 같이 Kinesis Data Analytics 애플리케이션을 생성합니다:

  1. https://console.aws.amazon.com/kinesisanalytics에서 Managed Service for Apache Flink 콘솔을 엽니다.

  2. 애플리케이션 생성을 선택하고 애플리케이션 명칭을 입력한 다음 애플리케이션 생성을 선택합니다.

  3. 애플리케이션 세부 정보 페이지에서 Connect streaming data(스트리밍 데이터 연결)를 선택하여 소스에 연결합니다.

  4. Connect to source(소스에 연결) 페이지에서 다음을 수행합니다.

    1. 이전 섹션에서 생성한 스트림을 선택합니다.

    2. Discover schema(스키마 발견)를 선택합니다. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다. 유추된 스키마에는 두 개의 열이 있습니다.

    3. [Save schema and update stream samples]를 선택합니다. 콘솔에서 스키마를 저장한 이후 종료를 선택합니다.

    4. [Save and continue]를 선택합니다.

  5. 애플리케이션 세부 정보 페이지에서 Go to SQL editor(SQL 편집기로 이동)를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 Yes, start application(예, 애플리케이션 시작)을 선택합니다.

  6. SQL 편집기에서 애플리케이션 코드를 작성하고 다음과 같이 결과를 확인합니다.

    1. 다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다.

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), MIN_PRICE REAL, MAX_PRICE REAL); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, MIN(PRICE), MAX(PRICE) FROM "SOURCE_SQL_STREAM_001" GROUP BY TICKER, STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
    2. [Save and run SQL]을 선택합니다.

      Real-time analytics(실시간 분석) 탭에서 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증할 수 있습니다.