Python용 Apache Flink용 매니지드 서비스 생성 및 실행 애플리케이션 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink는 이전에 Amazon Kinesis Data Analytics for Apache Flink로 알려졌습니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Python용 Apache Flink용 매니지드 서비스 생성 및 실행 애플리케이션

이 섹션에서는 Kinesis 스트림을 소스로 사용하고 싱크를 사용하여 Python용 Apache Flink용 관리 서비스 애플리케이션을 생성합니다.

종속 리소스를 생성하십시오.

이 연습을 위해 Managed Service for Apache Flink를 생성하기 전에 먼저 다음과 같은 종속 리소스를 생성해야 합니다.

  • 입력 및 출력을 위한 Kinesis 스트림 2개.

  • 애플리케이션 코드를 저장하는 Amazon S3 버킷.

참고

이 자습서에서는 us-east-1 지역에 애플리케이션을 배포한다고 가정합니다. 다른 지역을 사용하는 경우 모든 단계를 적절하게 조정해야 합니다.

두 개의 Kinesis 스트림을 생성합니다.

이 연습을 위해 Apache Flink용 관리 서비스 애플리케이션을 생성하기 전에 애플리케이션을 배포하는 데 사용할 동일한 리전에 두 개의 Kinesis 데이터 스트림 (ExampleInputStreamExampleOutputStream) 을 생성하십시오 (이 예제에서는 us-east-1). 이 애플리케이션은 애플리케이션 소스 및 대상 스트림에 대해 이러한 스트림을 사용합니다.

Amazon Kinesis 콘솔 또는 다음 AWS CLI 명령을 사용하여 이러한 스트림을 만들 수 있습니다. 콘솔 지침은 Amazon Kinesis Data Streams 개발자 가이드데이터 스트림 생성 및 업데이트를 참조하세요.

데이터 스트림 (AWS CLI)을 생성하려면
  1. 첫 번째 스트림 (ExampleInputStream) 을 생성하려면 다음 Amazon Kinesis 명령을 create-stream AWS CLI 사용하십시오.

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1
  2. 애플리케이션에서 출력을 쓰는 데 사용하는 두 번째 스트림을 생성하려면 동일한 명령을 실행하여 스트림 명칭을 ExampleOutputStream으로 변경합니다.

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1

Amazon S3 버킷 생성

콘솔을 사용하여 Amazon S3 버킷을 생성할 수 있습니다. 리소스 생성에 대한 지침은 다음 주제를 참조하세요.

  • Amazon Simple Storage Service 사용 설명서S3 버킷을 생성하려면 어떻게 해야 합니까? Amazon S3 버킷에 전역적으로 고유한 이름을 지정합니다 (예: 로그인 이름 추가).

    참고

    이 자습서에서 사용하는 리전 (us-east-1) 에 S3 버킷을 생성해야 합니다.

기타 리소스

애플리케이션을 생성할 때 Apache Flink용 관리형 서비스는 다음과 같은 Amazon CloudWatch 리소스를 생성합니다 (아직 존재하지 않는 경우).

  • /AWS/KinesisAnalytics-java/<my-application>라는 로그 그룹.

  • kinesis-analytics-log-stream라는 로그 스트림.

로컬 개발 환경 설정

개발 및 디버깅을 위해 컴퓨터에서 Python Flink 응용 프로그램을 실행할 수 있습니다. 원하는 Python을 python main.py 사용하거나 IDE Python으로 명령줄에서 응용 프로그램을 시작할 수 있습니다.

참고

개발 머신에는 Python 3.10 또는 3.11, 자바 11, 아파치 메이븐, Git이 설치되어 있어야 합니다. IDE등의 PyCharm코드나 비주얼 스튜디오 코드를 사용하는 것이 좋습니다. 모든 사전 요구 사항을 충족하는지 확인하려면 계속 연습을 완료하기 위한 사전 요구 사항을 충족하십시오. 진행하기 전에 을 참조하십시오.

