Managed Service for Apache Flink 애플리케이션 생성 및 실행 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink는 이전에 Amazon Kinesis Data Analytics for Apache Flink로 알려졌습니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Managed Service for Apache Flink 애플리케이션 생성 및 실행

이 단계에서는 Kinesis 데이터 스트림을 소스 및 싱크로 사용하여 Managed Service for Apache Flink 애플리케이션을 생성합니다.

종속 리소스 생성

이 연습을 위해 Managed Service for Apache Flink 애플리케이션을 생성하기 전에 다음과 같은 종속 리소스를 생성해야 합니다.

  • 입력 및 출력을 위한 두 개의 Kinesis 데이터 스트림

  • 애플리케이션 코드를 저장하는 Amazon S3 버킷

    참고

    이 자습서에서는 us-east-1 US East(버지니아 북부) 리전에 애플리케이션을 배포하고 있다고 가정합니다. 다른 리전을 사용하는 경우 그에 따라 모든 단계를 조정합니다.

Amazon Kinesis 데이터 스트림 2개 생성

이 연습을 위해 Managed Service for Apache Flink 애플리케이션을 생성하기 전에 두 개의 Kinesis 데이터 스트림(ExampleInputStreamExampleOutputStream)을 생성하세요. 이 애플리케이션은 애플리케이션 소스 및 대상 스트림에 대해 이러한 스트림을 사용합니다.

Amazon Kinesis 콘솔 또는 다음 AWS CLI 명령을 사용하여 이러한 스트림을 생성할 수 있습니다. 콘솔 지침은 Amazon Kinesis Data Streams 개발자 가이드데이터 스트림 생성 및 업데이트를 참조하세요. 를 사용하여 스트림을 생성하려면 다음 명령을 AWS CLI사용하여 애플리케이션에 사용하는 리전에 맞게 조정합니다.

데이터 스트림 (AWS CLI)을 생성하려면
  1. 첫 번째 스트림(ExampleInputStream)을 생성하려면 다음 Amazon Kinesis create-stream AWS CLI 명령을 사용합니다.

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \
  2. 애플리케이션이 출력을 쓰는 데 사용하는 두 번째 스트림을 생성하려면 동일한 명령을 실행하고 스트림 이름을 로 변경합니다ExampleOutputStream.

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \

애플리케이션 코드에 대한 Amazon S3 버킷 생성

콘솔을 사용하여 Amazon S3 버킷을 생성할 수 있습니다. 콘솔을 사용하여 Amazon S3 버킷을 생성하는 방법을 알아보려면 Amazon S3 사용 설명서버킷 생성을 참조하세요. 로그인 이름을 추가하는 등 전역적으로 고유한 이름을 사용하여 Amazon S3 버킷의 이름을 지정합니다.

참고

이 자습서(us-east-1)에 사용하는 리전에서 버킷을 생성해야 합니다.

기타 리소스

애플리케이션을 생성할 때 Managed Service for Apache Flink는 다음 Amazon CloudWatch 리소스가 아직 없는 경우 자동으로 생성합니다.

  • /AWS/KinesisAnalytics-java/<my-application>라는 로그 그룹.

  • kinesis-analytics-log-stream라는 로그 스트림.

로컬 개발 환경 설정

개발 및 디버깅의 경우 IDE 선택한 에서 직접 시스템에서 Apache Flink 애플리케이션을 실행할 수 있습니다. 모든 Apache Flink 종속성은 Apache Maven을 사용하여 일반 Java 종속성처럼 처리됩니다.

참고

개발 머신에는 Java JDK 11, Maven 및 Git이 설치되어 있어야 합니다. Eclipse Java Neon 또는 IntelliJIDEA와 같은 개발 환경을 사용하는 것이 좋습니다. 모든 사전 조건을 충족하는지 확인하려면 섹션을 참조하세요연습을 완료하기 위한 사전 조건 이행. 시스템에 Apache Flink 클러스터를 설치할 필요가 없습니다.

