Managed Service for Apache Flink 애플리케이션의 모범 사례 유지 관리 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink는 이전에 Amazon Kinesis Data Analytics for Apache Flink로 알려졌습니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Managed Service for Apache Flink 애플리케이션의 모범 사례 유지 관리

이 섹션에는 안정적이고 성능이 뛰어난 Managed Service for Apache Flink 애플리케이션을 개발하기 위한 정보와 권장 사항이 포함되어 있습니다.

uber 크기 최소화 JAR

Java/Scala application must be packaged in an uber (super/fat) JAR 및 에는 런타임에서 아직 제공하지 않은 모든 추가 필수 종속성이 포함됩니다. 그러나 uber의 크기는 애플리케이션 시작 및 재시작 시간에 JAR 영향을 미치며 JAR가 512MB 제한을 초과할 수 있습니다.

배포 시간을 최적화하려면 uber에 다음 항목이 포함되어JAR서는 됩니다.

  • 다음 예제에 표시된 대로 런타임에서 제공하는 모든 종속성입니다. POM 파일 또는 Gradle 구성compileOnlyprovided 범위가 있어야 합니다.

  • JUnit 또는 Mockito와 같이 테스트에만 사용되는 모든 종속성입니다. POM 파일 또는 Gradle 구성testImplementationtest 범위가 있어야 합니다.

  • 애플리케이션에서 실제로 사용하지 않는 모든 종속성입니다.

  • 애플리케이션에 필요한 모든 정적 데이터 또는 메타데이터입니다. 정적 데이터는 데이터 스토어 또는 Amazon S3와 같은 런타임에 애플리케이션에서 로드해야 합니다.

  • 이전 구성 설정에 대한 자세한 내용은이 POM 예제 파일을 참조하세요.

제공된 종속성

Managed Service for Apache Flink 런타임은 여러 종속성을 제공합니다. 이러한 종속성을 지방에 포함해서는 안 JAR 되며 POM 파일에 provided 범위가 있거나 maven-shade-plugin 구성에서 명시적으로 제외해야 합니다. 지방에 포함된 이러한 종속성JAR은 런타임 시 무시되지만 배포 중에 JAR 추가 오버헤드의 크기가 증가합니다.

런타임 버전 1.18, 1.19 및 1.20에서 런타임이 제공하는 종속성:

  • org.apache.flink:flink-core

  • org.apache.flink:flink-java

  • org.apache.flink:flink-streaming-java

  • org.apache.flink:flink-scala_2.12

  • org.apache.flink:flink-table-runtime

  • org.apache.flink:flink-table-planner-loader

  • org.apache.flink:flink-json

  • org.apache.flink:flink-connector-base

  • org.apache.flink:flink-connector-files

  • org.apache.flink:flink-clients

  • org.apache.flink:flink-runtime-web

  • org.apache.flink:flink-metrics-code

  • org.apache.flink:flink-table-api-java

  • org.apache.flink:flink-table-api-bridge-base

  • org.apache.flink:flink-table-api-java-bridge

  • org.apache.logging.log4j:log4j-slf4j-impl

  • org.apache.logging.log4j:log4j-api

  • org.apache.logging.log4j:log4j-core

  • org.apache.logging.log4j:log4j-1.2-api

또한 Managed Service for Apache Flink에서 애플리케이션 런타임 속성을 가져오는 데 사용되는 com.amazonaws:aws-kinesisanalytics-runtime:1.2.0라이브러리도 제공됩니다.

런타임에서 제공하는 모든 종속성은 다음 권장 사항을 사용하여 uber에 포함하지 않아야 합니다. JAR

  • Maven(pom.xml) 및 SBT (build.sbt)에서 provided 범위를 사용합니다.

  • Gradle(build.gradle)에서 compileOnly 구성을 사용합니다.

실수로 uber에 포함된 제공된 종속성JAR은 Apache Flink의 상위 우선 클래스 로드로 인해 런타임 시 무시됩니다. 자세한 내용은 Apache Flink 설명서parent-first-patterns의 섹션을 참조하세요.

커넥터

런타임에 포함되지 않은 커넥터를 제외한 대부분의 FileSystem 커넥터는 기본 범위()가 있는 POM 파일에 포함되어야 합니다compile.

기타 권장 사항