애플리케이션을 개발하고 로컬에서 실행하려면 Flink Python 라이브러리를 설치해야 합니다.

  1. VirtualEnv, Conda 또는 유사한 Python 도구를 사용하여 독립형 Python 환경을 만드십시오.

  2. 해당 환경에 PyFlink 라이브러리를 설치합니다. Apache Flink용 아마존 매니지드 서비스에서 사용할 것과 동일한 아파치 플링크 런타임 버전을 사용하십시오. 현재 권장 런타임은 1.19.1입니다.

    $ pip install apache-flink==1.19.1
  3. 애플리케이션을 실행할 때 환경이 활성 상태인지 확인하십시오. 에서 응용 프로그램을 실행하는 경우 해당 환경을 런타임으로 사용하고 IDE 있는지 확인하십시오. IDE 프로세스는 사용 중인 IDE 애플리케이션에 따라 달라집니다.

    참고

    PyFlink 라이브러리만 설치하면 됩니다. 컴퓨터에 Apache Flink 클러스터를 설치할 필요는 없습니다.

세션을 인증하세요. AWS

애플리케이션은 Kinesis 데이터 스트림을 사용하여 데이터를 게시합니다. 로컬에서 실행하는 경우 Kinesis 데이터 스트림에 쓸 수 있는 권한이 있는 유효한 AWS 인증 세션이 있어야 합니다. 다음 단계를 사용하여 세션을 인증하십시오.

  1. 유효한 자격 증명이 구성된 AWS CLI 및 이름이 지정된 프로필이 없는 경우 을 참조하십시오. AWS Command Line Interface (AWS CLI) 를 설정합니다.

  2. 다음 테스트 기록을 게시하여 AWS CLI 가 올바르게 구성되어 있고 사용자에게 Kinesis 데이터 스트림에 쓸 수 있는 권한이 있는지 확인하십시오.

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. 통합할 IDE 플러그인이 있는 AWS경우 이를 사용하여 에서 실행 중인 애플리케이션에 자격 증명을 전달할 수 있습니다. IDE 자세한 내용은 IntelliJ용 AWS 툴킷, Visual Studio Code용AWS 툴킷AWS IntelliJ용 툴킷을 참조하십시오. PyCharm IDEA

아파치 플링크 스트리밍 Python 코드 다운로드 및 검토

이 예제의 Python 응용 프로그램 코드는 에서 사용할 수 GitHub 있습니다. 애플리케이션 코드를 다운로드하려면 다음을 수행하세요.

  1. 다음 명령을 사용하여 원격 리포지토리를 복제합니다.

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. ./python/GettingStarted 디렉터리로 이동합니다.

애플리케이션 구성 요소 검토

애플리케이션 코드는 에 있습니다main.py. 우리는 Python에 SQL 내장된 것을 사용하여 응용 프로그램의 흐름을 정의합니다.

참고

