Amazon으로 스튜디오 노트북 만들기 MSK - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink는 이전에 Amazon Kinesis Data Analytics for Apache Flink로 알려졌습니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon으로 스튜디오 노트북 만들기 MSK

이 자습서에서는 Amazon MSK 클러스터를 소스로 사용하는 Studio 노트북을 만드는 방법을 설명합니다.

Amazon MSK 클러스터 설정

이 자습서에는 일반 텍스트 액세스를 허용하는 Amazon MSK 클러스터가 필요합니다. Amazon MSK 클러스터를 아직 설정하지 않은 경우 Amazon 사용 시작하기 MSK 자습서를 따라 AmazonVPC, Amazon MSK 클러스터, 주제 및 Amazon EC2 클라이언트 인스턴스를 생성하십시오.

자습서에 따라 다음을 수행하십시오:

NAT게이트웨이를 사용자 환경에 추가하십시오. VPC

Amazon 사용 시작하기 MSK 자습서에 따라 Amazon MSK 클러스터를 생성했거나 기존 VPC Amazon에 아직 프라이빗 서브넷용 NAT 게이트웨이가 없는 경우 VPC Amazon에 NAT 게이트웨이를 추가해야 합니다. 다음 다이어그램은 아키텍처입니다.

AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.

VPCAmazon용 NAT 게이트웨이를 생성하려면 다음과 같이 하십시오.

  1. 에서 Amazon VPC 콘솔을 엽니다 https://console.aws.amazon.com/vpc/.

  2. 왼쪽 탐색 표시줄에서 NAT게이트웨이를 선택합니다.

  3. NAT게이트웨이 페이지에서 게이트웨이 생성을 NAT 선택합니다.

  4. NAT게이트웨이 생성 페이지에서 다음 값을 제공합니다.

    이름 - 선택 사항 ZeppelinGateway
    서브넷 AWS KafkaTutorialSubnet1
    엘라스틱 IP 할당 ID 사용 가능한 엘라스틱 IP를 선택하세요. IPs사용할 수 있는 엘라스틱이 없는 경우 [Elastic IP 할당] 을 선택한 다음 콘솔에서 생성하는 Elasic IP를 선택합니다.

    게이트웨이 생성을 NAT 선택합니다.

  5. 왼쪽 탐색 모음에서 경로 표를 선택합니다.

  6. [Create Route Table]을 선택합니다.

  7. 경로 표 생성 페이지에서 다음 정보를 제공합니다.

    • Name tag: ZeppelinRouteTable

    • VPC: 원하는 항목 VPC (예: AWS KafkaTutorialVPC) 을 선택합니다.

    생성(Create)을 선택합니다.

  8. 라우팅 테이블 목록에서 선택합니다 ZeppelinRouteTable. 경로 탭에서 경로 편집을 선택합니다.

  9. 경로 편집 페이지에서 경로 추가를 선택합니다.

  10. 에서 대상 주소0.0.0.0/0을 입력합니다. 타겟에서 NAT 게이트웨이, 를 선택합니다 ZeppelinGateway. 경로 저장을 선택합니다. 닫기를 선택하세요.

  11. 라우팅 테이블 페이지에서 ZeppelinRouteTable선택한 상태에서 서브넷 연결 탭을 선택합니다. 서브넷 연결 편집을 선택합니다.

  12. 서브넷 연결 편집 페이지에서 AWS KafkaTutorialSubnet2와AWS KafkaTutorialSubnet 3을 선택합니다. 저장(Save)을 선택합니다.

AWS Glue 연결 및 테이블 만들기

Studio 노트북은 Amazon 데이터 소스에 대한 MSK 메타데이터용 AWS Glue데이터베이스를 사용합니다. 이 섹션에서는 Amazon MSK 클러스터에 액세스하는 방법을 설명하는 AWS Glue 연결과 데이터 소스의 데이터를 Studio 노트북과 같은 클라이언트에 제공하는 방법을 설명하는 AWS Glue 표를 생성합니다.