일반적으로 Managed Service for Apache Flink에 JAR 제공된 Apache Flink uber에는 애플리케이션을 실행하는 데 필요한 최소 코드가 포함되어야 합니다. 소스 클래스, 테스트 데이터 세트 또는 부트스트래핑 상태를 포함하는 종속성을이 jar에 포함하면 안 됩니다. 런타임에 정적 리소스를 가져와야 하는 경우이 문제를 Amazon S3와 같은 리소스로 구분합니다. 이러한 예로는 상태 부트스트랩 또는 추론 모델이 있습니다.

시간을 내어 심층 종속성 트리를 고려하고 런타임이 아닌 종속성을 제거합니다.

Managed Service for Apache Flink는 512MB jar 크기를 지원하지만 이는 규칙의 예외로 간주되어야 합니다. Apache Flink는 현재 기본 구성을 통해 최대 104MB의 jar 크기를 지원하며, 이는 필요한 jar의 최대 대상 크기여야 합니다.

내결함성: 체크포인트 및 세이브포인트

체크포인트와 저장점을 사용하여 Managed Service for Apache Flink 애플리케이션에서 내결함성을 구현합니다. 애플리케이션을 개발하고 유지 관리할 때는 다음 사항에 유의하세요.

  • 애플리케이션에 대해 체크포인트를 활성화된 상태로 유지하는 것이 좋습니다. 체크포인트는 예정된 유지 관리 동안뿐만 아니라 서비스 문제, 애플리케이션 종속성 장애 및 기타 문제로 인해 예상치 못한 장애가 발생하는 경우에도 애플리케이션에 내결함성을 제공합니다. 유지 관리에 대한 자세한 내용은 Managed Service for Apache Flink의 유지 관리 작업 관리 섹션을 참조하세요.

  • 애플리케이션 개발 또는 문제 해결 false 중에 ApplicationSnapshotConfiguration::SnapshotsEnabled를 로 설정합니다. 애플리케이션이 중지될 때마다 스냅샷이 생성되므로 애플리케이션이 비정상 상태이거나 성능이 좋지 않을 경우 문제가 발생할 수 있습니다. 애플리케이션이 프로덕션 단계에 들어가고 안정된 이후에 SnapshotsEnabledtrue으로 설정합니다.

    참고

    애플리케이션이 올바른 상태 데이터로 제대로 다시 시작하려면 하루에 여러 번 스냅샷을 생성하는 것이 좋습니다. 스냅샷의 올바른 주기는 애플리케이션의 비즈니스 로직에 따라 다릅니다. 스냅샷을 자주 생성하면 최신 데이터를 복구할 수 있지만 비용이 증가하고 더 많은 시스템 리소스가 필요합니다.

    애플리케이션 다운타임 모니터링에 대한 자세한 내용은 섹션을 참조하세요.

내결함성에 대한 자세한 내용은 내결함성 구현 섹션을 확인하세요.

지원되지 않는 커넥터 버전

Apache Flink 버전 1.15 이상에서 Managed Service for Apache Flink는 애플리케이션에 번들로 제공되는 지원되지 않는 Kinesis 커넥터 버전을 사용하는 경우 애플리케이션이 자동으로 시작되거나 업데이트되지 않도록 합니다JARs. Managed Service for Apache Flink 버전 1.15 이상으로 업그레이드할 때 최신 Kinesis 커넥터를 사용하고 있는지 확인합니다. 이 버전은 버전 1.15.2와 같거나 더 최신 버전입니다. 다른 모든 버전은 Managed Service for Apache Flink에서 지원되지 않는데, 이는 Savepoint로 중지 기능에 일관성 문제나 장애가 발생하여 클린 중지/업데이트 작업을 방해할 수 있기 때문입니다. Amazon Managed Service for Apache Flink 버전의 커넥터 호환성에 대한 자세한 내용은 Apache Flink 커넥터를 참조하세요.

성능 및 병렬 처리

