1단계: 입력 및 출력 스트림 생성 - Amazon Kinesis Data Analytics for SQL 애플리케이션 개발자 안내서

새 프로젝트의 경우 Kinesis Data Analytics for SQL 애플리케이션보다 새로운 Managed Service for Apache Flink Studio를 사용하는 것이 좋습니다. Managed Service for Apache Flink Studio는 사용 편의성과 고급 분석 기능을 결합하여 정교한 스트림 처리 애플리케이션을 몇 분 만에 구축할 수 있도록 합니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

1단계: 입력 및 출력 스트림 생성

핫스팟 예를 위해 Amazon Kinesis Data Analytics 애플리케이션을 생성하기 전에 먼저 Kinesis 데이터 스트림 2개를 생성합니다. 스트림 중 하나를 애플리케이션의 스트리밍 소스로 구성하고 또 다른 스트림을 Kinesis Data Analytics가 애플리케이션 출력을 유지하는 목적지로 구성합니다.

1.1단계: Kinesis 데이터 스트림 생성

이 섹션에서는 다음 2개의 Kinesis 데이터 스트림을 생성합니다: ExampleInputStreamExampleOutputStream.

콘솔 또는 AWS CLI을(를) 사용하여 데이터 스트림을 생성합니다.

  • 콘솔을 사용하여 데이터 스트림을 생성:

    1. AWS Management Console에 로그인하여 https://console.aws.amazon.com/kinesis에서 Kinesis 콘솔을 엽니다.

    2. 탐색 창에서 Data Streams(데이터 스트림)를 선택합니다.

    3. Kinesis 스트림 생성을 선택한 다음 샤드가 하나인 스트림(ExampleInputStream이라고 함)을 생성합니다.

    4. 이전 단계를 반복하여 샤드가 하나인 스트림(ExampleOutputStream이라고 함)을 생성합니다.

  • AWS CLI을(를) 사용하여 데이터 스트림 생성:

    • 다음의 Kinesis create-stream AWS CLI 명령을 사용하여 스트림(ExampleInputStreamExampleOutputStream)을 생성합니다. 애플리케이션이 출력을 작성하기 위해 사용할 두 번째 스트림을 생성하려면 동일한 명령을 실행하여 스트림 명칭을 ExampleOutputStream으로 변경합니다.

      $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

1.2단계: 샘플 레코드를 입력 스트림에 작성

이 단계에서는 Python 코드를 실행하여 샘플 레코드를 연속적으로 생성하고 ExampleInputStream 스트림에 작성합니다.

{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"} {"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
  1. Python 및 pip를 설치합니다.

    Python 설치에 관한 정보는 Python 웹사이트를 참조하십시오.

    pip를 사용하여 종속 프로그램을 설치할 수 있습니다. pip 설치에 관한 정보는 pip 웹 사이트에 있는 Installation을 참조하십시오.

  2. 다음 Python 코드를 실행합니다. 이 코드는 다음을 수행합니다.

    • (X, Y) 평면 어딘가에 잠재적 핫스팟을 생성합니다.

    • 각 핫스팟마다 1,000포인트 세트를 생성합니다. 이 포인트에서 20%가 핫스팟 주변에 클러스터링됩니다. 나머지는 전체 공간 내에 무작위로 생성됩니다.

    • put-record 명령은 JSON 레코드를 스트림에 작성합니다.

    중요

    이 파일에는 귀하의 AWS 자격 증명이 포함되어 있으므로 이 파일을 웹 서버에 업로드하지 마십시오.

    import json from pprint import pprint import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_hotspot(field, spot_size): hotspot = { "left": field["left"] + random.random() * (field["width"] - spot_size), "width": spot_size, "top": field["top"] + random.random() * (field["height"] - spot_size), "height": spot_size, } return hotspot def get_record(field, hotspot, hotspot_weight): rectangle = hotspot if random.random() < hotspot_weight else field point = { "x": rectangle["left"] + random.random() * rectangle["width"], "y": rectangle["top"] + random.random() * rectangle["height"], "is_hot": "Y" if rectangle is hotspot else "N", } return {"Data": json.dumps(point), "PartitionKey": "partition_key"} def generate( stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client ): """ Generates points used as input to a hotspot detection algorithm. With probability hotspot_weight (20%), a point is drawn from the hotspot; otherwise, it is drawn from the base field. The location of the hotspot changes for every 1000 points generated. """ points_generated = 0 hotspot = None while True: if points_generated % 1000 == 0: hotspot = get_hotspot(field, hotspot_size) records = [ get_record(field, hotspot, hotspot_weight) for _ in range(batch_size) ] points_generated += len(records) pprint(records) kinesis_client.put_records(StreamName=stream_name, Records=records) time.sleep(0.1) if __name__ == "__main__": generate( stream_name=STREAM_NAME, field={"left": 0, "width": 10, "top": 0, "height": 10}, hotspot_size=1, hotspot_weight=0.2, batch_size=10, kinesis_client=boto3.client("kinesis"), )

다음 단계

2단계: Kinesis Data Analytics 애플리케이션 생성