최적화된 개발자 환경을 위해 애플리케이션은 Apache Flink용 Amazon Managed Service와 로컬 시스템에서 모두 코드 변경 없이 실행되도록 설계되었습니다. 애플리케이션은 환경 변수를 IS_LOCAL = true 사용하여 로컬에서 실행 중일 때를 감지합니다. 셸이나 셸의 실행 구성에서 환경 변수를 IS_LOCAL = true 설정해야 합니다IDE.

  • 애플리케이션은 실행 환경을 설정하고 런타임 구성을 읽습니다. Apache Flink용 Amazon 관리형 서비스와 로컬에서 모두 작동하기 위해 애플리케이션은 변수를 확인합니다. IS_LOCAL

    • 다음은 Apache Flink용 Amazon 관리형 서비스에서 애플리케이션을 실행할 때의 기본 동작입니다.

      1. 애플리케이션과 함께 패키징된 종속성을 로드합니다. 자세한 내용은 (링크) 를 참조하십시오.

      2. Apache Flink용 Amazon 관리형 서비스 애플리케이션에서 정의한 런타임 속성에서 구성을 로드합니다. 자세한 내용은 (링크) 을 참조하십시오.

    • 로컬에서 애플리케이션을 실행할 IS_LOCAL = true 때 애플리케이션이 이를 감지하는 경우:

      1. 프로젝트에서 외부 종속성을 로드합니다.

      2. 프로젝트에 포함된 application_properties.json 파일에서 구성을 로드합니다.

        ... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
  • 애플리케이션은 Kinesis Connector를 사용하여 CREATE TABLE 명령문을 사용하여 소스 테이블을 정의합니다. 이 테이블은 입력 Kinesis 스트림에서 데이터를 읽습니다. 애플리케이션은 런타임 구성에서 스트림 이름, 지역 및 초기 위치를 가져옵니다.

    table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """)
  • 또한 이 예제에서 애플리케이션은 Kinesis Connector를 사용하여 싱크 테이블을 정의합니다. 이 테이블은 출력 Kinesis 스트림으로 데이터를 전송합니다.

    table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""")
  • 마지막으로 애플리케이션은 소스 INSERT INTO... 테이블의 싱크 테이블을 SQL 실행합니다. 좀 더 복잡한 애플리케이션의 경우 싱크에 쓰기 전에 데이터를 변환하는 추가 단계가 있을 수 있습니다.

    table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""")
  • 애플리케이션을 로컬에서 실행하려면 main() 함수 끝에 다른 단계를 추가해야 합니다.

    if is_local: table_result.wait()

    이 명령문이 없으면 응용 프로그램을 로컬에서 실행할 때 응용 프로그램이 즉시 종료됩니다. Apache Flink용 아마존 매니지드 서비스에서 애플리케이션을 실행할 때는 이 명령문을 실행해서는 안 됩니다.

종속성 관리 JAR

PyFlink 애플리케이션에는 일반적으로 하나 이상의 커넥터가 필요합니다. 이 자습서의 애플리케이션은 Kinesis 커넥터를 사용합니다. Apache Flink는 JVM Java에서 실행되므로 Python으로 애플리케이션을 구현했는지 여부에 관계없이 커넥터가 JAR 파일로 배포됩니다. Apache Flink용 Amazon Managed Service에 애플리케이션을 배포할 때는 애플리케이션과 함께 이러한 종속성을 패키징해야 합니다.

이 예제에서는 Apache Maven을 사용하여 종속성을 가져오고 Apache Flink용 관리형 서비스에서 실행되도록 애플리케이션을 패키징하는 방법을 보여줍니다.

참고

종속성을 가져오고 패키징하는 다른 방법이 있습니다. 이 예제는 하나 이상의 커넥터에서 올바르게 작동하는 방법을 보여줍니다. 또한 코드를 변경하지 않고도 Apache Flink용 Managed Service for Apache Flink에서 애플리케이션을 로컬에서 실행할 수 있습니다.

pom.xml 파일을 사용하세요.

Apache Maven은 이 pom.xml 파일을 사용하여 종속성 및 애플리케이션 패키징을 제어합니다.

모든 JAR 종속성은 블록의 pom.xml 파일에 지정됩니다. <dependencies>...</dependencies>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...

사용할 커넥터의 올바른 아티팩트와 버전을 찾으려면 을 참조하십시오. 아파치 플링크용 매니지드 서비스와 함께 아파치 플링크 커넥터 사용하기 사용 중인 Apache Flink 버전을 참조해야 합니다. 이 예제에서는 Kinesis 커넥터를 사용합니다. 아파치 플링크 1.19의 커넥터 버전은 다음과 같습니다. 4.3.0-1.19

참고

Apache Flink 1.19를 사용하는 경우 이 버전용으로 특별히 출시된 커넥터 버전은 없습니다. 1.18용으로 출시된 커넥터를 사용하십시오.

다운로드 및 패키지 종속성

Maven을 사용하여 pom.xml 파일에 정의된 종속성을 다운로드하고 Python Flink 응용 프로그램용으로 패키징합니다.

  1. 라는 Python Getting Started 프로젝트가 들어 있는 디렉토리로 이동합니다python/GettingStarted.

  2. 다음 명령 실행:

$ mvn package

Maven은 이라는 ./target/pyflink-dependencies.jar 새 파일을 만듭니다. 컴퓨터에서 로컬로 개발하는 경우 Python 응용 프로그램이 이 파일을 찾습니다.

참고

이 명령을 실행하는 것을 잊은 경우 응용 프로그램을 실행하려고 하면 다음과 같은 오류 메시지와 함께 실패합니다. 식별자 “kinesis의 팩토리를 찾을 수 없습니다.

입력 스트림에 샘플 레코드를 기록합니다.

이 섹션에서는 애플리케이션이 처리할 수 있도록 샘플 레코드를 스트림으로 전송합니다. Python 스크립트 또는 Kinesis 데이터 생성기를 사용하여 샘플 데이터를 생성하는 두 가지 옵션이 있습니다.

Python 스크립트를 사용하여 샘플 데이터 생성

Python 스크립트를 사용하여 샘플 레코드를 스트림으로 보낼 수 있습니다.

참고

이 Python 스크립트를 실행하려면 Python 3.x를 사용하고 AWS SDKPython용 (Boto) 라이브러리가 설치되어 있어야 합니다.

Kinesis 입력 스트림으로 테스트 데이터 전송을 시작하려면:

  1. 데이터 생성기 GitHub 리포지토리에서 데이터 생성기 stock.py Python 스크립트를 다운로드합니다.

  2. stock.py 스크립트를 실행합니다.

    $ python stock.py

자습서의 나머지 부분을 완료하는 동안 스크립트를 계속 실행하세요. 이제 Apache Flink 애플리케이션을 실행할 수 있습니다.

Kinesis 데이터 생성기를 사용하여 샘플 데이터 생성

Python 스크립트를 사용하는 대신 호스팅 버전으로도 제공되는 Kinesis Data Generator를 사용하여 무작위 샘플 데이터를 스트림에 보낼 수 있습니다. Kinesis Data Generator는 브라우저에서 실행되므로 시스템에 아무것도 설치할 필요가 없습니다.

Kinesis 데이터 생성기를 설정하고 실행하려면:

  1. Kinesis 데이터 생성기 설명서의 지침에 따라 도구에 대한 액세스 권한을 설정합니다. 사용자 및 암호를 설정하는 AWS CloudFormation 템플릿을 실행합니다.

  2. 템플릿에서 생성한 데이터를 통해 Kinesis 데이터 URL 생성기에 액세스할 수 있습니다 CloudFormation. CloudFormation 템플릿이 URL 완성된 후 출력 탭에서 찾을 수 있습니다.

  3. 데이터 생성기 구성:

    • 지역: 이 자습서에서 사용할 지역을 선택합니다. us-east-1

    • 스트림/전송 스트림: 애플리케이션에서 사용할 입력 스트림을 선택합니다. ExampleInputStream

    • 초당 레코드 수: 100

    • 레코드 템플릿: 다음 템플릿을 복사하여 붙여넣습니다.

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. 템플릿 테스트: 테스트 템플릿을 선택하고 생성된 레코드가 다음과 유사한지 확인합니다.

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. 데이터 생성기 시작: 데이터 전송 선택을 선택합니다.

Kinesis 데이터 생성기가 이제 에 데이터를 보내고 있습니다. ExampleInputStream

애플리케이션을 로컬에서 실행합니다.

명령줄에서 python main.py 또는 를 사용하여 실행하여 로컬에서 애플리케이션을 테스트할 수 있습니다IDE.

애플리케이션을 로컬에서 실행하려면 이전 섹션에서 설명한 대로 올바른 버전의 PyFlink 라이브러리가 설치되어 있어야 합니다. 자세한 내용은 (링크) 를 참조하십시오.

참고

계속하기 전에 입력 스트림과 출력 스트림을 사용할 수 있는지 확인하세요. 두 개의 Amazon Kinesis 데이터 스트림을 생성합니다.을 참조하세요. 또한 두 스트림 모두에서 읽고 쓸 수 있는 권한이 있는지 확인하십시오. 세션을 인증하세요. AWS을 참조하세요.

Python 프로젝트를 사용자 프로젝트로 가져오기 IDE

에서 응용 프로그램 작업을 시작하려면 Python 프로젝트로 가져와야 합니다. IDE

복제한 리포지토리에는 여러 예제가 들어 있습니다. 각 예제는 별도의 프로젝트입니다. 이 자습서에서는 ./python/GettingStarted 하위 디렉터리의 콘텐츠를 사용자 디렉토리로 가져오십시오. IDE

코드를 기존 Python 프로젝트로 가져옵니다.

참고

새 Python 프로젝트를 가져오는 정확한 프로세스는 사용 IDE 중인 프로젝트에 따라 다릅니다.

로컬 애플리케이션 구성을 확인하세요.

로컬에서 실행하는 경우 응용 프로그램은 프로젝트의 리소스 폴더에 있는 application_properties.json 파일의 구성을 사용합니다./src/main/resources. 이 파일을 편집하여 다른 Kinesis 스트림 이름 또는 지역을 사용할 수 있습니다.

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

Python 애플리케이션을 로컬에서 실행

응용 프로그램은 명령줄에서 일반 Python 스크립트로 실행하거나 에서 로컬로 실행할 수 IDE 있습니다.

명령줄에서 응용 프로그램을 실행하려면
  1. Conda와 같은 독립형 Python 환경 또는 Python Flink 라이브러리를 설치한 VirtualEnv 환경이 현재 활성화되어 있는지 확인하십시오.

  2. mvn package적어도 한 번은 실행했는지 확인하세요.

  3. IS_LOCAL = true 환경 변수를 설정합니다.

    $ export IS_LOCAL=true
  4. 응용 프로그램을 일반 Python 스크립트로 실행합니다.

    $python main.py
내에서 응용 프로그램을 실행하려면 IDE
  1. 다음 구성으로 main.py 스크립트를 IDE 실행하도록 구성하십시오.

    1. Conda와 같은 독립형 Python 환경 또는 라이브러리를 설치한 VirtualEnv PyFlink 환경을 사용하십시오.

    2. AWS 자격 증명을 사용하여 입력 및 출력 Kinesis 데이터 스트림에 액세스할 수 있습니다.

    3. IS_LOCAL = true을 설정합니다.

  2. 실행 구성을 설정하는 정확한 프로세스는 사용자에 따라 IDE 다르며 다양합니다.

  3. 를 IDE 설정했으면 Python 스크립트를 실행하고 응용 프로그램이 실행되는 IDE 동안 사용자가 제공하는 도구를 사용하십시오.

애플리케이션 로그를 로컬에서 검사하십시오.

로컬에서 실행하는 경우 애플리케이션이 시작될 때 몇 줄이 인쇄되어 표시되는 것을 제외하고는 콘솔에 어떤 로그도 표시되지 않습니다. PyFlink Python Flink 라이브러리가 설치된 디렉토리의 파일에 로그를 씁니다. 응용 프로그램은 시작될 때 로그의 위치를 인쇄합니다. 다음 명령을 실행하여 로그를 찾을 수도 있습니다.

$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
  1. 로깅 디렉터리에 있는 파일을 나열합니다. 일반적으로 단일 .log 파일을 찾을 수 있습니다.

  2. 애플리케이션이 실행되는 동안 파일을 tail -f <log-path>/<log-file>.log 추적하세요:.

Kinesis 스트림의 입력 및 출력 데이터 관찰

Amazon Kinesis 콘솔의 데이터 뷰어를 사용하여 (샘플 Python 생성) 또는 Kinesis 데이터 생성기 (링크) 에서 입력 스트림으로 전송된 레코드를 관찰할 수 있습니다.

레코드를 관찰하려면:

애플리케이션이 로컬에서 실행되지 않도록 하세요.

에서 실행 중인 애플리케이션을 중지하십시오IDE. 는 IDE 일반적으로 “중지” 옵션을 제공합니다. 정확한 위치와 방법은 에 따라 다릅니다IDE.

애플리케이션 코드를 패키지화하세요

이 섹션에서는 Apache Maven을 사용하여 애플리케이션 코드와 모든 필수 종속성을.zip 파일로 패키징합니다.

Maven 패키지 명령을 다시 실행합니다.

$ mvn package

이 명령은 파일을 target/managed-flink-pyflink-getting-started-1.0.0.zip 생성합니다.

Amazon S3 버킷에 애플리케이션 패키지 업로드

이 단원에서는 이전 섹션에서 생성한.zip 파일을 이 자습서 시작 부분에서 생성한 Amazon Simple Storage Service (Amazon S3) 버킷에 업로드합니다. 이 단계를 완료하지 않은 경우 (링크) 를 참조하십시오.

애플리케이션 코드 JAR 파일을 업로드하려면
  1. 에서 Amazon S3 콘솔을 엽니다 https://console.aws.amazon.com/s3/.

  2. 애플리케이션 코드용으로 이전에 생성한 버킷을 선택합니다.

  3. 업로드를 선택합니다.

  4. [Add Files]를 선택합니다.

  5. 이전 단계에서 생성된.zip 파일로 이동합니다. target/managed-flink-pyflink-getting-started-1.0.0.zip

  6. 다른 설정을 변경하지 않고 업로드를 선택합니다.

Apache Flink용 관리 서비스 애플리케이션 생성 및 구성

콘솔 또는 를 사용하여 Apache Flink 애플리케이션용 관리 서비스를 만들고 구성할 수 있습니다. AWS CLI이 자습서에서는 콘솔을 사용하겠습니다.

애플리케이션 생성

  1. /flink에서 아파치 플링크용 매니지드 서비스 콘솔을 여십시오. https://console.aws.amazon.com

  2. 올바른 지역이 선택되었는지 확인하십시오: 미국 동부 (버지니아 북부) us-east-1.

  3. 오른쪽 메뉴를 열고 Apache Flink 애플리케이션을 선택한 다음 스트리밍 애플리케이션 생성을 선택합니다. 또는 초기 페이지의 시작하기 섹션에서 스트리밍 애플리케이션 만들기를 선택할 수도 있습니다.

  4. 스트리밍 애플리케이션 생성 페이지에서:

    • 스트림 처리 응용 프로그램을 설정하는 방법 선택에서 처음부터 만들기를 선택합니다.

    • 아파치 플링크 구성, 애플리케이션 플링크 버전의 경우 아파치 플링크 1.19를 선택하십시오.

    • 애플리케이션 구성의 경우:

      • 애플리케이션 명칭MyApplication을 입력합니다.

      • 설명My Python test app를 입력합니다.

      • 애플리케이션 리소스 액세스에서 필수 정책을 사용하여 kinesis-analytics- MyApplication -us-east-1 IAM 역할 생성/업데이트를 선택합니다.

    • 애플리케이션용 템플릿 설정의 경우:

      • 템플릿의 경우 개발을 선택합니다.

    • 스트리밍 애플리케이션 생성을 선택합니다.

참고

콘솔을 사용하여 Apache Flink용 관리 서비스 애플리케이션을 만들 때 애플리케이션에 대한 IAM 역할 및 정책을 생성할 수 있는 옵션이 제공됩니다. 귀하의 애플리케이션은 이 역할 및 정책을 사용하여 종속 리소스에 액세스합니다. 이러한 IAM 리소스의 이름은 다음과 같이 애플리케이션 이름과 지역을 사용하여 지정됩니다.

  • 정책: kinesis-analytics-service-MyApplication-us-west-2

  • 역할: kinesisanalytics-MyApplication-us-west-2

아파치 플링크용 아마존 매니지드 서비스는 이전에 Kinesis Data Analytics로 알려졌습니다. 자동으로 생성되는 리소스 이름에는 이전 버전과의 호환성을 위해 접두사가 붙습니다. kinesis-analytics

정책을 편집합니다. IAM

IAM정책을 편집하여 Amazon S3 버킷에 액세스할 수 있는 권한을 추가합니다.

IAM정책을 편집하여 S3 버킷 권한을 추가하려면
  1. 에서 IAM 콘솔을 엽니다 https://console.aws.amazon.com/iam/.

  2. 정책을 선택하세요. 이전 섹션에서 콘솔이 생성한 kinesis-analytics-service-MyApplication-us-east-1 정책을 선택합니다.

  3. 편집을 선택한 다음 JSON탭을 선택합니다.

  4. 다음 정책 예제의 강조 표시된 부분을 정책에 추가하세요. 샘플 계정 교체 IDs (012345678901) 를 계정 ID로 사용하십시오.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. 다음변경 사항 저장을 차례로 선택합니다.

애플리케이션 구성

애플리케이션 구성을 편집하여 애플리케이션 코드 아티팩트를 설정합니다.

애플리케이션을 구성하려면
  1. MyApplication페이지에서 구성을 선택합니다.

  2. 애플리케이션 코드 위치 섹션에서:

    • Amazon S3 버킷의 경우 애플리케이션 코드용으로 이전에 생성한 버킷을 선택합니다. [Browse] 를 선택하고 올바른 버킷을 선택한 다음 [Choose] 를 선택합니다. 버킷 이름을 선택하지 마세요.

    • Amazon S3 객체 경로에는 managed-flink-pyflink-getting-started-1.0.0.zip를 입력합니다.

  3. 액세스 권한에서 필수 정책이 kinesis-analytics-MyApplication-us-east-1 포함된 IAM 역할 생성/업데이트를 선택합니다.

  4. 런타임 속성으로 이동하여 다른 모든 설정의 기본값을 유지합니다.

  5. 새 항목 추가를 선택하고 다음 매개변수를 각각 추가합니다.

    그룹 ID
    InputStream0 stream.name ExampleInputStream
    InputStream0 flink.stream.initpos LATEST
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
    kinesis.analytics.flink.run.options python main.py
    kinesis.analytics.flink.run.options jarfile lib/pyflink-dependencies.jar
  6. 다른 섹션은 수정하지 말고 변경 내용 저장을 선택합니다.

참고

Amazon CloudWatch 로깅을 활성화하도록 선택하면 Apache Flink용 관리형 서비스에서 로그 그룹과 로그 스트림을 자동으로 생성합니다. 이러한 리소스의 이름은 다음과 같습니다.

  • 로그 그룹: /aws/kinesis-analytics/MyApplication

  • 로그 스트림: kinesis-analytics-log-stream

애플리케이션을 실행합니다

이제 애플리케이션이 구성되어 실행 준비가 되었습니다.

애플리케이션을 실행하려면
  1. Apache Flink용 Amazon 매니지드 서비스 콘솔에서 [내 애플리케이션] 을 선택하고 [실행] 을 선택합니다.

  2. 다음 페이지인 애플리케이션 복원 구성 페이지에서 최신 스냅샷으로 실행을 선택한 다음 실행을 선택합니다.

    애플리케이션 세부 정보의 상태가 애플리케이션 시작 시점 사이를 Ready Starting Running 오갔다가 다시 시작한 시점으로 바뀝니다.

애플리케이션이 Running 상태에 있으면 이제 Flink 대시보드를 열 수 있습니다.

대시보드 열기
  1. [Apache Flink 대시보드 열기] 를 선택합니다. 대시보드가 새 페이지에서 열립니다.

  2. 실행 중인 작업 목록에서 볼 수 있는 단일 작업을 선택합니다.

    참고

    런타임 속성을 설정하거나 IAM 정책을 잘못 편집하면 애플리케이션 상태가 로 Running 바뀔 수 있지만 Flink 대시보드에는 작업이 계속 다시 시작되고 있다고 표시됩니다. 이는 애플리케이션이 잘못 구성되었거나 외부 리소스에 액세스할 수 있는 권한이 없는 경우 흔히 발생하는 오류 시나리오입니다.

    이 경우 Flink 대시보드의 예외 탭에서 문제의 원인을 확인하세요.

실행 중인 애플리케이션의 메트릭을 살펴보세요.

MyApplication페이지의 Amazon CloudWatch 지표 섹션에서 실행 중인 애플리케이션의 몇 가지 기본 지표를 볼 수 있습니다.

지표를 보려면
  1. 새로 고침 버튼 옆의 드롭다운 목록에서 10초를 선택합니다.

  2. 애플리케이션이 실행 중이고 정상이면 가동 시간 지표가 계속 증가하는 것을 볼 수 있습니다.

  3. 전체 재시작 지표는 0이어야 합니다. 증가하면 구성에 문제가 있을 수 있습니다. 문제를 조사하려면 Flink 대시보드의 예외 탭을 검토하세요.

  4. 정상 애플리케이션에서는 실패한 체크포인트 수 지표가 0이어야 합니다.

    참고

    이 대시보드에는 5분 단위의 고정된 지표 집합이 표시됩니다. 대시보드의 모든 메트릭을 사용하여 사용자 지정 애플리케이션 대시보드를 만들 수 있습니다. CloudWatch

Kinesis 스트림의 출력 데이터 관찰

Python 스크립트 또는 Kinesis 데이터 생성기를 사용하여 여전히 데이터를 입력에 게시하고 있는지 확인하십시오.

이전에 이미 수행한 것과 마찬가지로 이제 의 데이터 뷰어를 사용하여 Apache Flink용 관리 서비스에서 실행되는 애플리케이션의 출력을 관찰할 수 있습니다. https://console.aws.amazon.com/kinesis/

출력을 보려면
  1. /kinesis에서 Kinesis 콘솔을 엽니다. https://console.aws.amazon.com

  2. 지역이 이 자습서를 실행하는 데 사용하는 지역과 동일한지 확인하십시오. 기본적으로 이 지역은 미국 동부-1미국 동부 (버지니아 북부) 입니다. 필요한 경우 지역을 변경하십시오.

  3. 데이터 스트림을 선택합니다.

  4. 관찰하려는 스트림을 선택합니다. 본 자습서에서는 ExampleOutputStream를 사용합니다.

  5. 데이터 뷰어 탭을 선택합니다.

  6. 샤드를 선택하고 최신 버전을 시작 위치로 유지한 다음 레코드 가져오기를 선택합니다. “이 요청에 대한 레코드를 찾을 수 없음” 오류가 표시될 수 있습니다. 그렇다면 레코드 가져오기 재시도를 선택하십시오. 스트림에 게시된 최신 레코드가 표시됩니다.

  7. 데이터 열에서 값을 선택하여 레코드 콘텐츠를 JSON 형식별로 검사합니다.

애플리케이션을 중지합니다.

애플리케이션을 중지하려면 이라는 이름의 Apache Flink용 관리 서비스 애플리케이션의 콘솔 페이지로 이동합니다. MyApplication

애플리케이션을 중지하려면
  1. 작업 드롭다운 목록에서 중지를 선택합니다.

  2. 애플리케이션 세부 정보의 상태가 에서 RunningStopping 전환된 다음 애플리케이션이 완전히 Ready 중지된 시점으로 전환됩니다.

    참고

    Python 스크립트 또는 Kinesis 데이터 생성기에서 입력 스트림으로 데이터를 보내는 것도 중단하는 것을 잊지 마십시오.

다음 단계

리소스를 정리하세요. AWS