시작하기 (스칼라) - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink는 이전에 Amazon Kinesis Data Analytics for Apache Flink로 알려졌습니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

시작하기 (스칼라)

참고

버전 1.15부터 Flink에는 Scala가 없습니다. 이제 애플리케이션은 모든 Scala API 버전에서 Java를 사용할 수 있습니다. Flink는 여전히 내부적으로 몇 가지 주요 구성 요소에서 Scala를 사용하지만 사용자 코드 클래스 로더에 Scala를 노출하지는 않습니다. 따라서 -archive에 Scala 종속 항목을 추가해야 합니다. JAR

Flink 1.15의 Scala 변경 사항에 대한 자세한 내용은 Scala Free in One Fifteen을 참조하세요.

이 연습에서는 Kinesis 스트림을 소스로 사용하고 싱크를 사용하여 Scala용 Apache Flink용 관리 서비스 애플리케이션을 만듭니다.

종속 리소스 생성

이 연습을 위해 Managed Service for Apache Flink 애플리케이션을 생성하기 전에 다음과 같은 종속 리소스를 생성해야 합니다.

  • 입력 및 출력을 위한 Kinesis 스트림 2개.

  • 애플리케이션 코드를 저장할 Amazon S3 버킷(ka-app-code-<username>)

콘솔을 사용하여 Kinesis 스트림과 Amazon S3 버킷을 만들 수 있습니다. 이러한 리소스를 만드는 방법 설명은 다음 주제를 참조하세요.

  • Amazon Kinesis Data Streams 개발자 안내서데이터 스트림 만들기 및 업데이트. 데이터 스트림 ExampleInputStreamExampleOutputStream에 명칭을 지정합니다.

    데이터 스트림 (AWS CLI)을 생성하려면

    • 첫 번째 스트림 (ExampleInputStream) 을 생성하려면 다음 Amazon Kinesis AWS CLI create-stream 명령을 사용하십시오.

      aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
    • 애플리케이션에서 출력을 쓰는 데 사용하는 두 번째 스트림을 생성하려면 동일한 명령을 실행하여 스트림 명칭을 ExampleOutputStream으로 변경합니다.

      aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  • Amazon Simple Storage Service 사용 설명서S3 버킷을 생성하려면 어떻게 해야 합니까? 로그인 명칭(예: ka-app-code-<username>)을 추가하여 Amazon S3 버킷에 전역적으로 고유한 명칭을 지정합니다.

기타 리소스

애플리케이션을 생성할 때 Apache Flink용 관리형 서비스는 다음과 같은 Amazon CloudWatch 리소스를 생성합니다 (아직 존재하지 않는 경우).

  • /AWS/KinesisAnalytics-java/MyApplication라는 로그 그룹.

  • kinesis-analytics-log-stream라는 로그 스트림.

입력 스트림에 샘플 레코드를 작성합니다.

이 섹션에서는 Python 스크립트를 사용하여 애플리케이션에서 처리할 샘플 레코드를 스트림에 쓰기 합니다.

참고

이 섹션에서는 AWS SDK for Python (Boto)이 필요합니다.

참고

이 섹션의 Python 스크립트는 AWS CLI를 사용합니다. 계정 자격 증명과 기본 지역을 사용하도록 구성해야 합니다. AWS CLI 를 AWS CLI구성하려면 다음을 입력합니다.

aws configure
  1. 다음 콘텐츠를 가진 stock.py이라는 파일을 생성합니다:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. stock.py 스크립트를 실행합니다.

    $ python stock.py

    자습서의 나머지 부분을 완료하는 동안 스크립트가 계속 돌아가게 둡니다.

애플리케이션 코드를 다운로드하여 검토하십시오.