애플리케이션 병렬 처리를 조정하고 성능 저하를 방지함으로써 애플리케이션을 모든 처리량 수준에 맞게 확장할 수 있습니다. 애플리케이션을 개발하고 유지 관리할 때는 다음 사항에 유의하세요.

  • 모든 애플리케이션 소스 및 싱크가 충분히 프로비저닝되고 병목 현상이 발생하지 않는지 확인하세요. 소스와 싱크가 다른 AWS 서비스인 경우를 사용하여 해당 서비스를 모니터링합니다CloudWatch.

  • 병렬 처리가 매우 높은 애플리케이션의 경우 애플리케이션의 모든 연산자에게 높은 수준의 병렬 처리가 적용되는지 확인하세요. 기본적으로 Apache Flink는 애플리케이션 그래프의 모든 연산자에 대해 동일한 애플리케이션 병렬 처리를 적용합니다. 이로 인해 소스 또는 싱크의 프로비저닝 문제가 발생하거나 연산자 데이터 처리에 병목 현상이 발생할 수 있습니다. 를 사용하여 코드에서 각 연산자의 병렬 처리를 변경할 수 있습니다setParallelism.

  • 애플리케이션의 연산자에 대한 병렬 처리 설정의 의미를 이해하세요. 연산자의 병렬 처리를 변경하면 연산자의 병렬 처리가 현재 설정과 호환되지 않을 때 생성된 스냅샷에서 애플리케이션을 복원하지 못할 수 있습니다. 연산자 병렬 처리 설정에 대한 자세한 내용은 연산자의 최대 병렬 처리를 명시적으로 설정하기를 참조하세요.

조정 구현에 대한 자세한 내용은 애플리케이션 규모 조정 구현 섹션을 참조하세요.

연산자별 병렬 처리 설정

기본적으로 모든 연산자는 애플리케이션 수준으로 설정된 병렬 처리를 갖습니다. 를 사용하여 단일 연산자의 병렬 처리를 재정의할 DataStream API 수 있습니다.setParallelism(x). 연산자 병렬 처리를 애플리케이션 병렬 처리 수와 같거나 낮은 모든 병렬 처리로 설정할 수 있습니다.

가능하면 연산자 병렬 처리를 애플리케이션 병렬 처리의 함수로 정의하세요. 이렇게 하면 애플리케이션 병렬 처리에 따라 연산자 병렬 처리가 달라집니다. 예를 들어 자동 크기 조정을 사용하는 경우 모든 연산자의 병렬 처리가 같은 비율로 달라집니다.

int appParallelism = env.getParallelism(); ... ...ops.setParalleism(appParallelism/2);

경우에 따라 연산자 병렬 처리를 상수로 설정해야 할 수 있습니다. 예를 들어 Kinesis Stream 소스의 병렬 처리를 샤드 수로 설정합니다. 이러한 경우, 예를 들어 소스 스트림을 리샤딩해야 하는 경우, 코드를 변경하지 않고 변경하려면 연산자 병렬 처리를 애플리케이션 구성 파라미터로 전달하는 것을 고려해야 합니다.

로깅

CloudWatch 로그를 사용하여 애플리케이션의 성능 및 오류 조건을 모니터링할 수 있습니다. 애플리케이션에 대한 로깅을 구성할 때는 다음 사항에 유의하세요.

  • 런타임 문제를 디버깅할 수 있도록 애플리케이션에 대한 CloudWatch 로깅을 활성화합니다.

  • 애플리케이션에서 처리 중인 모든 레코드에 대해 로그 항목을 생성하지 마세요. 이로 인해 처리 중에 심각한 병목 현상이 발생하고 데이터 처리 시 배압이 발생할 수 있습니다.

  • 애플리케이션이 제대로 실행되지 않을 때 알려주는 CloudWatch 경보를 생성합니다. 자세한 내용은 단원을 참조하세요.

로깅 구현에 대한 자세한 내용은 섹션을 참조하세요.

코딩