세션 인증 AWS

애플리케이션은 Kinesis 데이터 스트림을 사용하여 데이터를 게시합니다. 로컬에서 실행할 때는 Kinesis 데이터 스트림에 쓸 수 있는 권한이 있는 유효한 AWS 인증된 세션이 있어야 합니다. 다음 단계에 따라 세션을 인증합니다.

  1. 유효한 자격 증명이 구성된 AWS CLI 및 명명된 프로파일이 없는 경우 섹션을 참조하세요AWS Command Line Interface (AWS CLI) 설정.

  2. 다음 테스트 레코드를 게시하여 AWS CLI 가 올바르게 구성되었고 사용자에게 Kinesis 데이터 스트림에 쓸 수 있는 권한이 있는지 확인합니다.

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. 에 와 통합할 플러그인IDE이 있는 경우 AWS플러그인을 사용하여 에서 실행되는 애플리케이션에 보안 인증을 전달할 수 있습니다IDE. 자세한 내용은 AWS Toolkit for IntelliJ IDEAAWS Toolkit for Eclipse를 참조하세요.

Apache Flink 스트리밍 Java 코드 다운로드 및 검사

이 예제의 Java 애플리케이션 코드는 에서 사용할 수 있습니다 GitHub. 애플리케이션 코드를 다운로드하려면 다음을 수행하세요.

  1. 다음 명령을 사용하여 원격 리포지토리를 복제합니다.

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted 디렉터리로 이동합니다.

애플리케이션 구성 요소 검토

애플리케이션은 com.amazonaws.services.msf.BasicStreamingJob 클래스에서 완전히 구현됩니다. 이 main() 메서드는 스트리밍 데이터를 처리하고 실행할 데이터 흐름을 정의합니다.

참고

최적화된 개발자 경험을 위해 애플리케이션은 Amazon Managed Service for Apache Flink 및 로컬에서 코드 변경 없이 실행되도록 설계되어 에서 개발할 수 있습니다IDE.

  • Amazon Managed Service for Apache Flink 및 에서 실행될 때 작동하도록 런타임 구성을 읽으려면 IDE애플리케이션이 에서 로컬로 독립 실행 중인지 자동으로 감지합니다IDE. 이 경우 애플리케이션은 런타임 구성을 다르게 로드합니다.

    1. 애플리케이션이 에서 독립 실행형 모드로 실행되고 있음을 감지하면 프로젝트의 리소스 폴더에 포함된 application_properties.json 파일을 IDE만듭니다. 파일의 내용은 다음과 같습니다.

    2. 애플리케이션이 Amazon Managed Service for Apache Flink에서 실행되면 기본 동작은 Amazon Managed Service for Apache Flink 애플리케이션에서 정의할 런타임 속성에서 애플리케이션 구성을 로드합니다. Managed Service for Apache Flink 애플리케이션 생성 및 구성을 참조하세요.

      private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
  • main() 메서드는 애플리케이션 데이터 흐름을 정의하고 실행합니다.

    • 기본 스트리밍 환경을 초기화합니다. 이 예제에서는 와 함께 StreamExecutionEnvironment 사용할 DataSteam API와 및 테이블 과 함께 StreamTableEnvironment 사용할 SQL를 모두 생성하는 방법을 보여줍니다API. 두 환경 객체는 서로 다른 를 사용하기 위해 동일한 런타임 환경에 대한 두 개의 개별 참조입니다APIs.

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    • 애플리케이션 구성 파라미터를 로드합니다. 이렇게 하면 애플리케이션이 실행 중인 위치에 따라 올바른 위치에서 자동으로 로드됩니다.

      Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    • 애플리케이션은 Kinesis Consumer 커넥터를 사용하여 입력 스트림에서 데이터를 읽는 소스를 정의합니다. 입력 스트림의 구성은 PropertyGroupId=에 정의되어 있습니다InputStream0. 스트림의 이름과 리전은 aws.region 각각 stream.name 및 라는 속성에 있습니다. 간소화를 위해 이 소스는 레코드를 문자열로 읽습니다.

      private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... }
    • 그런 다음 애플리케이션은 Kinesis Streams Sink 커넥터를 사용하여 출력 스트림으로 데이터를 전송하는 싱크를 정의합니다. 출력 스트림 이름 및 리전은 입력 스트림과 OutputStream0유사한 PropertyGroupId=에 정의됩니다. 싱크는 소스에서 데이터를 DataStream 가져오는 내부 에 직접 연결됩니다. 실제 애플리케이션에서는 소스와 싱크 간에 약간의 변환이 있습니다.

      private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... }
    • 마지막으로 방금 정의한 데이터 흐름을 실행합니다. 데이터 흐름에 필요한 모든 연산자를 정의한 후 이 명령은 main() 메서드의 마지막 명령이어야 합니다.

      env.execute("Flink streaming Java API skeleton");

