신중한 고려 끝에 두 단계로 Amazon Kinesis Data Analytics for SQL applications를 중단하기로 결정했습니다.
1. 2025년 10월 15일부터 SQL 애플리케이션을 위한 새 Kinesis Data Analytics를 생성할 수 없습니다.
2. 2026년 1월 27일부터 애플리케이션이 삭제됩니다. SQL 애플리케이션용 Amazon Kinesis Data Analytics를 시작하거나 작동할 수 없습니다. 해당 시점부터 에 대한 Amazon Kinesis Data AnalyticsSQL에 대한 지원을 더 이상 사용할 수 없습니다. 자세한 내용은 Amazon Kinesis Data Analytics for SQL Applications 중단 단원을 참조하십시오.
기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
예: 값 변환 DateTime
Amazon Kinesis Data Analytics는 열을 타임스탬프로 변환하는 것을 지원합니다. 예를 들어 GROUP BY
절의 일부인 자체 타임스탬프를 ROWTIME
열에 더하여 또 다른 시간 기반 윈도우로 사용할 수 있습니다. Kinesis Data Analytics는 날짜 및 시간 필드 작업을 위한 작업 및 SQL 함수를 제공합니다.
-
날짜 및 시간 연산자 – 날짜, 시간 및 간격 데이터 유형에 대한 산술 연산을 수행할 수 있습니다. 자세한 설명은 Amazon Managed Service for Apache Flink SQL 참조의 날짜, 타임스탬프 및 간격 연산자를 참조하십시오.
-
SQL 함수 – 여기에는 다음이 포함됩니다. 자세한 설명은 Amazon Managed Service for Apache Flink SQL 참조의 날짜 및 시간 함수를 참조하십시오.
-
EXTRACT()
– 날짜, 시간, 타임스탬프 또는 간격 표현식에서 필드 하나를 추출합니다. -
CURRENT_TIME
– 쿼리가 실행될 때의 시간을 반환합니다(UTC). -
CURRENT_DATE
– 쿼리가 실행될 때의 날짜를 반환합니다(UTC). -
CURRENT_TIMESTAMP
– 쿼리가 실행될 때의 타임스탬프를 반환합니다(UTC). -
LOCALTIME
– Kinesis Data Analytics가 실행 중인 환경에서 정의된 대로 쿼리가 실행될 때의 현재 시간(UTC)을 반환합니다. -
LOCALTIMESTAMP
– Kinesis Data Analytics가 실행 중인 환경에 의해 정의되는 대로 현재 타임스탬프를 반환합니다(UTC).
-
-
SQL 확장 – 여기에는 다음이 포함됩니다. 자세한 설명은 Amazon Managed Service for Apache Flink SQL 참조에서 날짜 및 시간 함수와 날짜 시간 변환 함수를 참조하십시오.
-
CURRENT_ROW_TIMESTAMP
– 스트림 상의 각 행에 대해 새 타임스탬프를 반환합니다. -
TSDIFF
– 두 타임스탬프 간의 차이를 밀리초 단위로 반환합니다. -
CHAR_TO_DATE
– 문자열을 날짜로 변환합니다. -
CHAR_TO_TIME
– 문자열을 시간으로 변환합니다. -
CHAR_TO_TIMESTAMP
– 문자열을 타임스탬프로 변환합니다. -
DATE_TO_CHAR
– 날짜를 문자열로 변환합니다. -
TIME_TO_CHAR
– 시간을 문자열로 변환합니다. -
TIMESTAMP_TO_CHAR
– 타임스탬프를 문자열로 변환합니다.
-
위의 SQL 함수 중 대부분은 형식을 사용하여 열을 변환합니다. 형식은 유연합니다. 예를 들어, 형식 yyyy-MM-dd hh:mm:ss
를 지정하여 입력 문자열 2009-09-16 03:15:24
를 타임스탬프로 변환할 수 있습니다. 자세한 설명은 Amazon Managed Service for Apache Flink SQL 참조의 Char To Timestamp(Sys)를 참조하십시오.
예: 날짜 변환
이 예에서는 다음 레코드를 Amazon Kinesis 데이터 스트림에 기록합니다.
{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"} {"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"} {"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"} ...
그런 다음 콘솔에서 Kinesis Data Analytics 애플리케이션을 생성하고 Kinesis 스트림을 스트리밍 소스로 취합니다. 검색 프로세스는 스트리밍 소스 상의 샘플 레코드를 읽고 다음과 같이 두 개의 열(EVENT_TIME
및 TICKER
)로 애플리케이션 내 스키마를 유추합니다.
그런 다음 SQL 함수를 지닌 애플리케이션 코드를 사용하여 EVENT_TIME
타임스탬프 필드를 다양한 방법으로 변환합니다. 그러면 다음 스크린샷과 같이 다른 애플리케이션 내 스트림에 결과 데이터를 삽입합니다.
1단계: Kinesis 데이터 스트림 생성
Amazon Kinesis 데이터 스트림을 생성하고 다음과 같이 이벤트 시간 및 티커 레코드를 채웁니다:
-
탐색 창에서 Data Streams(데이터 스트림)를 선택합니다.
-
Kinesis 스트림 생성을 선택한 다음 샤드가 하나인 스트림을 생성합니다.
-
다음의 Python 코드를 실행하여 스트림을 샘플 데이터로 채웁니다. 이 간단한 코드는 지속적으로 임의 티커 기호 및 현재 타임스탬프가 포함된 레코드를 스트림에 씁니다.
import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
2단계: Amazon Kinesis Data Analytics 애플리케이션 생성
다음과 같이 애플리케이션을 생성합니다:
https://console.aws.amazon.com/kinesisanalytics
에서 Managed Service for Apache Flink 콘솔을 엽니다. -
애플리케이션 생성을 선택하고 애플리케이션 명칭을 입력한 다음 애플리케이션 생성을 선택합니다.
-
애플리케이션 세부 정보 페이지에서 Connect streaming data(스트리밍 데이터 연결)를 선택하여 소스에 연결합니다.
-
Connect to source(소스에 연결) 페이지에서 다음을 수행합니다.
-
이전 섹션에서 생성한 스트림을 선택합니다.
-
IAM 역할 생성을 선택합니다.
-
Discover schema(스키마 발견)를 선택합니다. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다. 유추된 스키마에는 두 개의 열이 있습니다.
-
Edit Schema(스키마 편집)를 선택합니다. EVENT_TIME 열의 열 유형을
TIMESTAMP
으로 변경합니다. -
[Save schema and update stream samples]를 선택합니다. 콘솔에서 스키마를 저장한 이후 종료를 선택합니다.
-
[Save and continue]를 선택합니다.
-
-
애플리케이션 세부 정보 페이지에서 Go to SQL editor(SQL 편집기로 이동)를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 Yes, start application(예, 애플리케이션 시작)을 선택합니다.
-
SQL 편집기에서 애플리케이션 코드를 작성하고 다음과 같이 결과를 확인합니다.
-
다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
-
[Save and run SQL]을 선택합니다. Real-time analytics(실시간 분석) 탭에서 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증할 수 있습니다.
-