권장 프로그래밍 관행을 사용하여 애플리케이션의 성능과 안정성을 높일 수 있습니다. 애플리케이션 코드를 작성할 때 다음 사항을 유의하세요.

  • 애플리케이션 코드, 애플리케이션의 main 메서드 또는 사용자 정의 함수에는 system.exit()를 사용하지 마세요. 코드 내에서 애플리케이션을 종료하려면 Exception 또는 RuntimeException에서 파생된 예외를 발생시키세요. 이 예외에는 애플리케이션에 어떤 문제가 발생했는지에 대한 메시지가 포함되어 있습니다.

    서비스에서 이 예외를 처리하는 방법에 대한 다음 내용을 참고하세요.

    • 애플리케이션의 main 메서드에서 예외가 발생하는 경우 애플리케이션이 RUNNING 상태로 전환될 때 서비스가 이를 ProgramInvocationException으로 래핑하고 작업 관리자가 작업을 제출하지 못합니다.

    • 사용자 정의 함수에서 예외가 발생한 경우 작업 관리자는 작업을 실패하고 작업을 다시 시작하며 예외 세부 정보가 예외 로그에 기록됩니다.

  • 애플리케이션 JAR 파일과 포함된 종속성을 셰이딩하는 것이 좋습니다. 애플리케이션과 Apache Flink 런타임 간에 패키지 이름이 충돌할 가능성이 있는 경우에는 셰이딩을 사용하는 것이 좋습니다. 충돌이 발생하는 경우 애플리케이션 로그에 java.util.concurrent.ExecutionException 유형 예외가 포함될 수 있습니다. 애플리케이션 JAR 파일 셰이딩에 대한 자세한 내용은 Apache Maven Shade 플러그인을 참조하세요.

보안 인증 관리

장기 보안 인증을 프로덕션(또는 기타) 애플리케이션에 적용해서는 안 됩니다. 장기 보안 인증은 버전 관리 시스템에 체크인되어 쉽게 분실될 수 있습니다. 대신 Managed Service for Apache Flink 애플리케이션에 역할을 연결하고 해당 역할에 권한을 부여할 수 있습니다. 그러면 실행 중인 Flink 애플리케이션이 환경에서 해당 권한이 있는 임시 보안 인증을 가져올 수 있습니다. 인증에 사용자 이름과 암호가 필요한 데이터베이스IAM와 같이와 기본적으로 통합되지 않은 서비스에 인증이 필요한 경우 AWS Secrets Manager에 보안 암호를 저장하는 것을 고려해야 합니다.

많은 AWS 네이티브 서비스가 인증을 지원합니다.

샤드/파티션이 거의 없는 소스에서 읽기

Apache Kafka 또는 Kinesis Data Stream에서 읽을 때 스트림의 병렬처리(예: Kafka의 파티션 수와 Kinesis의 샤드 수)와 애플리케이션의 병렬 처리 간에 불일치가 있을 수 있습니다. 네이티브 설계에서는 애플리케이션의 병렬 처리를 스트림의 병렬 처리 이상으로 확장할 수 없습니다. 소스 연산자의 각 하위 작업은 1개 이상의 샤드/파티션에서만 읽을 수 있습니다. 즉, 샤드가 2개뿐인 스트림과 병렬 처리 수가 8인 애플리케이션의 경우 스트림에서 실제로 소비되는 하위 작업은 두 개뿐이고 하위 작업 6개는 유휴 상태로 유지됩니다. 이로 인해 애플리케이션의 처리량이 크게 제한될 수 있습니다. 특히 역직렬화를 소스에서 수행하는 경우(기본값) 더욱 그렇습니다.

이러한 영향을 줄이려면 스트림을 확장하거나 둘 중 하나를 선택할 수 있습니다. 하지만 이것이 항상 바람직하거나 가능한 것은 아닙니다. 또는 소스를 재구성하여 직렬화를 수행하지 않고 byte[]를 그냥 전달하도록 할 수도 있습니다. 그런 다음 데이터를 리밸런싱하여 모든 작업에 균등하게 분배한 다음 데이터를 역직렬화할 수 있습니다. 이렇게 하면 역직렬화에 모든 하위 작업을 활용할 수 있으며 비용이 많이 들 수 있는 이 작업은 더 이상 스트림의 샤드/파티션 수에 얽매이지 않아도 됩니다.

스튜디오 노트북 새로 고침 간격

단락 결과 새로 고침 간격을 변경하는 경우 최소 1000밀리초 이상의 값으로 설정합니다.

스튜디오 노트북 최적 성능

다음 문으로 테스트한 결과, events-per-secondnumber-of-keys를 곱한 값이 25,000,000 미만일 때 가장 좋은 성능을 보였습니다. 이는 15만 미만 events-per-second에 대한 것이었습니다.

SELECT key, sum(value) FROM key-values GROUP BY key

워터마크 전략과 유휴 샤드가 타임윈도우에 미치는 영향