pom.xml 파일 사용

pom.xml 파일은 애플리케이션에 필요한 모든 종속성을 정의하고 Maven Shade 플러그인을 설정하여 Flink에 필요한 모든 종속성이 포함된 fat-jar를 빌드합니다.

  • 일부 종속성에는 provided 범위가 있습니다. 이러한 종속성은 애플리케이션이 Amazon Managed Service for Apache Flink에서 실행될 때 자동으로 사용할 수 있습니다. 애플리케이션을 컴파일하거나 에서 애플리케이션을 로컬로 실행하는 데 필요합니다IDE. 자세한 내용은 로컬에서 애플리케이션 실행 단원을 참조하십시오. Amazon Managed Service for Apache Flink에서 사용할 런타임과 동일한 Flink 버전을 사용하고 있는지 확인합니다.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
  • 이 애플리케이션에서 사용하는 Kinesis 커넥터와 같은 기본 범위를 사용하여 폼에 Apache Flink 종속성을 추가해야 합니다. 자세한 내용은 Apache Flink 커넥터 사용 단원을 참조하십시오. 애플리케이션에 필요한 Java 종속성을 추가할 수도 있습니다.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
  • Maven Java 컴파일러 플러그인은 코드가 현재 Apache Flink에서 지원되는 JDK 버전인 Java 11에 대해 컴파일되었는지 확인합니다.

  • Maven Shade 플러그인은 런타임에서 제공하는 일부 라이브러리를 제외하고 fat-jar를 패키징합니다. 또한 ServicesResourceTransformer 및 라는 두 개의 변환기도 지정합니다ManifestResourceTransformer. 후자는 애플리케이션을 시작하는 main 메서드가 포함된 클래스를 구성합니다. 기본 클래스의 이름을 바꾸는 경우 이 변환기를 업데이트하는 것을 잊지 마세요.

  • <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>

입력 스트림에 샘플 레코드 쓰기

이 섹션에서는 애플리케이션이 처리할 수 있도록 샘플 레코드를 스트림으로 보냅니다. Python 스크립트 또는 Kinesis Data Generator를 사용하여 샘플 데이터를 생성하는 두 가지 옵션이 있습니다.

Python 스크립트를 사용하여 샘플 데이터 생성

Python 스크립트를 사용하여 샘플 레코드를 스트림으로 보낼 수 있습니다.

참고

이 Python 스크립트를 실행하려면 Python 3.x를 사용하고 AWS SDK for Python(Boto) 라이브러리가 설치되어 있어야 합니다.

Kinesis 입력 스트림으로 테스트 데이터 전송을 시작하려면:

  1. 데이터 생성기 GitHub 리포지토리 에서 데이터 생성기 stock.py Python 스크립트를 다운로드합니다.

  2. stock.py 스크립트를 실행합니다.

    $ python stock.py

자습서의 나머지 부분을 완료하는 동안 스크립트를 계속 실행합니다. 이제 Apache Flink 애플리케이션을 실행할 수 있습니다.

Kinesis Data Generator를 사용하여 샘플 데이터 생성

