Java에서 KCL을 사용하여 소비자 개발 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Java에서 KCL을 사용하여 소비자 개발

사전 조건

KCL 3.x 사용을 시작하기 전에 다음이 있는지 확인합니다.

  • Java Development Kit(JDK) 8 이상

  • AWS SDK for Java 2.x

  • 종속성 관리를 위한 Maven 또는 Gradle

KCL은 작업자가 실행 중인 컴퓨팅 호스트에서 CPU 사용률과 같은 CPU 사용률 지표를 수집하여 부하의 균형을 맞춰 작업자 간에 리소스 사용률 수준을 균등하게 달성합니다. KCL이 작업자로부터 CPU 사용률 지표를 수집할 수 있도록 하려면 다음 사전 조건을 충족해야 합니다.

Amazon Elastic Compute Cloud(Amazon EC2)

  • 운영 체제는 Linux OS여야 합니다.

  • EC2IMDSv22를 활성화해야 합니다.

Amazon EC2의 Amazon Elastic Container Service(Amazon ECS)

의 Amazon ECS AWS Fargate

Amazon EC2의 Amazon Elastic Kubernetes Service(Amazon EKS)

  • 운영 체제는 Linux OS여야 합니다.

의 Amazon EKS AWS Fargate

  • Fargate 플랫폼 1.3.0 이상.

중요

KCL이 작업자로부터 CPU 사용률 지표를 수집할 수 없는 경우 KCL은 다시 돌아가 작업자당 처리량을 사용하여 리스를 할당하고 플릿의 작업자 간에 로드의 균형을 맞춥니다. 자세한 내용은 KCL이 작업자에게 임대를 할당하고 로드의 균형을 조정하는 방법 단원을 참조하십시오.

종속성 설치 및 추가

Maven을 사용하는 경우 pom.xml 파일에 다음 종속성을 추가합니다. 3.x.x를 최신 KCL 버전으로 교체했는지 확인합니다.

<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>

Gradle을 사용하는 경우 build.gradle 파일에 다음을 추가합니다. 3.x.x를 최신 KCL 버전으로 교체했는지 확인합니다.

implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'

Maven Central Repository에서 최신 버전의 KCL을 확인할 수 있습니다.

소비자 구현

KCL 소비자 애플리케이션은 다음과 같은 주요 구성 요소로 구성됩니다.

RecordProcessor

RecordProcessor는 Kinesis 데이터 스트림 레코드를 처리하기 위한 비즈니스 로직이 상주하는 핵심 구성 요소입니다. 애플리케이션이 Kinesis 스트림에서 수신하는 데이터를 처리하는 방법을 정의합니다.

주요 책임:

  • 샤드 처리 초기화

  • Kinesis 스트림의 레코드 배치 처리

  • 샤드에 대한 종료 처리(예: 샤드가 분할 또는 병합되거나 리스가 다른 호스트로 인계되는 경우)

  • 체크포인트를 처리하여 진행 상황 추적

다음은 구현 예제입니다.

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.*; import software.amazon.kinesis.processor.ShardRecordProcessor; public class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; @Override public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()) ); // Checkpoint periodically processRecordsInput.checkpointer().checkpoint(); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting.", t); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } }

다음은 예제에서 사용되는 각 메서드에 대한 자세한 설명입니다.

initialize(InitializationInput initializationInput)

  • 용도: 레코드를 처리하는 데 필요한 리소스 또는 상태를 설정합니다.

  • 호출 시: 한 번, KCL이이 레코드 프로세서에 샤드를 할당할 때.

  • 중요 사항:

    • initializationInput.shardId():이 프로세서가 처리할 샤드의 ID입니다.

    • initializationInput.extendedSequenceNumber(): 처리를 시작할 시퀀스 번호입니다.

processRecords(ProcessRecordsInput processRecordsInput)

  • 용도: 수신 레코드를 처리하고 선택적으로 진행 상황을 확인합니다.

  • 호출 시: 레코드 프로세서가 샤드에 대한 임대를 보유하고 있는 한 반복됩니다.

  • 중요 사항:

    • processRecordsInput.records(): 처리할 레코드 목록입니다.

    • processRecordsInput.checkpointer(): 진행 상황을 체크포인트하는 데 사용됩니다.

    • KCL이 실패하지 않도록 처리 중에 예외를 처리했는지 확인합니다.

    • 예상치 못한 작업자 충돌 또는 재시작 전에 체크포인트되지 않은 데이터와 같은 일부 시나리오에서는 동일한 레코드가 두 번 이상 처리될 수 있으므로이 방법은 idempotent여야 합니다.

    • 데이터 일관성을 보장하기 위해 체크포인트를 적용하기 전에 항상 버퍼링된 데이터를 플러시합니다.

leaseLost(LeaseLostInput leaseLostInput)

  • 용도:이 샤드 처리와 관련된 모든 리소스를 정리합니다.

  • 호출 시: 다른 스케줄러가이 샤드에 대한 리스를 인수하는 경우.

  • 중요 사항:

    • 이 메서드에서는 체크포인트 지정이 허용되지 않습니다.