Apache Kafka 및 Kinesis Data Streams에서 이벤트를 읽을 때 소스는 스트림의 속성을 기반으로 이벤트 시간을 설정할 수 있습니다. Kinesis의 경우 이벤트 시간은 대략적인 이벤트 도착 시간과 같습니다. 하지만 이벤트 소스에서 이벤트 시간을 설정하는 것만으로는 플링크 애플리케이션이 이벤트 시간을 사용하기에 충분하지 않습니다. 또한 소스는 소스에서 다른 모든 연산자에게 이벤트 시간에 대한 정보를 전파하는 워터마크를 생성해야 합니다. Flink 설명서에는 해당 프로세스의 작동 방식에 대한 좋은 개요가 나와 있습니다.

기본적으로 Kinesis에서 읽은 이벤트의 타임스탬프는 Kinesis에서 결정한 대략적인 도착 시간으로 설정됩니다. 애플리케이션에서 이벤트 시간이 제대로 작동하기 위한 추가 전제 조건은 워터마크 전략입니다.

WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(...));

그런 다음 assignTimestampsAndWatermarks 메서드를 사용하여 워터마크 전략을 DataStream에 적용합니다. 다음과 같은 몇 가지 유용한 기본 전략이 있습니다.

  • forMonotonousTimestamps()는 이벤트 시간(대략적인 도착 시간)만 사용하고 주기적으로 최대값을 워터마크로 표시합니다(각 특정 하위 작업에 대해).

  • forBoundedOutOfOrderness(Duration.ofSeconds(...))은 이전 전략과 비슷하지만 이벤트 시간인 워터마크 생성 기간을 사용합니다.

이 방법은 작동하지만 몇 가지 주의해야 할 사항이 있습니다. 워터마크는 하위 작업 수준에서 생성되며 오퍼레이터 그래프를 통해 흐릅니다.

Flink 설명서에서 발췌:

소스 함수의 각 병렬 하위 작업은 일반적으로 워터마크를 독립적으로 생성합니다. 이러한 워터마크는 특정 병렬 소스의 이벤트 시간을 정의합니다.

워터마크가 스트리밍 프로그램을 통해 흐르면서 워터마크가 도착하는 연산자에게도 이벤트 시간이 앞당겨집니다. 연산자가 이벤트 시간을 앞당길 때마다 후속 연산자를 위해 다운스트림에 새 워터마크가 생성됩니다.

일부 연산자는 여러 입력 스트림을 사용합니다. 예를 들어 조합 또는 keyBy(...) 또는 파티션(...) 함수를 따르는 연산자입니다. 이러한 연산자의 현재 이벤트 시간은 해당 입력 스트림의 이벤트 시간 중 최소값입니다. 입력 스트림이 이벤트 시간을 업데이트하면 연산자도 업데이트됩니다.

즉, 소스 하위 작업이 유휴 샤드에서 소비되는 경우, 다운스트림 연산자는 해당 하위 작업에서 새 워터마크를 받지 못하므로 시간 창을 사용하는 모든 다운스트림 연산자의 처리가 중단됩니다. 이를 방지하기 위해 고객은 워터마크 전략에 withIdleness 옵션을 추가할 수 있습니다. 이 옵션을 사용하면 연산자의 이벤트 시간을 계산할 때 유휴 업스팀 하위 작업에서 워터마크가 제외됩니다. 따라서 유휴 하위 작업으로 인해 다운스트림 연산자의 이벤트 시간 단축이 더 이상 방해되지 않습니다.

하지만 워터마크 전략이 내장된 유휴 옵션을 사용하면 이벤트를 읽는 하위 작업이 없는 경우(예: 스트림에 이벤트가 없는 경우) 이벤트 시간을 앞당길 수 없습니다. 이는 스트림에서 한정된 이벤트 세트를 읽는 테스트 사례에서 특히 두드러집니다. 마지막 이벤트를 읽은 후 이벤트 시간이 늘어나지 않으므로 마지막 이벤트가 포함된 마지막 창은 절대 닫히지 않습니다.