Python 스크립트를 사용하는 대신 호스팅 버전 에서도 사용할 수 있는 Kinesis Data Generator 를 사용하여 스트림에 무작위 샘플 데이터를 보낼 수 있습니다. Kinesis Data Generator는 브라우저에서 실행되며 시스템에 아무것도 설치할 필요가 없습니다.

Kinesis Data Generator를 설정하고 실행하려면:

  1. Kinesis Data Generator 설명서의 지침에 따라 도구에 대한 액세스를 설정합니다. 사용자와 암호를 설정하는 AWS CloudFormation 템플릿을 실행합니다.

  2. 템플릿에서 URL 생성된 를 통해 Kinesis Data Generator에 액세스합니다 CloudFormation. CloudFormation 템플릿이 완료된 후 출력 URL 탭에서 를 찾을 수 있습니다.

  3. 데이터 생성기 구성:

    • 리전: 이 자습서에 사용 중인 리전을 선택합니다. us-east-1

    • 스트림/전달 스트림: 애플리케이션이 사용할 입력 스트림을 선택합니다. ExampleInputStream

    • 초당 레코드 수: 100

    • 레코드 템플릿: 다음 템플릿을 복사하여 붙여넣습니다.

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. 템플릿 테스트: 테스트 템플릿을 선택하고 생성된 레코드가 다음과 유사한지 확인합니다.

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. 데이터 생성기 시작: 데이터 전송을 선택합니다.

이제 Kinesis Data Generator가 로 데이터를 전송합니다ExampleInputStream.

로컬에서 애플리케이션 실행

에서 로컬로 Flink 애플리케이션을 실행하고 디버깅할 수 있습니다IDE.

참고

계속하기 전에 입력 및 출력 스트림을 사용할 수 있는지 확인합니다. Amazon Kinesis 데이터 스트림 2개 생성을 참조하세요. 또한 두 스트림 모두에서 읽고 쓸 수 있는 권한이 있는지 확인합니다. 세션 인증 AWS을 참조하세요.

로컬 개발 환경을 설정하려면 Java 개발을 IDE 위해 Java 11 , JDKApache Maven 및 및 가 필요합니다. 필수 사전 조건을 충족하는지 확인합니다. 연습을 완료하기 위한 사전 조건 이행을 참조하세요.

Java 프로젝트를 로 가져오기 IDE

에서 애플리케이션 작업을 시작하려면 애플리케이션을 Java 프로젝트로 가져와IDE야 합니다.

복제한 리포지토리에는 여러 예제가 포함되어 있습니다. 각 예제는 별도의 프로젝트입니다. 이 자습서의 경우 ./java/GettingStarted 하위 디렉터리의 콘텐츠를 로 가져옵니다IDE.

Maven을 사용하여 코드를 기존 Java 프로젝트로 삽입합니다.

참고

새 Java 프로젝트를 가져오는 정확한 프로세스는 사용 IDE 중인 에 따라 달라집니다.

로컬 애플리케이션 구성 확인

로컬에서 실행될 때 애플리케이션은 아래의 프로젝트 리소스 폴더에 있는 application_properties.json 파일의 구성을 사용합니다./src/main/resources. 다른 Kinesis 스트림 이름 또는 리전을 사용하도록 이 파일을 편집할 수 있습니다.

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

IDE 실행 구성 설정