shardEnded(ShardEndedInput shardEndedInput)

  • 용도:이 샤드 및 체크포인트에 대한 처리를 완료합니다.

  • 호출 시: 샤드가 분할되거나 병합될 때이 샤드에 대한 모든 데이터가 처리되었음을 나타냅니다.

  • 중요 사항:

    • shardEndedInput.checkpointer(): 최종 체크포인트를 수행하는 데 사용됩니다.

    • 이 방법의 체크포인트는 처리를 완료하는 데 필수입니다.

    • 여기서 데이터 및 체크포인트를 플러시하지 않으면 샤드를 다시 열 때 데이터 손실 또는 중복 처리가 발생할 수 있습니다.

shutdownRequested(ShutdownRequestedInput shutdownRequestedInput)

  • 용도: KCL이 종료될 때 리소스를 확인하고 정리합니다.

  • 호출 시: 예를 들어 애플리케이션이 종료될 때와 같이 KCL이 종료될 때).

  • 중요 사항:

    • shutdownRequestedInput.checkpointer(): 종료 전에 체크포인트를 수행하는 데 사용됩니다.

    • 애플리케이션이 중지되기 전에 진행 상황이 저장되도록 메서드에 체크포인트를 구현했는지 확인합니다.

    • 여기에서 데이터 및 체크포인트를 플러시하지 않으면 애플리케이션이 다시 시작될 때 데이터가 손실되거나 레코드가 재처리될 수 있습니다.

중요

KCL 3.x는 이전 작업자를 종료하기 전에 체크포인트를 지정하여 한 작업자로부터 다른 작업자로 임대를 인계할 때 데이터 재처리를 줄일 수 있도록 합니다. shutdownRequested() 메서드에서 체크포인트 로직을 구현하지 않으면이 이점이 표시되지 않습니다. shutdownRequested() 메서드 내에 체크포인트 로직을 구현했는지 확인합니다.

RecordProcessorFactory

RecordProcessorFactory는 새 RecordProcessor 인스턴스를 생성할 책임이 있습니다. KCL은이 팩토리를 사용하여 애플리케이션이 처리해야 하는 각 샤드에 대해 새 RecordProcessor를 생성합니다.

주요 책임:

  • 온디맨드 방식으로 새 RecordProcessor 인스턴스 생성

  • 각 RecordProcessor가 올바르게 초기화되었는지 확인합니다.

다음은 구현 예제입니다.

import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } }

이 예제에서는 shardRecordProcessor()가 호출될 때마다 공장에서 새 SampleRecordProcessor를 생성합니다. shardRecordProcessor 이를 확장하여 필요한 초기화 로직을 포함할 수 있습니다.

스케줄러

스케줄러는 KCL 애플리케이션의 모든 활동을 조정하는 상위 수준 구성 요소입니다. 이는 데이터 처리의 전반적인 오케스트레이션을 담당합니다.

주요 책임:

  • RecordProcessors의 수명 주기 관리

  • 샤드에 대한 임대 관리 처리

  • 좌표 체크포인트 지정

  • 애플리케이션의 여러 작업자 간에 샤드 처리 로드의 균형을 맞춥니다.

  • 정상적인 종료 및 애플리케이션 종료 신호 처리

스케줄러는 일반적으로 기본 애플리케이션에서 생성되고 시작됩니다. 스케줄러의 구현 예제는 기본 소비자 애플리케이션에서 확인할 수 있습니다.

기본 소비자 애플리케이션

기본 소비자 애플리케이션은 모든 구성 요소를 하나로 묶습니다. KCL 소비자를 설정하고, 필요한 클라이언트를 생성하고, 스케줄러를 구성하고, 애플리케이션의 수명 주기를 관리할 책임이 있습니다.

주요 책임:

  • AWS 서비스 클라이언트 설정(Kinesis, DynamoDB, CloudWatch)

  • KCL 애플리케이션 구성

  • 스케줄러 생성 및 시작

  • 애플리케이션 종료 처리

다음은 구현 예제입니다.

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import java.util.UUID; public class SampleConsumer { private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; public SampleConsumer(String streamName, Region region) { this.streamName = streamName; this.region = region; this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } public void run() { DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoDbAsyncClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); } public static void main(String[] args) { String streamName = "your-stream-name"; // replace with your stream name Region region = Region.US_EAST_1; // replace with your region new SampleConsumer(streamName, region).run(); } }

KCL은 기본적으로 전용 처리량으로 향상된 팬아웃(EFO) 소비자를 생성합니다. 향상된 팬아웃에 대한 자세한 내용은 섹션을 참조하세요전용 처리량으로 향상된 팬아웃 소비자 개발. 소비자가 2명 미만이거나 200ms 미만의 읽기 전파 지연이 필요하지 않은 경우 공유 처리량 소비자를 사용하려면 스케줄러 객체에서 다음 구성을 설정해야 합니다.

configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))

다음 코드는 공유 처리량 소비자를 사용하는 스케줄러 객체를 생성하는 예제입니다.

가져오기:

import software.amazon.kinesis.retrieval.polling.PollingConfig;

코드:

Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) );/