이 예제의 Python 응용 프로그램 코드는 에서 사용할 수 GitHub 있습니다. 애플리케이션 코드를 다운로드하려면 다음을 수행하세요.

  1. 아직 설치하지 않았다면 Git 클라이언트를 설치합니다. 자세한 정보는 Git 설치를 참조하세요.

  2. 다음 명령을 사용하여 원격 리포지토리를 복제합니다:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. amazon-kinesis-data-analytics-java-examples/scala/GettingStarted 디렉터리로 이동합니다.

애플리케이션 코드에 대해 다음을 유의하십시오:

  • build.sbt 파일에는 Managed Service for Apache Flink 라이브러리를 포함하여 애플리케이션의 구성 및 종속성에 대한 정보가 들어 있습니다.

  • BasicStreamingJob.scala 파일에는 애플리케이션의 기능을 정의하는 주요 메서드가 들어 있습니다.

  • 애플리케이션은 Kinesis 소스를 사용하여 소스 스트림에서 읽습니다. 다음 스니펫은 Kinesis 소스를 생성합니다.

    private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }

    또한 애플리케이션은 Kinesis 싱크를 사용하여 결과 스트림에 기록합니다. 다음 조각은 Kinesis 싱크를 생성합니다.

    private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
  • 응용 프로그램은 StreamExecutionEnvironment 객체를 사용하여 외부 리소스에 액세스할 수 있는 소스 및 싱크 커넥터를 만듭니다.

  • 애플리케이션은 동적 애플리케이션 속성을 사용하여 소스 및 싱크 커넥터를 만듭니다. 런타임 애플리케이션의 속성을 읽어 커넥터를 구성합니다. 런타임 속성에 대한 자세한 내용은 런타임 속성을 참조하세요.

애플리케이션 코드 컴파일 및 업로드

이 섹션에서는 사용자가 종속 리소스 생성 섹션에서 생성한 Amazon S3 버킷에 사용자의 애플리케이션 코드를 컴파일 및 업로드합니다.

애플리케이션 코드 컴파일

이 섹션에서는 SBT빌드 도구를 사용하여 애플리케이션의 Scala 코드를 작성합니다. SBT설치하려면 cs 설정으로 sbt 설치를 참조하십시오. 또한 Java 개발 키트 (JDK) 도 설치해야 합니다. 연습 완료를 위한 사전 조건을 참조하세요.

  1. 애플리케이션 코드를 사용하려면 컴파일하여 JAR 파일로 패키징해야 합니다. 다음을 사용하여 코드를 컴파일하고 패키징할 수 있습니다. SBT

    sbt assembly
  2. 애플리케이션이 성공적으로 컴파일되면 다음 파일이 생성됩니다:

    target/scala-3.2.0/getting-started-scala-1.0.jar
Apache Flink 스트리밍 Scala 코드 업로드

이 섹션에서는 Amazon S3 버킷을 만들고 애플리케이션 코드를 업로드합니다.

  1. 에서 Amazon S3 콘솔을 엽니다 https://console.aws.amazon.com/s3/.

  2. 버킷 만들기를 선택합니다.

  3. 버킷 명칭 필드에 ka-app-code-<username>을 입력합니다. 버킷 명칭에 사용자 이름 등의 접미사를 추가하여 전역적으로 고유하게 만듭니다. Next(다음)를 선택합니다.

  4. 옵션 구성 단계에서 설정을 기본값 그대로 두고 다음을 선택합니다.

  5. 권한 설정 단계에서 설정을 기본값 그대로 두고 다음을 선택합니다.

  6. 버킷 생성을 선택합니다.

  7. ka-app-code-<username> 버킷을 선택한 다음 업로드를 선택합니다.

  8. 파일 선택 단계에서 파일 추가를 선택합니다. 이전 단계에서 생성한 getting-started-scala-1.0.jar 파일로 이동합니다.

  9. 개체 정보에 대한 설정은 변경할 필요가 없으므로 업로드를 선택합니다.

이제 애플리케이션 코드가 애플리케이션에서 액세스할 수 있는 Amazon S3 버킷에 저장됩니다.