모든 Java 애플리케이션을 실행하는 com.amazonaws.services.msf.BasicStreamingJob것처럼 기본 클래스 를 실행하여 에서 IDE 직접 Flink 애플리케이션을 실행하고 디버깅할 수 있습니다. 애플리케이션을 실행하기 전에 실행 구성을 설정해야 합니다. 설정은 사용 IDE 중인 에 따라 달라집니다. 예를 들어 IntelliJ IDEA 설명서의 구성 실행/디버그를 참조하세요. 특히 다음을 설정해야 합니다.

  1. 클래스 경로 에 provided 종속성을 추가합니다. 이는 로컬에서 실행할 때 provided 범위가 있는 종속성이 애플리케이션에 전달되도록 하기 위해 필요합니다. 이 설정이 없으면 애플리케이션에 class not found 오류가 즉시 표시됩니다.

  2. 자격 AWS 증명을 전달하여 Kinesis 스트림에 액세스합니다. 가장 빠른 방법은 AWS IntelliJ용 도구 키트를 IDEA사용하는 것입니다. 실행 구성에서 이 IDE 플러그인을 사용하여 특정 AWS 프로파일을 선택할 수 있습니다. AWS 인증은 이 프로파일을 사용하여 수행됩니다. 보안 인증 정보를 직접 전달할 AWS 필요가 없습니다.

  3. JDK 11을 사용하여 애플리케이션을 IDE 실행하는지 확인합니다.

에서 애플리케이션 실행 IDE

에 대한 실행 구성을 설정한 후 일반 Java 애플리케이션처럼 실행하거나 디버깅할 BasicStreamingJob수 있습니다.

참고

명령줄에서 Maven에서 직접 생성한 fat-jarjava -jar ...를 로 실행할 수 없습니다. 이 jar에는 애플리케이션을 독립적으로 실행하는 데 필요한 Flink 코어 종속 항목이 포함되어 있지 않습니다.

애플리케이션이 성공적으로 시작되면 독립 실행형 미니클러스터와 커넥터 초기화에 대한 일부 정보를 기록합니다. 그러면 애플리케이션이 시작될 때 Flink가 일반적으로 내보내는 여러 로그INFO와 일부 WARN 로그가 표시됩니다.

13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....

초기화가 완료되면 애플리케이션에서 추가 로그 항목을 내보내지 않습니다. 데이터가 흐르는 동안에는 로그가 생성되지 않습니다.

다음 섹션에 설명된 대로 애플리케이션이 데이터를 올바르게 처리하고 있는지 확인하기 위해 입력 및 출력 Kinesis 스트림을 검사할 수 있습니다.

참고

데이터 흐름에 대한 로그를 내보내지 않는 것은 Flink 애플리케이션의 일반적인 동작입니다. 모든 레코드에 대한 로그를 내보내는 것은 디버깅에 편리할 수 있지만 프로덕션에서 실행할 때 상당한 오버헤드를 추가할 수 있습니다.

Kinesis 스트림의 입력 및 출력 데이터 관찰

Amazon Kinesis 콘솔의 Data Viewer를 사용하여 (샘플 Python 생성) 또는 Kinesis Data Generator(링크)를 통해 입력 스트림으로 전송된 레코드를 관찰할 수 있습니다. Amazon Kinesis

레코드를 관찰하려면
  1. https://console.aws.amazon.com/kinesis 에서 Kinesis 콘솔을 엽니다.

  2. 리전이 기본적으로 us-east-1 US East(버지니아 북부)인 이 자습서를 실행하는 곳과 동일한지 확인합니다. 리전이 일치하지 않는 경우 리전을 변경합니다.

  3. 데이터 스트림 을 선택합니다.

  4. ExampleInputStream 또는 중에서 관찰하려는 스트림을 선택합니다. ExampleOutputStream.

  5. 데이터 뷰어 탭을 선택합니다.

  6. 샤드를 선택하고 최신시작 위치로 유지한 다음 레코드 가져오기를 선택합니다. “이 요청에 대한 레코드를 찾을 수 없음” 오류가 표시될 수 있습니다. 그렇다면 레코드 가져오기 재시도를 선택합니다. 스트림 디스플레이에 게시된 최신 레코드입니다.

  7. 데이터 열에서 값을 선택하여 JSON 형식의 레코드 내용을 검사합니다.

로컬에서 애플리케이션 실행 중지

에서 실행 중인 애플리케이션을 중지합니다IDE. 는 IDE 일반적으로 “중지” 옵션을 제공합니다. 정확한 위치와 방법은 사용 IDE 중인 에 따라 다릅니다.