연결 생성
  1. 에서 AWS Management Console 로그인하고 AWS Glue 콘솔을 엽니다 https://console.aws.amazon.com/glue/.

  2. 아직 AWS Glue 데이터베이스가 없는 경우 왼쪽 탐색 표시줄에서 데이터베이스를 선택합니다. 데이터베이스 추가를 선택합니다. 데이터베이스 추가 창에서 데이터베이스 이름default(으)로 입력합니다. 생성(Create)을 선택합니다.

  3. 왼쪽 탐색 모음에서 연결을 선택합니다. 연결 추가를 선택합니다.

  4. 연결 추가 창에서 다음 값을 입력합니다.

    • 연결 명칭ZeppelinConnection을 입력합니다.

    • 연결 유형에서 Kafka를 선택합니다.

    • Kafka 부트스트랩 서버의 URLs 경우 클러스터의 부트스트랩 브로커 문자열을 제공하십시오. MSK콘솔에서 또는 다음 명령을 입력하여 부트스트랩 브로커를 가져올 수 있습니다. CLI

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • 연결 필요 SSL 체크박스의 선택을 취소하십시오.

    Next(다음)를 선택합니다.

  5. VPC페이지에서 다음 값을 입력합니다.

    • 의 경우 VPC사용자 이름 VPC (예:) 을 선택합니다 AWS KafkaTutorialVPC.

    • 서브넷의 경우 AWS KafkaTutorialSubnet2를 선택합니다.

    • 보안 그룹의 경우 사용 가능한 모든 그룹을 선택합니다.

    Next(다음)를 선택합니다.

  6. 연결 속성연결 액세스 페이지에서 마침을 선택합니다.

표 생성
참고

다음 단계에 설명된 대로 테이블을 수동으로 생성하거나 Apache Zeppelin 내의 노트북에서 Apache Flink용 Managed Service for Apache Flink용 테이블 커넥터 코드 생성을 사용하여 명령문을 통해 테이블을 생성할 수 있습니다. DDL 그런 다음 AWS Glue 체크인해 테이블이 올바르게 생성되었는지 확인할 수 있습니다.

  1. 왼쪽 탐색 모음에서 를 선택합니다. 페이지에서 표 추가, 표 수동 추가를 선택합니다.

  2. 표 속성 설정 페이지에서 표 명칭stock을 입력합니다. 이전에 만든 데이터베이스를 선택했는지 확인하세요. Next(다음)를 선택합니다.

  3. 데이터 스토어 추가 페이지에서 Kafka를 선택합니다. 주제 이름에는 주제 이름 (예: AWS KafkaTutorialTopic) 을 입력합니다. 연결에서 을 선택합니다 ZeppelinConnection.

  4. 분류 페이지에서 을 선택합니다 JSON. Next(다음)를 선택합니다.

  5. 스키마 정의 페이지에서 열 추가를 선택하여 열을 추가합니다. 다음 속성을 가진 열을 추가합니다.

    열 명칭 데이터 유형
    ticker string
    price double

    Next(다음)를 선택합니다.

  6. 다음 페이지에서 설정을 확인하고 마침을 선택합니다.

  7. 테이블 목록에서 새로 생성한 테이블을 선택합니다.

  8. 테이블 편집을 선택하고 다음 속성을 추가합니다.

    • 키:managed-flink.proctime, 값: proctime

    • 키:flink.properties.group.id, 값: test-consumer-group

    • 키:flink.properties.auto.offset.reset, 값: latest

    • 키:classification, 값: json

    이러한 키/값 쌍이 없으면 Flink 노트북에서 오류가 발생합니다.

  9. 적용을 선택합니다.

Amazon으로 스튜디오 노트북 만들기 MSK

애플리케이션에서 사용하는 리소스를 생성했으니 이제 Studio 노트북을 생성합니다.

AWS Management Console 또는 를 사용하여 애플리케이션을 생성할 수 AWS CLI있습니다.
참고

Amazon MSK 콘솔에서 기존 클러스터를 선택한 다음 실시간 데이터 처리를 선택하여 Studio 노트북을 생성할 수도 있습니다.

를 사용하여 스튜디오 노트북을 만드십시오. AWS Management Console

  1. 집에서 Apache Flink용 관리형 서비스 콘솔을 https://console.aws.amazon.com/managed-flink/ 여시겠습니까? 지역=us-east-1#/애플리케이션/대시보드.

  2. Managed Service for Apache Flink 애플리케이션 페이지에서 Studio 탭을 선택합니다. Studio 노트북 생성을 선택합니다.

    참고

    Amazon MSK 또는 Kinesis Data Streams 콘솔에서 스튜디오 노트북을 생성하려면 입력 Amazon MSK 클러스터 또는 Kinesis 데이터 스트림을 선택한 다음 실시간 데이터 처리를 선택합니다.

  3. Studio 노트북 생성 페이지에서 다음 정보를 입력합니다.

    • Studio 노트북 명칭 MyNotebook을 입력합니다.

    • AWS Glue 데이터베이스기본값을 선택합니다.

    Studio 노트북 생성을 선택합니다.

  4. MyNotebook페이지에서 구성 탭을 선택합니다. 네트워킹 섹션에서 편집을 선택합니다.

  5. 네트워킹 편집 MyNotebook 페이지에서 Amazon MSK 클러스터 기반 VPC 구성을 선택합니다. Amazon MSK 클러스터용 Amazon MSK 클러스터를 선택하십시오. Save changes(변경 사항 저장)를 선택합니다.

  6. MyNotebook페이지에서 [Run] 을 선택합니다. 상태실행 중으로 표시될 때까지 기다리세요.

