

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# KCL 1.x에서 KCL 2.x로 소비자 마이그레이션
<a name="kcl-migration"></a>

**중요**  
Amazon Kinesis Client Library(KCL) 버전 1.x 및 2.x는 오래되었습니다. KCL 1.x는 2026년 1월 30일에 지원이 종료됩니다. 버전 1.x를 사용하는 KCL 애플리케이션은 2026년 1월 30일 이전에 최신 KCL 버전으로 마이그레이션할 것을 **적극 권장**합니다. 최신 KCL 버전을 찾으려면 [GitHub의 Amazon Kinesis Client Library 페이지](https://github.com/awslabs/amazon-kinesis-client)를 참조하세요. 최신 KCL 버전에 대한 자세한 내용은 [Kinesis Client Library 사용](kcl.md) 섹션을 참조하세요. KCL 1.x에서 KCL 3.x로 마이그레이션하는 방법에 대한 자세한 내용은 [KCL 1.x에서 KCL 3.x로 마이그레이션](kcl-migration-1-3.md) 섹션을 참조하세요.

이 주제에서는 Kinesis Client Library(KCL) 버전 1.x와 버전 2.x 간의 차이점을 설명합니다. 또한, 소비자를 KCL 버전 1.x에서 버전 2.x로 마이그레이션하는 방법을 알려 드립니다. 클라이언트 마이그레이션 후 마지막 체크포인트 위치에서 처리 기록을 시작합니다.

KCL 버전 2.0에서는 다음과 같은 인터페이스 변경이 이루어졌습니다.


**KCL 인터페이스 변경 사항**  

| KCL 1.x 인터페이스 | KCL 2.0 인터페이스 | 
| --- | --- | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor | software.amazon.kinesis.processor.ShardRecordProcessor | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory | software.amazon.kinesis.processor.ShardRecordProcessorFactory | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware | software.amazon.kinesis.processor.ShardRecordProcessor에 포함됨 | 

**Topics**
+ [레코드 프로세서 마이그레이션](#recrod-processor-migration)
+ [레코드 프로세서 팩토리 마이그레이션](#recrod-processor-factory-migration)
+ [작업자 마이그레이션](#worker-migration)
+ [Amazon Kinesis 클라이언트 구성](#client-configuration)
+ [유휴 시간 제거](#idle-time-removal)
+ [클라이언트 구성 제거](#client-configuration-removals)

## 레코드 프로세서 마이그레이션
<a name="recrod-processor-migration"></a>

다음은 KCL 1.x에 구현된 레코드 프로세서를 보여주는 예제입니다.

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        //
        // Process records, and possibly checkpoint
        //
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**레코드 프로세서 클래스를 마이그레이션하려면**

1. 다음과 같이 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` 및 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware`의 인터페이스를 `software.amazon.kinesis.processor.ShardRecordProcessor`로 변경합니다.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
   public class TestRecordProcessor implements ShardRecordProcessor {
   ```

1. `import` 및 `initialize` 메서드의 `processRecords` 문을 업데이트합니다.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
   ```

1. `shutdown` 메서드를 새 메서드인 `leaseLost`, `shardEnded`, `shutdownRequested`으로 바꿉니다.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

다음은 업데이트된 버전의 레코드 프로세서 클래스입니다.

```
package com.amazonaws.kcl;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class TestRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

## 레코드 프로세서 팩토리 마이그레이션
<a name="recrod-processor-factory-migration"></a>

레코드 프로세스 팩토리는 리스가 필요할 경우 레코드 프로세서 생성을 담당합니다. 다음은 KCL 1.x 팩토리의 예입니다.

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor();
    }
}
```

**레코드 프로세서 팩토리를 마이그레이션하려면**

1. 구현된 인터페이스를 다음과 같이 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory`에서 `software.amazon.kinesis.processor.ShardRecordProcessorFactory`로 변경합니다.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
   public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
   ```

1. `createProcessor`에 대한 반환 서명을 변경합니다.

   ```
   // public IRecordProcessor createProcessor() {
   public ShardRecordProcessor shardRecordProcessor() {
   ```

다음은 2.0의 레코드 프로세서 팩토리의 예입니다.

```
package com.amazonaws.kcl;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor();
    }
}
```

## 작업자 마이그레이션
<a name="worker-migration"></a>

KCL 버전 2.0에서는 `Scheduler`라는 새로운 클래스가 `Worker` 클래스를 대체합니다. 다음은 KCL 1.x 워커의 예입니다.

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

**작업자를 마이그레이션하려면**

1. `Worker` 클래스에 대한 `import` 문을 `Scheduler` 및 `ConfigsBuilder` 클래스에 대한 가져오기 문으로 변경합니다.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. 다음 예와 같이 `ConfigsBuilder` 및 `Scheduler`를 생성합니다.

   `KinesisClientUtil`을 사용하여 `KinesisAsyncClient`를 만들고 `KinesisAsyncClient`에 `maxConcurrency`를 구성하는 것이 좋습니다.
**중요**  
Amazon Kinesis Client는 `KinesisAsyncClient`를 구성하여 `KinesisAsyncClient`의 전체 임대 수에 더해 추가 사용량까지 허용할 수 있을 만큼 `maxConcurrency`를 충분히 높이지 않을 경우 지연 시간이 크게 증가할 수 있습니다.

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
   Region region = Region.AP_NORTHEAST_2;
   KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
   ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
   
   Scheduler scheduler = new Scheduler(
       configsBuilder.checkpointConfig(),
       configsBuilder.coordinatorConfig(),
       configsBuilder.leaseManagementConfig(),
       configsBuilder.lifecycleConfig(),
       configsBuilder.metricsConfig(),
       configsBuilder.processorConfig(),
       configsBuilder.retrievalConfig()
       );
   ```

## Amazon Kinesis 클라이언트 구성
<a name="client-configuration"></a>

Kinesis Client Library 2.0 릴리스에서는 클라이언트 구성이 단일 구성 클래스(`KinesisClientLibConfiguration`)에서 6개 구성 클래스로 이동했습니다. 다음 표에 마이그레이션에 대한 설명이 나와 있습니다.


**구성 필드와 새 클래스**  

| 원래 필드 | 새 구성 클래스 | 설명 | 
| --- | --- | --- | 
| applicationName | ConfigsBuilder | 이 KCL 애플리케이션의 이름입니다. tableName 및 consumerName의 기본값으로 사용됩니다. | 
| tableName | ConfigsBuilder | Amazon DynamoDB 리스 테이블에 사용된 테이블 이름 재정의를 허용합니다. | 
| streamName | ConfigsBuilder | 이 애플리케이션이 레코드를 처리하는 스트림의 이름입니다. | 
| kinesisEndpoint | ConfigsBuilder | 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. | 
| dynamoDBEndpoint | ConfigsBuilder | 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. | 
| initialPositionInStreamExtended | RetrievalConfig | KCL의 레코드 가져오기가 시작되는 샤드의 위치입니다(애플리케이션의 초기 실행으로 시작). | 
| kinesisCredentialsProvider | ConfigsBuilder | 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. | 
| dynamoDBCredentialsProvider | ConfigsBuilder | 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. | 
| cloudWatchCredentialsProvider | ConfigsBuilder | 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. | 
| failoverTimeMillis | LeaseManagementConfig | 리스 소유자가 실패했다고 간주하기 전에 전달해야 하는 시간(밀리초)입니다. | 
| workerIdentifier | ConfigsBuilder | 이 애플리케이션 프로세서 인스턴스화를 나타내는 고유 식별자입니다. 고유해야 합니다. | 
| shardSyncIntervalMillis | LeaseManagementConfig | 샤드 sync 호출 사이의 시간. | 
| maxRecords | PollingConfig | Kinesis가 반환하는 최대 레코드 수를 설정할 수 있습니다. | 
| idleTimeBetweenReadsInMillis | CoordinatorConfig | 이 옵션은 제거되었습니다. 유휴 시간 제거를 참조하십시오 | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig | 설정하면, Kinesis에서 제공된 레코드가 없는 경우에도 레코드 프로세서가 직접적으로 호출됩니다. | 
| parentShardPollIntervalMillis | CoordinatorConfig | 상위 샤드가 완료되었는지를 확인하기 위해 레코드 프로세서가 폴링을 수행해야 하는 간격입니다. | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig | 설정하면, 하위 리스에서 처리를 시작하자마자 리스가 제거됩니다. | 
| ignoreUnexpectedChildShards | LeaseManagementConfig | 설정하면, 진행 중인 샤드가 있는 하위 샤드가 무시됩니다. 이는 주로 DynamoDB Streams용입니다. | 
| kinesisClientConfig | ConfigsBuilder | 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. | 
| dynamoDBClientConfig | ConfigsBuilder | 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. | 
| cloudWatchClientConfig | ConfigsBuilder | 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. | 
| taskBackoffTimeMillis | LifecycleConfig | 실패한 작업을 재시도하기 위한 대기 시간. | 
| metricsBufferTimeMillis | MetricsConfig | CloudWatch 측정치 게시를 제어합니다. | 
| metricsMaxQueueSize | MetricsConfig | CloudWatch 측정치 게시를 제어합니다. | 
| metricsLevel | MetricsConfig | CloudWatch 측정치 게시를 제어합니다. | 
| metricsEnabledDimensions | MetricsConfig | CloudWatch 측정치 게시를 제어합니다. | 
| validateSequenceNumberBeforeCheckpointing | CheckpointConfig | 이 옵션은 제거되었습니다. 체크포인트 시퀀스 번호 검증을 참조하십시오. | 
| regionName | ConfigsBuilder | 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오. | 
| maxLeasesForWorker | LeaseManagementConfig | 애플리케이션의 한 인스턴스가 허용해야 하는 최대 리스 수입니다. | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig | 애플리케이션이 한 번에 스틸을 시도해야 하는 최대 리스 수입니다. | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | Kinesis Client Library에서 DynamoDB 리스 테이블을 새로 생성해야 하는 경우 사용되는 DynamoDB 읽기 IOP입니다. | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | Kinesis Client Library에서 DynamoDB 리스 테이블을 새로 생성해야 하는 경우 사용되는 DynamoDB 읽기 IOP입니다. | 
| initialPositionInStreamExtended | LeaseManagementConfig | 애플리케이션이 읽기를 시작해야 하는 스트림 내 초기 위치입니다. 최초 리스 생성 시에만 사용됩니다. | 
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | 리스 테이블에 기존 리스가 있는 경우 샤드 데이터 동기화를 비활성화합니다. TODO: KinesisEco-438 | 
| shardPrioritization | CoordinatorConfig | 사용할 샤드 우선 순위. | 
| shutdownGraceMillis | 해당 사항 없음 | 이 옵션은 제거되었습니다. MultiLang 제거를 참조하십시오. | 
| timeoutInSeconds | 해당 사항 없음 | 이 옵션은 제거되었습니다. MultiLang 제거를 참조하십시오. | 
| retryGetRecordsInSeconds | PollingConfig | 실패에 대한 GetRecords 시도 사이의 지연을 구성합니다. | 
| maxGetRecordsThreadPool | PollingConfig | GetRecords에 사용되는 스레드 풀 크기. | 
| maxLeaseRenewalThreads | LeaseManagementConfig | 리스 갱신 스레드 풀의 크기를 제어합니다. 애플리케이션에서 사용할 수 있는 리스가 많을수록 이 풀의 크기가 커야 합니다. | 
| recordsFetcherFactory | PollingConfig | 스트림에서 검색하는 페처를 생성하는 데 사용되는 팩토리를 교체할 수 있습니다. | 
| logWarningForTaskAfterMillis | LifecycleConfig | 작업이 완료되지 않은 경우 얼마나 대기한 후 경고가 기록될지를 지정합니다. | 
| listShardsBackoffTimeInMillis | RetrievalConfig | 실패 발생 시 ListShards 호출 간에 대기하는 시간(밀리초)입니다. | 
| maxListShardsRetryAttempts | RetrievalConfig | 포기하기 전에 ListShards가 재시도하는 최대 횟수입니다. | 

## 유휴 시간 제거
<a name="idle-time-removal"></a>

KCL 1.x 버전에서 `idleTimeBetweenReadsInMillis`는 다음 두 가지 수량에 해당합니다.
+ 작업 디스패칭 확인 간의 시간. 작업 간의 이 시간은 이제 `CoordinatorConfig#shardConsumerDispatchPollIntervalMillis`를 설정하여 구성할 수 있습니다.
+ Kinesis Data Streams에서 반환하는 레코드가 없는 경우 절전 모드로 들어가는 시간. 버전 2.0에서는 향상된 팬아웃 레코드가 해당 검색자로부터 푸시됩니다. 샤드 소비자에 대한 작업은 푸시된 요청이 도착한 경우에만 발생합니다.

## 클라이언트 구성 제거
<a name="client-configuration-removals"></a>

버전 2.0에서 KCL은 더 이상 클라이언트를 생성하지 않습니다. 이제 사용자가 유효한 클라이언트를 제공해야 합니다. 따라서 클라이언트 생성을 제어하던 모든 구성 파라미터는 삭제되었습니다. 이러한 파라미터가 필요한 경우 `ConfigsBuilder`에 클라이언트를 제공하기 전에 클라이언트에서 설정할 수 있습니다.


****  

| 제거된 필드 | 동일 구성 | 
| --- | --- | 
| kinesisEndpoint | SDK KinesisAsyncClient를 원하는 엔드포인트 KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build()로 구성합니다. | 
| dynamoDBEndpoint | SDK DynamoDbAsyncClient를 원하는 엔드포인트 DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build()로 구성합니다. | 
| kinesisClientConfig | SDK KinesisAsyncClient를 필요한 구성 KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build()로 구성합니다. | 
| dynamoDBClientConfig | SDK DynamoDbAsyncClient를 필요한 구성 DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build()로 구성합니다. | 
| cloudWatchClientConfig | SDK CloudWatchAsyncClient를 필요한 구성 CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build()로 구성합니다. | 
| regionName | SDK를 원하는 리전으로 구성합니다. 모든 SDK 클라이언트에 대해 동일합니다. 예를 들어 KinesisAsyncClient.builder().region(Region.US\$1WEST\$12).build()입니다. | 