애플리케이션 코드 컴파일 및 패키징

이 섹션에서는 Apache Maven을 사용하여 Java 코드를 컴파일하고 JAR 파일로 패키징합니다. Maven 명령줄 도구 또는 를 사용하여 코드를 컴파일하고 패키징할 수 있습니다IDE.

Maven 명령줄을 사용하여 컴파일하고 패키징하려면:

Java GettingStarted 프로젝트가 포함된 디렉터리로 이동하여 다음 명령을 실행합니다.

$ mvn package

를 사용하여 컴파일하고 패키징하려면IDE:

IDE Maven 통합mvn package에서 를 실행합니다.

두 경우 모두 라는 JAR 파일이 생성됩니다target/amazon-msf-java-stream-app-1.0.jar.

참고

에서 '프로젝트 빌드'를 실행하면 JAR 파일이 생성되지 않을 IDE 수 있습니다.

애플리케이션 코드 JAR 파일 업로드

이 섹션에서는 이전 섹션에서 생성한 JAR 파일을 이 자습서의 시작 부분에서 생성한 Amazon Simple Storage Service(Amazon S3) 버킷에 업로드합니다. 이 단계를 완료하지 않은 경우 (링크)를 참조하세요.

애플리케이션 코드 JAR 파일을 업로드하려면
  1. 에서 Amazon S3 콘솔을 엽니다https://console.aws.amazon.com/s3/.

  2. 애플리케이션 코드에 대해 이전에 생성한 버킷을 선택합니다.

  3. 업로드를 선택합니다.

  4. [Add Files]를 선택합니다.

  5. 이전 단계에서 생성된 JAR 파일 로 이동합니다target/amazon-msf-java-stream-app-1.0.jar.

  6. 다른 설정을 변경하지 않고 업로드를 선택합니다.

주의

에서 올바른 JAR 파일을 선택해야 합니다<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar.

target 디렉터리에는 업로드할 필요가 없는 다른 JAR 파일도 포함되어 있습니다.

Managed Service for Apache Flink 애플리케이션 생성 및 구성

콘솔이나 AWS CLI를 사용하여 Managed Service for Apache Flink 애플리케이션을 생성하고 실행할 수 있습니다. 이 자습서에서는 콘솔을 사용합니다.

참고

콘솔을 사용하여 애플리케이션을 생성하면 AWS Identity and Access Management (IAM) 및 Amazon CloudWatch Logs 리소스가 생성됩니다. 를 사용하여 애플리케이션을 생성할 때 이러한 리소스를 별도로 AWS CLI생성합니다.

애플리케이션 생성

애플리케이션을 생성하려면
  1. https://console.aws.amazon.com/flink에서 Managed Service for Apache Flink 콘솔 열기

  2. us-east-1 US East(버지니아 북부)와 같은 올바른 리전이 선택되었는지 확인합니다.

  3. 오른쪽의 메뉴를 열고 Apache Flink 애플리케이션을 선택한 다음 스트리밍 애플리케이션 생성 을 선택합니다. 또는 초기 페이지의 시작하기 컨테이너에서 스트리밍 애플리케이션 생성을 선택합니다.

  4. 스트리밍 애플리케이션 생성 페이지에서:

    • 스트림 처리 애플리케이션을 설정할 방법을 선택합니다. 처음부터 생성을 선택합니다.

    • Apache Flink 구성, Application Flink 버전: Apache Flink 1.19를 선택합니다.

  5. 애플리케이션 구성

    • 애플리케이션 이름: 를 입력합니다MyApplication.

    • 설명: 를 입력합니다My java test app.

    • 애플리케이션 리소스에 대한 액세스: 필요한 정책kinesis-analytics-MyApplication-us-east-1으로 IAM 역할 생성/업데이트를 선택합니다.

  6. 애플리케이션 설정을 위한 템플릿 구성

    • 템플릿: 개발 을 선택합니다.

  7. 페이지 하단에서 스트리밍 애플리케이션 생성을 선택합니다.