요약

  • 샤드가 유휴 상태인 경우 이 withIdleness 설정은 새 워터마크를 생성하지 않으며, 유휴 하위 작업이 보낸 마지막 워터마크는 다운스트림 연산자의 최소 워터마크 계산에서 제외됨

  • 내장된 워터마크 전략을 사용하면 마지막으로 열린 창은 절대 닫히지 않습니다(워터마크를 앞당기는 새 이벤트가 전송되지 않는 한). 단, 새 창이 생성되고 그 이후에는 열린 상태로 유지됨

  • Kinesis 스트림에서 시간을 설정하더라도 하나의 샤드가 다른 샤드보다 빨리 소비되는 경우(앱 초기화 중 또는 기존의 모든 샤드가 부모/자식 관계를 무시하고 병렬로 소비되는 TRIM_HORIZON을 사용할 때) 지연 이벤트가 계속 발생할 수 있음

  • 워터마크 전략의 withIdleness 설정이 유휴 샤드 (ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS에 대한 키네시스 소스별 설정을 더 이상 지원하지 않는 듯함

예제

다음 애플리케이션은 스트림에서 데이터를 읽고 이벤트 시간에 따라 세션 창을 생성합니다.

Properties consumerConfig = new Properties(); consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON"); FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("...", new SimpleStringSchema(), consumerConfig); WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(15)); env.addSource(consumer) .assignTimestampsAndWatermarks(s) .map(new MapFunction<String, Long>() { @Override public Long map(String s) throws Exception { return Long.parseLong(s); } }) .keyBy(l -> 0l) .window(EventTimeSessionWindows.withGap(Time.seconds(10))) .process(new ProcessWindowFunction<Long, Object, Long, TimeWindow>() { @Override public void process(Long aLong, ProcessWindowFunction<Long, Object, Long, TimeWindow>.Context context, Iterable<Long>iterable, Collector<Object> collector) throws Exception { long count = StreamSupport.stream(iterable.spliterator(), false).count(); long timestamp = context.currentWatermark(); System.out.print("XXXXXXXXXXXXXX Window with " + count + " events"); System.out.println("; Watermark: " + timestamp + ", " + Instant.ofEpochMilli(timestamp)); for (Long l : iterable) { System.out.println(l); } } });

다음 예제에서는 8개의 이벤트가 16개의 샤드 스트림에 기록됩니다(처음 2개와 마지막 이벤트는 같은 샤드에 포함됨).

$ aws kinesis put-record --stream-name hp-16 --partition-key 1 --data MQ== $ aws kinesis put-record --stream-name hp-16 --partition-key 2 --data Mg== $ aws kinesis put-record --stream-name hp-16 --partition-key 3 --data Mw== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028721934184977530127978070210" } { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028795678659974022576354623682" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275134360684221592378842022114" } Wed Mar 23 11:19:57 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 4 --data NA== $ aws kinesis put-record --stream-name hp-16 --partition-key 5 --data NQ== $ date { "ShardId": "shardId-000000000010", "SequenceNumber": "49627894338570054070103749783042116732419934393936642210" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275659034489934342334017700066" } Wed Mar 23 11:20:10 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 6 --data Ng== $ date { "ShardId": "shardId-000000000001", "SequenceNumber": "49627894338369347363316974173886988345467035365375213586" } Wed Mar 23 11:20:22 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 7 --data Nw== $ date { "ShardId": "shardId-000000000008", "SequenceNumber": "49627894338525452579706688535878947299195189349725503618" } Wed Mar 23 11:20:34 CET 2022 $ sleep 60 $ aws kinesis put-record --stream-name hp-16 --partition-key 8 --data OA== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811029600823255837371928900796610" } Wed Mar 23 11:21:27 CET 2022

이렇게 입력하면 5개의 세션 창(이벤트 1,2,3, 이벤트 4,5, 이벤트 6, 이벤트 7, 이벤트 8)이 생성될 것입니다. 하지만 프로그램은 처음 4개의 창만 출력합니다.

11:59:21,529 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 5 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 5 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,531 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 4 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 4 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:23,209 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,244 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 event: 6; timestamp: 1648030822428, 2022-03-23T10:20:22.428Z 11:59:23,377 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,405 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,581 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,586 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:24,790 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 4; timestamp: 1648030809282, 2022-03-23T10:20:09.282Z event: 3; timestamp: 1648030797697, 2022-03-23T10:19:57.697Z event: 5; timestamp: 1648030810871, 2022-03-23T10:20:10.871Z 11:59:24,907 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 7; timestamp: 1648030834105, 2022-03-23T10:20:34.105Z event: 1; timestamp: 1648030794441, 2022-03-23T10:19:54.441Z event: 2; timestamp: 1648030796122, 2022-03-23T10:19:56.122Z event: 8; timestamp: 1648030887171, 2022-03-23T10:21:27.171Z XXXXXXXXXXXXXX Window with 3 events; Watermark: 1648030809281, 2022-03-23T10:20:09.281Z 3 1 2 XXXXXXXXXXXXXX Window with 2 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 4 5 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 6 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030887170, 2022-03-23T10:21:27.170Z 7

