기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
소비자를 KCL 1.x에서 2.x로 KCL 마이그레이션
이 항목에서는 Kinesis 클라이언트 라이브러리 버전 1.x와 2.x 버전 () 간의 차이점을 설명합니다. KCL 또한 소비자를 버전 1.x에서 버전 2.x로 마이그레이션하는 방법도 보여줍니다. KCL 클라이언트 마이그레이션 후 마지막 체크포인트 위치에서 처리 기록을 시작합니다.
버전 2.0에서는 다음과 같은 인터페이스 변경 사항이 KCL 도입되었습니다.
KCL1.x 인터페이스 | KCL2.0 인터페이스 |
---|---|
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor |
software.amazon.kinesis.processor.ShardRecordProcessor |
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory |
software.amazon.kinesis.processor.ShardRecordProcessorFactory |
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware |
software.amazon.kinesis.processor.ShardRecordProcessor 에 포함됨 |
레코드 프로세서 마이그레이션
다음 예제는 KCL 1.x용으로 구현된 레코드 프로세서를 보여줍니다.
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
레코드 프로세서 클래스를 마이그레이션하려면
-
다음과 같이
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
및com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
의 인터페이스를software.amazon.kinesis.processor.ShardRecordProcessor
로 변경합니다.// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
-
import
및initialize
메서드의processRecords
문을 업데이트합니다.// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
-
shutdown
메서드를 새 메서드인leaseLost
,shardEnded
,shutdownRequested
으로 바꿉니다.// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
다음은 업데이트된 버전의 레코드 프로세서 클래스입니다.
package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
레코드 프로세서 팩토리 마이그레이션
레코드 프로세스 팩토리는 리스가 필요할 경우 레코드 프로세서 생성을 담당합니다. 다음은 KCL 1.x 팩토리의 예입니다.
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
레코드 프로세서 팩토리를 마이그레이션하려면
-
구현된 인터페이스를 다음과 같이
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
에서software.amazon.kinesis.processor.ShardRecordProcessorFactory
로 변경합니다.// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
-
createProcessor
에 대한 반환 서명을 변경합니다.// public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
다음은 2.0의 레코드 프로세서 팩토리의 예입니다.
package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }
작업자 마이그레이션
버전 KCL Scheduler
2.0에서는 라는 새 클래스가 클래스를 대체합니다Worker
. 다음은 KCL 1.x 워커의 예제입니다.
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
작업자를 마이그레이션하려면
-
Worker
클래스에 대한import
문을Scheduler
및ConfigsBuilder
클래스에 대한 가져오기 문으로 변경합니다.// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
-
다음 예와 같이
ConfigsBuilder
및Scheduler
를 생성합니다.KinesisClientUtil
을 사용하여KinesisAsyncClient
를 만들고KinesisAsyncClient
에maxConcurrency
를 구성하는 것이 좋습니다.중요
Amazon Kinesis Client는
KinesisAsyncClient
를 구성하여KinesisAsyncClient
의 전체 임대 수에 더해 추가 사용량까지 허용할 수 있을 만큼maxConcurrency
를 충분히 높이지 않을 경우 지연 시간이 크게 증가할 수 있습니다.import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );
Amazon Kinesis 클라이언트 구성
Kinesis Client Library 2.0 릴리스에서는 클라이언트 구성이 단일 구성 클래스(KinesisClientLibConfiguration
)에서 6개 구성 클래스로 이동했습니다. 다음 표에 마이그레이션에 대한 설명이 나와 있습니다.
원래 필드 | 새 구성 클래스 | 설명 |
---|---|---|
applicationName |
ConfigsBuilder |
이 KCL 애플리케이션의 이름입니다. tableName 및 consumerName 의 기본값으로 사용됩니다. |
tableName |
ConfigsBuilder |
Amazon DynamoDB 리스 테이블에 사용된 테이블 이름 재정의를 허용합니다. |
streamName |
ConfigsBuilder |
이 애플리케이션이 레코드를 처리하는 스트림의 이름입니다. |
kinesisEndpoint |
ConfigsBuilder |
이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. |
dynamoDBEndpoint |
ConfigsBuilder |
이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. |
initialPositionInStreamExtended |
RetrievalConfig |
애플리케이션이 처음 실행될 때부터 레코드를 가져오기 시작하는 샤드 내 위치입니다. KCL |
kinesisCredentialsProvider |
ConfigsBuilder |
이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. |
dynamoDBCredentialsProvider |
ConfigsBuilder |
이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. |
cloudWatchCredentialsProvider |
ConfigsBuilder |
이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. |
failoverTimeMillis |
LeaseManagementConfig | 리스 소유자가 실패했다고 간주하기 전에 전달해야 하는 시간(밀리초)입니다. |
workerIdentifier |
ConfigsBuilder |
이 애플리케이션 프로세서 인스턴스화를 나타내는 고유 식별자입니다. 고유해야 합니다. |
shardSyncIntervalMillis |
LeaseManagementConfig | 샤드 sync 호출 사이의 시간. |
maxRecords |
PollingConfig |
Kinesis가 반환하는 최대 레코드 수를 설정할 수 있습니다. |
idleTimeBetweenReadsInMillis |
CoordinatorConfig |
이 옵션은 제거되었습니다. 유휴 시간 제거를 참조하십시오 |
callProcessRecordsEvenForEmptyRecordList |
ProcessorConfig |
설정하면, Kinesis에서 제공된 레코드가 없는 경우에도 레코드 프로세서가 직접적으로 호출됩니다. |
parentShardPollIntervalMillis |
CoordinatorConfig |
상위 샤드가 완료되었는지를 확인하기 위해 레코드 프로세서가 폴링을 수행해야 하는 간격입니다. |
cleanupLeasesUponShardCompletion |
LeaseManagementConfig |
설정하면, 하위 리스에서 처리를 시작하자마자 리스가 제거됩니다. |
ignoreUnexpectedChildShards |
LeaseManagementConfig |
설정하면, 진행 중인 샤드가 있는 하위 샤드가 무시됩니다. 이는 주로 DynamoDB Streams용입니다. |
kinesisClientConfig |
ConfigsBuilder |
이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. |
dynamoDBClientConfig |
ConfigsBuilder |
이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. |
cloudWatchClientConfig |
ConfigsBuilder |
이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. |
taskBackoffTimeMillis |
LifecycleConfig |
실패한 작업을 재시도하기 위한 대기 시간. |
metricsBufferTimeMillis |
MetricsConfig |
메트릭 게시를 제어합니다 CloudWatch . |
metricsMaxQueueSize |
MetricsConfig |
CloudWatch 메트릭 게시를 제어합니다. |
metricsLevel |
MetricsConfig |
CloudWatch 메트릭 게시를 제어합니다. |
metricsEnabledDimensions |
MetricsConfig |
CloudWatch 메트릭 게시를 제어합니다. |
validateSequenceNumberBeforeCheckpointing |
CheckpointConfig |
이 옵션은 제거되었습니다. 체크포인트 시퀀스 번호 검증을 참조하십시오. |
regionName |
ConfigsBuilder |
이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. |
maxLeasesForWorker |
LeaseManagementConfig |
애플리케이션의 한 인스턴스가 허용해야 하는 최대 리스 수입니다. |
maxLeasesToStealAtOneTime |
LeaseManagementConfig |
애플리케이션이 한 번에 스틸을 시도해야 하는 최대 리스 수입니다. |
initialLeaseTableReadCapacity |
LeaseManagementConfig |
Kinesis 클라이언트 라이브러리가 새 DynamoDB 임대 테이블을 생성해야 하는 경우 사용되는 DynamoDB IOPs 읽기입니다. |
initialLeaseTableWriteCapacity |
LeaseManagementConfig |
Kinesis 클라이언트 라이브러리가 새 DynamoDB 임대 테이블을 생성해야 하는 경우 사용되는 DynamoDB IOPs 읽기입니다. |
initialPositionInStreamExtended |
LeaseManagementConfig | 애플리케이션이 읽기를 시작해야 하는 스트림 내 초기 위치입니다. 최초 리스 생성 시에만 사용됩니다. |
skipShardSyncAtWorkerInitializationIfLeasesExist |
CoordinatorConfig |
리스 테이블에 기존 리스가 있는 경우 샤드 데이터 동기화를 비활성화합니다. TODO KinesisEco: -438 |
shardPrioritization |
CoordinatorConfig |
사용할 샤드 우선 순위. |
shutdownGraceMillis |
N/A | 이 옵션은 제거되었습니다. 제거를 참조하십시오 MultiLang . |
timeoutInSeconds |
N/A | 이 옵션은 제거되었습니다. MultiLang 삭제를 참조하십시오. |
retryGetRecordsInSeconds |
PollingConfig |
실패 GetRecords 시도 사이의 지연 시간을 구성합니다. |
maxGetRecordsThreadPool |
PollingConfig |
사용된 스레드 풀 크기. GetRecords |
maxLeaseRenewalThreads |
LeaseManagementConfig |
리스 갱신 스레드 풀의 크기를 제어합니다. 애플리케이션에서 사용할 수 있는 리스가 많을수록 이 풀의 크기가 커야 합니다. |
recordsFetcherFactory |
PollingConfig |
스트림에서 검색하는 페처를 생성하는 데 사용되는 팩토리를 교체할 수 있습니다. |
logWarningForTaskAfterMillis |
LifecycleConfig |
작업이 완료되지 않은 경우 얼마나 대기한 후 경고가 기록될지를 지정합니다. |
listShardsBackoffTimeInMillis |
RetrievalConfig |
실패 발생 시 ListShards 호출 간에 대기하는 시간(밀리초)입니다. |
maxListShardsRetryAttempts |
RetrievalConfig |
포기하기 전에 ListShards 가 재시도하는 최대 횟수입니다. |
유휴 시간 제거
1.x 버전에서는 다음과 KCL 같은 두 가지 idleTimeBetweenReadsInMillis
수량에 해당합니다.
-
작업 디스패칭 확인 간의 시간. 작업 간의 이 시간은 이제
CoordinatorConfig#shardConsumerDispatchPollIntervalMillis
를 설정하여 구성할 수 있습니다. -
Kinesis Data Streams에서 반환하는 레코드가 없는 경우 절전 모드로 들어가는 시간. 버전 2.0에서는 향상된 팬아웃 레코드가 해당 검색자로부터 푸시됩니다. 샤드 소비자에 대한 작업은 푸시된 요청이 도착한 경우에만 발생합니다.
클라이언트 구성 제거
버전 2.0에서는 더 KCL 이상 클라이언트를 생성하지 않습니다. 이제 사용자가 유효한 클라이언트를 제공해야 합니다. 따라서 클라이언트 생성을 제어하던 모든 구성 파라미터는 삭제되었습니다. 이러한 파라미터가 필요한 경우 ConfigsBuilder
에 클라이언트를 제공하기 전에 클라이언트에서 설정할 수 있습니다.
제거된 필드 | 동일 구성 |
---|---|
kinesisEndpoint |
기본 엔드포인트로 구성:KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis
endpoint>")).build() . SDK KinesisAsyncClient |
dynamoDBEndpoint |
선호 엔드포인트로 구성:DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb
endpoint>")).build() . SDK DynamoDbAsyncClient |
kinesisClientConfig |
필요한 SDK KinesisAsyncClient 구성으로 구성:KinesisAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
dynamoDBClientConfig |
필요한 SDK DynamoDbAsyncClient 구성으로 구성:DynamoDbAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
cloudWatchClientConfig |
필요한 SDK CloudWatchAsyncClient 구성으로 구성:CloudWatchAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
regionName |
원하는 지역으로 SDK 구성하십시오. 이는 모든 SDK 클라이언트에서 동일합니다. 예: KinesisAsyncClient.builder().region(Region.US_WEST_2).build() . |