를 사용하여 Studio 노트북을 생성합니다. AWS CLI

를 사용하여 스튜디오 노트북을 만들려면 다음과 같이 하십시오. AWS CLI

  1. 다음 정보가 있는지 확인합니다. 애플리케이션을 생성하려면 이러한 값이 필요합니다.

    • 계정 ID

    • Amazon MSK 클러스터를 VPC 포함하는 Amazon의 서브넷 IDs 및 보안 그룹 ID입니다.

  2. 다음 콘텐츠를 가진 create.json이라는 파일을 생성합니다: 자리 표시자 값을 정보로 바꿉니다.

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. 애플리케이션을 생성하려면 다음 명령을 실행합니다.

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. 명령이 완료되면 새 Studio 노트북의 세부 정보를 보여 주는 다음과 유사한 출력이 나타나야 합니다.

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. 애플리케이션을 시작하려면 다음 명령을 실행합니다. 샘플 값을 계정 ID로 바꿉니다.

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

Amazon MSK 클러스터로 데이터 전송

이 섹션에서는 Amazon EC2 클라이언트에서 Python 스크립트를 실행하여 Amazon 데이터 소스로 MSK 데이터를 전송합니다.

  1. Amazon EC2 클라이언트에 연결합니다.

  2. 다음 명령을 실행하여 Python 버전 3, Pip 및 Python용 Kafka 패키지를 설치하고 작업을 확인합니다.

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. 다음 명령을 입력하여 클라이언트 시스템에서 구성합니다. AWS CLI

    aws configure

    region에 계정 자격 증명 및 us-east-1을 제공하세요.

  4. 다음 콘텐츠를 가진 stock.py이라는 파일을 생성합니다: 샘플 값을 Amazon MSK 클러스터의 Bootstrap Brokers 문자열로 바꾸고, 주제가 다음과 같지 않은 경우 주제 이름을 업데이트하십시오. AWS KafkaTutorialTopic

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. 다음 명령으로 스크립트를 실행합니다:

    $ python3 stock.py
  6. 다음 섹션을 완료하는 동안 스크립트는 실행 상태로 두세요.

Studio 노트북을 테스트

이 섹션에서는 Studio 노트북을 사용하여 Amazon MSK 클러스터의 데이터를 쿼리합니다.

  1. 집에서 Apache Flink용 관리형 서비스 콘솔을 https://console.aws.amazon.com/managed-flink/ 여시겠습니까? 지역=us-east-1#/애플리케이션/대시보드.

  2. Managed Service for Apache Flink 애플리케이션 페이지에서 Studio 노트북 탭을 선택합니다. 선택하세요. MyNotebook

  3. MyNotebook페이지에서 [Apache Zeppelin에서 열기] 를 선택합니다.

    Apache Zeppelin 인터페이스가 새 탭에서 열립니다.

  4. 제플린에 오신 것을 환영합니다! 페이지에서 Zeppelin 새로운 노트를 선택하세요.

  5. Zeppelin 노트 페이지에서 새로운 노트에 다음 쿼리를 입력합니다.

    %flink.ssql(type=update) select * from stock

    실행 아이콘을 선택합니다.

    애플리케이션은 Amazon MSK 클러스터의 데이터를 표시합니다.

애플리케이션에서 운영 측면을 볼 수 있도록 Apache Flink 대시보드를 열려면 선택하십시오. FLINKJOB Flink 대시보드에 대한 자세한 내용을 알아보려면 Managed Service for Apache Flink 개발자 가이드Apache Flink 대시보드를 참조하세요.

Flink 스트리밍 SQL 쿼리의 더 많은 예를 보려면 Apache Flink 설명서의 쿼리를 참조하십시오.