Amazon Managed Service for Apache Flink는 이전에 Amazon Kinesis Data Analytics for Apache Flink로 알려졌습니다.
기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
아파치 빔을 사용하여 애플리케이션 만들기
이 연습에서는 Apache Beam
참고
이 연습에 필수 사전 조건을 설정하려면 먼저 자습서: Managed Service for Apache Flink에서 사용 DataStream API 시작하기 연습을 완료하세요.
이 주제는 다음 섹션을 포함하고 있습니다:
종속 리소스 생성
이 연습을 위해 Managed Service for Apache Flink 애플리케이션을 생성하기 전에 다음과 같은 종속 리소스를 생성해야 합니다.
두 개의 Kinesis Data Streams(
ExampleInputStream
및ExampleOutputStream
)애플리케이션 코드를 저장할 Amazon S3 버킷(
ka-app-code-
)<username>
콘솔을 사용하여 Kinesis 스트림과 Amazon S3 버킷을 만들 수 있습니다. 이러한 리소스를 만드는 방법 설명은 다음 주제를 참조하세요.
Amazon Kinesis Data Streams 개발자 안내서의 데이터 스트림 만들기 및 업데이트. 데이터 스트림
ExampleInputStream
및ExampleOutputStream
에 명칭을 지정합니다.Amazon Simple Storage Service 사용 설명서의 S3 버킷을 생성하려면 어떻게 해야 합니까? 로그인 명칭(예:
ka-app-code-
)을 추가하여 Amazon S3 버킷에 전역적으로 고유한 명칭을 지정합니다.<username>
입력 스트림에 샘플 레코드 쓰기
이 섹션에서는 Python 스크립트를 사용하여 애플리케이션에서 처리할 임의의 문자열을 스트림에 씁니다.
참고
이 섹션에서는 AWS SDK for Python (Boto)
-
다음 콘텐츠를 가진
ping.py
이라는 파일을 생성합니다:import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
-
ping.py
스크립트를 실행합니다.$ python ping.py
자습서의 나머지 부분을 완료하는 동안 스크립트가 계속 돌아가게 둡니다.
애플리케이션 코드 다운로드 및 검토
이 예제의 Java 애플리케이션 코드는 에서 제공됩니다 GitHub. 애플리케이션 코드를 다운로드하려면 다음을 수행하세요.
아직 설치하지 않았다면 Git 클라이언트를 설치합니다. 자세한 정보는 Git 설치
를 참조하세요. 다음 명령을 사용하여 원격 리포지토리를 복제합니다:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
amazon-kinesis-data-analytics-java-examples/Beam
디렉터리로 이동합니다.
애플리케이션 코드는 BasicBeamStreamingJob.java
파일에 있습니다. 애플리케이션 코드에 대해 다음을 유의하십시오:
애플리케이션은 Apache ParDo
Beam을 사용하여 라는 사용자 지정 변환 함수를 호출하여 들어오는 레코드를 처리합니다. PingPongFn
PingPongFn
함수를 호출하는 코드는 다음과 같습니다..apply("Pong transform", ParDo.of(new PingPongFn())
Apache Beam을 사용하는 Managed Service for Apache Flink 애플리케이션에는 다음과 같은 구성 요소가 필요합니다. 이러한 구성 요소와 버전을
pom.xml
에 포함시키지 않으면 애플리케이션이 환경 종속성에서 잘못된 버전을 로드하고 버전이 일치하지 않으므로 애플리케이션이 런타임에 충돌합니다.<jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
PingPongFn
변환 함수는 입력 데이터가 ping이 아닌 경우 입력 데이터를 출력 스트림으로 전달합니다. 이 경우 pong\n 문자열을 출력 스트림으로 내보냅니다.변환 함수의 코드는 다음과 같습니다.
private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }
애플리케이션 코드를 컴파일합니다.
애플리케이션을 컴파일하려면 다음을 수행하세요.
아직 Java 및 Maven을 설치하지 않았으면 설치합니다. 자세한 정보는 자습서: Managed Service for Apache Flink에서 사용 DataStream API 시작하기자습서의 필수 사전 조건 완료 섹션을 참조하세요.
다음 명령을 사용하여 애플리케이션을 컴파일합니다.
mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
참고
제공된 소스 코드는 Java 11의 라이브러리를 사용합니다.
애플리케이션을 컴파일하면 애플리케이션 JAR 파일 () target/basic-beam-app-1.0.jar
이 생성됩니다.
아파치 플링크 스트리밍 자바 코드를 업로드하세요.
이 섹션에서는 종속 리소스 생성 섹션에서 생성한 Amazon S3 버킷에 애플리케이션 코드를 업로드합니다.
-
Amazon S3 콘솔에서 다음을 선택합니다. ka-app-code -
<username>
버킷을 선택하고 업로드를 선택합니다. -
파일 선택 단계에서 파일 추가를 선택합니다. 이전 단계에서 생성한
basic-beam-app-1.0.jar
파일로 이동합니다. 개체 정보에 대한 설정은 변경할 필요가 없으므로 업로드를 선택합니다.
이제 애플리케이션 코드가 애플리케이션에서 액세스할 수 있는 Amazon S3 버킷에 저장됩니다.
Apache Flink용 관리 서비스 애플리케이션을 만들고 실행합니다.
콘솔을 사용하여 애플리케이션을 생성, 구성, 업데이트 및 실행하려면 다음 단계를 수행하세요.
애플리케이션 생성
/flink에서 아파치 플링크용 관리 서비스 콘솔을 엽니다. https://console.aws.amazon.com
-
Managed Service for Apache Flink 대시보드에서 분석 애플리케이션 생성을 선택합니다.
-
Managed Service for Apache Flink - 애플리케이션 생성 페이지에서 다음과 같이 애플리케이션 세부 정보를 제공합니다.
-
애플리케이션 명칭에
MyApplication
을 입력합니다. -
런타임에서 Apache Flink를 선택합니다.
참고
아파치 빔은 현재 아파치 플링크 버전 1.19 이상과 호환되지 않습니다.
버전 풀다운에서 아파치 플링크 버전 1.15를 선택합니다.
-
-
액세스 권한에서 역할 생성/업데이트를 선택합니다. IAM
kinesis-analytics-MyApplication-us-west-2
-
애플리케이션 생성을 선택합니다.
참고
콘솔을 사용하여 Apache Flink용 관리형 서비스 애플리케이션을 만들 때 애플리케이션에 대한 IAM 역할 및 정책을 생성할 수 있습니다. 귀하의 애플리케이션은 이 역할 및 정책을 사용하여 종속 리소스에 액세스합니다. 이러한 IAM 리소스의 이름은 다음과 같이 애플리케이션 이름과 지역을 사용하여 지정됩니다.
-
정책:
kinesis-analytics-service-
MyApplication
-us-west-2
-
역할:
kinesis-analytics-MyApplication-
us-west-2
IAM정책 편집
IAM정책을 편집하여 Kinesis 데이터 스트림에 액세스할 수 있는 권한을 추가합니다.
에서 IAM https://console.aws.amazon.com/iam/
콘솔을 엽니다. -
정책을 선택하세요. 이전 섹션에서 콘솔이 생성한
kinesis-analytics-service-MyApplication-us-west-2
정책을 선택합니다. -
요약 페이지에서 정책 편집을 선택합니다. JSON탭을 선택합니다.
-
다음 정책 예제의 강조 표시된 부분을 정책에 추가하세요. 샘플 계정 교체 IDs (
012345678901
) 를 계정 ID로 사용하십시오.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:
012345678901
:log-group:*", "arn:aws:s3:::ka-app-code-<username>
/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" }
애플리케이션 구성
-
MyApplication페이지에서 구성을 선택합니다.
-
애플리케이션 구성 페이지에서 코드 위치를 입력합니다.
-
Amazon S3 버킷의 경우
ka-app-code-
를 입력합니다.<username>
-
Amazon S3 객체 경로에는
basic-beam-app-1.0.jar
를 입력합니다.
-
-
애플리케이션 리소스 액세스에서 액세스 권한으로 IAM 역할 생성/업데이트를 선택합니다
kinesis-analytics-MyApplication-us-west-2
. -
다음을 입력합니다:
그룹 ID 키 값 BeamApplicationProperties
InputStreamName
ExampleInputStream
BeamApplicationProperties
OutputStreamName
ExampleOutputStream
BeamApplicationProperties
AwsRegion
us-west-2
-
모니터링에서 지표 수준 모니터링이 애플리케이션으로 설정되어 있는지 확인합니다.
-
CloudWatch 로깅하려면 활성화 확인란을 선택합니다.
-
업데이트를 선택합니다.
참고
CloudWatch 로깅을 활성화하도록 선택하면 Apache Flink용 관리형 서비스에서 로그 그룹과 로그 스트림을 자동으로 생성합니다. 이러한 리소스의 이름은 다음과 같습니다.
-
로그 그룹:
/aws/kinesis-analytics/MyApplication
-
로그 스트림:
kinesis-analytics-log-stream
이 로그 스트림은 애플리케이션을 모니터링하는 데 사용됩니다. 이 로그 스트림은 애플리케이션이 결과를 전송하는 데 사용하는 로그 스트림과 다릅니다.
애플리케이션을 실행합니다
애플리케이션을 실행하고 Apache Flink 대시보드를 연 다음 원하는 Flink 작업을 선택하면 Flink 작업 그래프를 볼 수 있습니다.
CloudWatch 콘솔에서 Apache Flink용 관리 서비스 메트릭을 확인하여 애플리케이션이 작동하는지 확인할 수 있습니다.
리소스 정리 AWS
이 섹션에는 텀블링 윈도우 튜토리얼에서 만든 AWS 리소스를 정리하는 절차가 포함되어 있습니다.
이 주제는 다음 섹션을 포함하고 있습니다:
Apache Flink용 관리형 서비스 애플리케이션 삭제
/flink에서 아파치 플링크용 매니지드 서비스 콘솔을 엽니다. https://console.aws.amazon.com
Apache Flink용 관리 서비스 패널에서 선택합니다. MyApplication
애플리케이션 페이지에서 삭제를 선택한 다음 삭제를 확인합니다.
Kinesis 데이터 스트림을 삭제합니다.
/kinesis에서 Kinesis 콘솔을 엽니다. https://console.aws.amazon.com
Kinesis Data Streams 패널에서 을 선택합니다. ExampleInputStream
ExampleInputStream페이지에서 Kinesis 스트림 삭제를 선택한 다음 삭제를 확인합니다.
Kinesis 스트림 페이지에서 를 선택하고 작업을 선택하고 삭제를 선택한 다음 삭제를 확인합니다. ExampleOutputStream
Amazon S3 객체 및 버킷을 삭제합니다.
에서 Amazon S3 콘솔을 엽니다 https://console.aws.amazon.com/s3/
. 다음을 선택합니다. ka-app-code -
<username>
버킷.삭제를 선택한 후 버킷 이름을 입력하여 삭제를 확인합니다.
IAM리소스 삭제
에서 IAM 콘솔을 엽니다 https://console.aws.amazon.com/iam/
. 탐색 바에서 정책을 선택합니다.
필터 컨트롤에서 kinesis를 입력합니다.
kinesis-analytics-service- MyApplication -us-west-2 정책을 선택합니다.
정책 작업을 선택한 후 삭제를 선택합니다.
탐색 모음에서 역할을 선택합니다.
키네시스-애널리틱스- -US-West-2 역할을 선택합니다. MyApplication
역할 삭제를 선택하고 삭제를 확인합니다.
CloudWatch 리소스 삭제하기
에서 CloudWatch 콘솔을 엽니다 https://console.aws.amazon.com/cloudwatch/
. 탐색 바에서 로그를 선택합니다.
MyApplication/aws/kinesis-analytics/ 로그 그룹을 선택합니다.
로그 그룹 삭제를 선택한 다음 삭제를 확인합니다.
다음 단계
이제 Apache Beam을 사용하여 데이터를 변환하는 기본 Managed Service for Apache Flink 애플리케이션을 생성하고 실행했으니, 고급 Managed Service for Apache Flink 솔루션의 예에 대한 자세한 내용을 알아보려면 다음의 애플리케이션을 참조하세요.
Managed Service for Apache Flink 스트리밍 워크숍의 Beam
: 이 워크숍에서는 하나의 균일한 Apache Beam 파이프라인에서 배치와 스트리밍 측면을 결합한 종합 예를 탐색합니다.