참고

콘솔을 사용하여 Managed Service for Apache Flink 애플리케이션을 생성할 때 애플리케이션에 대한 IAM 역할 및 정책을 생성할 수 있습니다. 귀하의 애플리케이션은 이 역할 및 정책을 사용하여 종속 리소스에 액세스합니다. 이러한 IAM 리소스는 다음과 같이 애플리케이션 이름과 리전을 사용하여 이름이 지정됩니다.

  • 정책: kinesis-analytics-service-MyApplication-us-east-1

  • 역할: kinesisanalytics-MyApplication-us-east-1

Amazon Managed Service for Apache Flink는 이전에는 Kinesis Data Analytics라고 불렸습니다. 자동으로 생성되는 리소스의 이름은 이전 버전과의 호환성을 kinesis-analytics- 위해 접두사가 붙습니다.

IAM 정책 편집

IAM 정책을 편집하여 Kinesis 데이터 스트림에 액세스할 수 있는 권한을 추가합니다.

정책을 편집하려면
  1. 에서 IAM 콘솔을 엽니다https://console.aws.amazon.com/iam/.

  2. 정책을 선택하세요. 이전 섹션에서 콘솔이 생성한 kinesis-analytics-service-MyApplication-us-east-1 정책을 선택합니다.

  3. 편집을 선택한 다음 JSON 탭을 선택합니다.

  4. 다음 정책 예제의 강조 표시된 부분을 정책에 추가하세요. 샘플 계정 교체IDs(012345678901)를 계정 ID와 함께 사용합니다.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. 페이지 하단에서 다음을 선택한 다음 변경 사항 저장을 선택합니다.

애플리케이션 구성

애플리케이션 구성을 편집하여 애플리케이션 코드 아티팩트를 설정합니다.

구성을 편집하려면
  1. MyApplication 페이지에서 구성을 선택합니다.

  2. 애플리케이션 코드 위치 섹션에서 다음을 수행합니다.

    • Amazon S3 버킷 에서 애플리케이션 코드에 대해 이전에 생성한 버킷을 선택합니다. 찾아보기를 선택하고 올바른 버킷을 선택한 다음 선택을 선택합니다. 버킷 이름을 클릭하지 마세요.

    • Amazon S3 객체 경로에는 amazon-msf-java-stream-app-1.0.jar를 입력합니다.

  3. 액세스 권한 에서 필요한 정책kinesis-analytics-MyApplication-us-east-1으로 IAM 역할 생성/업데이트를 선택합니다.

  4. 런타임 속성 섹션에서 다음 속성을 추가합니다.

  5. 새 항목 추가를 선택하고 다음 각 파라미터를 추가합니다.

    그룹 ID
    InputStream0 stream.name ExampleInputStream
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
  6. 다른 섹션은 수정하지 마세요.

  7. Save changes(변경 사항 저장)를 선택합니다.

참고

Amazon CloudWatch 로깅을 활성화하도록 선택하면 Managed Service for Apache Flink가 로그 그룹과 로그 스트림을 생성합니다. 이러한 리소스의 이름은 다음과 같습니다.

  • 로그 그룹: /aws/kinesis-analytics/MyApplication

  • 로그 스트림: kinesis-analytics-log-stream

애플리케이션을 실행합니다

이제 애플리케이션이 구성되었으며 실행할 준비가 되었습니다.

애플리케이션을 실행하려면
  1. Amazon Managed Service for Apache Flink용 콘솔에서 내 애플리케이션을 선택하고 실행을 선택합니다.

  2. 다음 페이지의 애플리케이션 복원 구성 페이지에서 최신 스냅샷으로 실행을 선택한 다음 실행을 선택합니다.

    애플리케이션의 상태 세부 정보는 에서 Ready Starting로, 이후 애플리케이션이 시작Running되면 로 전환됩니다.

애플리케이션이 Running 상태이면 이제 Flink 대시보드를 열 수 있습니다.