출력에는 4개의 창만 표시됩니다(이벤트 8이 포함된 마지막 창은 빠짐). 이는 이벤트 시간과 워터마크 전략 때문입니다. 빌드별 워터마크 전략을 사용하면 스트림에서 읽은 마지막 이벤트 시간을 넘어서는 시간이 절대 진행되지 않기 때문에 마지막 창을 닫을 수 없습니다. 하지만 윈도우를 닫으려면 마지막 이벤트 이후 시간이 10초 이상 경과해야 합니다. 이 경우 마지막 워터마크는 2022-03-23T10:21:27.170 Z이지만 세션 창을 닫으려면 10초 및 1ms 후의 워터마크가 필요합니다.

워터마크 전략에서 이 withIdleness 옵션을 제거하면 창 연산자의 “글로벌 워터마크”를 진행할 수 없으므로 세션 창이 닫히지 않습니다.

참고로 Flink 애플리케이션을 시작할 때(또는 데이터 왜곡이 있는 경우) 일부 샤드는 다른 샤드보다 빨리 소모될 수 있습니다. 이로 인해 일부 워터마크가 하위 작업에서 너무 일찍 생성될 수 있습니다. 하위 작업은 구독한 다른 샤드의 워터마크를 소비하지 않고 한 샤드의 콘텐츠를 기반으로 워터마크를 내보낼 수 있습니다. 이를 완화하는 방법은 안전 버퍼 (forBoundedOutOfOrderness(Duration.ofSeconds(30))를 추가하거나 늦게 도착하는 이벤트 (allowedLateness(Time.minutes(5))를 명시적으로 허용하는 다양한 워터마크 전략을 사용하는 것입니다.

모든 연산자에 UUID 대해 설정

Managed Service for Apache Flink가 스냅샷이 있는 애플리케이션에 대한 Flink 작업을 시작하는 경우 특정 문제로 인해 Flink 작업이 시작되지 않을 수 있습니다. 그 중 하나는 연산자 ID 불일치입니다. Flink는 Flink 작업 그래프 연산자에 IDs 대해 명시적이고 일관된 연산자를 기대합니다. 명시적으로 설정하지 않으면 Flink는 연산자의 ID를 자동 생성합니다. 이는 Flink가 이러한 연산자를 사용하여 작업 그래프에서 연산자를 IDs 고유하게 식별하고 이를 사용하여 각 연산자의 상태를 저장점에 저장하기 때문입니다.

연산자 ID 불일치 문제는 Flink가 작업 그래프IDs의 연산자와 저장점에 IDs 정의된 연산자 간에 1:1 매핑을 찾지 못할 때 발생합니다. 이는 명시적인 일관된 연산자가 설정되지 않고 FlinkIDs가 모든 작업 그래프 생성과 일치하지 않을 수 IDs 있는 연산자를 자동 생성할 때 발생합니다. 유지보수 실행 중에는 애플리케이션에서 이 문제가 발생할 가능성이 높습니다. 이를 방지하려면 flink 코드의 모든 연산자에 UUID 대해를 설정하는 고객을 권장합니다. 자세한 내용은 프로덕션 준비 아래의 모든 연산자에 UUID 대해 설정 항목을 참조하세요.

Maven 셰이드 플러그인 ServiceResourceTransformer 에 추가

Flink는 Java의 서비스 공급자 인터페이스(SPI)를 사용하여 커넥터 및 형식과 같은 구성 요소를 로드합니다. 를 사용하는 여러 Flink 종속성으로 인해 uber-jar에서 충돌이 발생하고 예기치 않은 애플리케이션 동작이 SPI 발생할 수 있습니다. https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/#transform-table-connectorformat-resources pom.xml에 정의된 Maven 셰이드 플러그인ServiceResourceTransformer의를 추가하는 것이 좋습니다.

<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <id>shade</id> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers combine.children="append"> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- ... --> </transformers> </configuration> </execution> </executions> </plugin>