대시보드 열기
  1. Apache Flink 대시보드 열기를 선택합니다. 대시보드가 새 페이지에서 열립니다.

  2. 작업 실행 목록에서 볼 수 있는 단일 작업을 선택합니다.

    참고

    런타임 속성을 설정하거나 IAM 정책을 잘못 편집한 경우 애플리케이션 상태가 로 전환될 수 Running있지만 Flink 대시보드에 작업이 계속 다시 시작되고 있는 것으로 표시됩니다. 애플리케이션이 잘못 구성되었거나 외부 리소스에 액세스할 권한이 없는 경우 일반적인 장애 시나리오입니다.

    이 경우 Flink 대시보드의 예외 탭을 확인하여 문제의 원인을 확인합니다.

실행 중인 애플리케이션의 지표 관찰

MyApplication 페이지의 Amazon CloudWatch 지표 섹션에서 실행 중인 애플리케이션의 몇 가지 기본 지표를 볼 수 있습니다.

지표를 보려면
  1. 새로 고침 버튼 옆에 있는 드롭다운 목록에서 10초를 선택합니다.

  2. 애플리케이션이 실행 중이고 정상 상태이면 가동 시간 지표가 지속적으로 증가하는 것을 확인할 수 있습니다.

  3. 전체 재시작 지표는 0이어야 합니다. 증가하면 구성에 문제가 있을 수 있습니다. 문제를 조사하려면 Flink 대시보드의 예외 탭을 검토하세요.

  4. 정상 애플리케이션에서 실패한 체크포인트 수 지표는 0이어야 합니다.

    참고

    이 대시보드는 5분의 세분화된 고정 지표 세트를 표시합니다. 대시보드의 지표를 사용하여 사용자 지정 애플리케이션 CloudWatch 대시보드를 생성할 수 있습니다.

Kinesis 스트림에서 출력 데이터 관찰

Python 스크립트 또는 Kinesis Data Generator를 사용하여 여전히 입력에 데이터를 게시하고 있는지 확인합니다.

이제 이전에 이미 수행한 것과 https://console.aws.amazon.com/kinesis/마찬가지로 에서 Data Viewer를 사용하여 Managed Service for Apache Flink에서 실행되는 애플리케이션의 출력을 관찰할 수 있습니다.

출력을 보려면
  1. https://console.aws.amazon.com/kinesis 에서 Kinesis 콘솔을 엽니다.

  2. 리전이 이 자습서를 실행하는 데 사용하는 리전과 동일한지 확인합니다. 기본적으로 us-east-1US East(버지니아 북부)입니다. 필요한 경우 리전을 변경합니다.

  3. 데이터 스트림 을 선택합니다.

  4. 관찰하려는 스트림을 선택합니다. 본 자습서에서는 ExampleOutputStream를 사용합니다.

  5. 데이터 뷰어 탭을 선택합니다.

  6. 샤드를 선택하고 최신시작 위치 로 유지한 다음 레코드 가져오기를 선택합니다. “이 요청에 대한 레코드를 찾을 수 없음” 오류가 표시될 수 있습니다. 그렇다면 레코드 가져오기 재시도를 선택합니다. 스트림에 게시된 최신 레코드가 표시됩니다.

  7. 데이터 열에서 값을 선택하여 JSON 형식의 레코드 내용을 검사합니다.

애플리케이션 중지

애플리케이션을 중지하려면 Managed Service for Apache Flink 애플리케이션의 콘솔 페이지로 이동합니다MyApplication.

애플리케이션을 중지하려면
  1. 작업 드롭다운 목록에서 중지를 선택합니다.

  2. 애플리케이션의 상태 세부 정보는 에서 RunningStopping전환된 다음 애플리케이션이 완전히 중지Ready되면 로 전환됩니다.

    참고

    또한 Python 스크립트 또는 Kinesis Data Generator에서 입력 스트림으로 데이터 전송을 중지하는 것도 잊지 마세요.

다음 단계

AWS 리소스 정리