기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
Kinesis 클라이언트 라이브러리 사용
KDS데이터 스트림의 데이터를 처리할 수 있는 사용자 지정 소비자 애플리케이션을 개발하는 방법 중 하나는 Kinesis Client Library () KCL 를 사용하는 것입니다.
주제
참고
KCL1.x와 KCL 2.x 모두 사용 시나리오에 따라 최신 KCL 1.x 버전 또는 KCL 2.x 버전으로 업그레이드하는 것이 좋습니다. KCL1.x와 KCL 2.x 모두 최신 종속성 및 보안 패치, 버그 수정, 이전 버전과 호환되는 새 기능이 포함된 최신 릴리스로 정기적으로 업데이트됩니다. 자세한 내용은 /release를 참조하십시오. https://github.com/awslabs/ amazon-kinesis-client
Kinesis Client Library란?
KCL분산 컴퓨팅과 관련된 많은 복잡한 작업을 처리하여 Kinesis 데이터 스트림의 데이터를 사용하고 처리할 수 있도록 도와줍니다. 여기에는 여러 소비자 애플리케이션 인스턴스 간의 로드 밸런싱, 소비자 애플리케이션 인스턴스 장애 대응, 처리된 레코드 체크포인트 및 리샤딩 대응이 포함됩니다. 에서는 이러한 모든 하위 작업을 처리하므로 사용자는 사용자 지정 레코드 처리 로직을 작성하는 KCL 데 집중할 수 있습니다.
KCL는 에서 AWS SDKs 사용할 수 있는 Kinesis Data APIs Streams와 다릅니다. Kinesis Data APIs Streams는 스트림 생성, 리샤딩, 레코드 추가 및 가져오기를 포함하여 Kinesis Data Streams의 다양한 측면을 관리하는 데 도움이 됩니다. 는 이러한 모든 하위 작업에 대한 추상화 계층을 KCL 제공하며, 특히 소비자 애플리케이션의 사용자 지정 데이터 처리 로직에 집중할 수 있도록 합니다. Kinesis 데이터 API 스트림에 대한 자세한 내용은 APIAmazon Kinesis 레퍼런스를 참조하십시오.
중요
Java KCL 라이브러리입니다. Java 이외의 언어에 대한 지원은 라는 다국어 인터페이스를 사용하여 제공됩니다. MultiLangDaemon 이 데몬은 Java 기반이며 Java 이외의 언어를 사용하는 경우 백그라운드에서 실행됩니다. KCL 예를 들어 KCL Python용 를 설치하고 소비자 응용 프로그램 전체를 Python으로 작성하더라도 다음과 같은 이유로 시스템에 Java를 설치해야 MultiLangDaemon 합니다. 또한 MultiLangDaemon 에는 사용 사례에 맞게 사용자 지정해야 할 수 있는 몇 가지 기본 설정 (예: 연결 AWS 지역) 이 있습니다. MultiLangDaemon on에 대한 자세한 GitHub 내용은 KCL MultiLangDaemon 프로젝트를
는 레코드 처리 로직과 Kinesis Data Streams 사이에서 중개자 KCL 역할을 합니다. KCL은 다음과 같은 작업을 수행합니다.
-
데이터 스트림에 연결합니다
-
데이터 스트림 내 샤드를 열거합니다.
-
임대를 사용하여 샤드 연결을 해당 워커와 조정합니다.
-
관리하는 모든 샤드의 레코드 프로세서를 인스턴스화합니다
-
데이터 스트림에서 데이터 레코드를 가져옵니다.
-
해당하는 레코드 프로세서로 레코드를 푸시합니다
-
처리된 레코드에 대해 체크포인트를 수행합니다
-
워커 인스턴스 수가 변경되거나 데이터 스트림이 리샤딩(샤드 분할 또는 병합)될 때 샤드-워커 연결(리스)의 균형을 유지합니다.
KCL사용 가능한 버전
현재 지원되는 다음 버전 중 KCL 하나를 사용하여 사용자 지정 소비자 애플리케이션을 구축할 수 있습니다.
-
KCL1.x
자세한 내용은 KCL1.x 소비자 개발 단원을 참조하세요.
-
KCL2.x
자세한 내용은 KCL2.x 소비자 개발 단원을 참조하세요.
KCL1.x 또는 KCL 2.x를 사용하여 공유 처리량을 사용하는 소비자 애플리케이션을 구축할 수 있습니다. 자세한 내용은 를 사용하여 처리량을 공유하는 맞춤형 소비자 개발 KCL 단원을 참조하십시오.
전용 처리량을 사용하는 소비자 애플리케이션 (향상된 팬아웃 소비자) 을 빌드하려면 2.x만 사용할 수 있습니다. KCL 자세한 내용은 전용 처리량 (향상된 팬아웃) 으로 맞춤형 소비자 개발 단원을 참조하십시오.
1.x와 2.x의 차이점에 대한 자세한 KCL 내용 및 KCL 1.x에서 KCL 2.x로 마이그레이션하는 방법에 대한 지침은 을 참조하십시오. KCL 소비자를 KCL 1.x에서 2.x로 KCL 마이그레이션
KCL개념
-
KCL소비자 애플리케이션 — 데이터 스트림을 사용하여 맞춤 KCL 구축되고 데이터 스트림에서 레코드를 읽고 처리하도록 설계된 애플리케이션입니다.
-
소비자 애플리케이션 인스턴스 - KCL 소비자 애플리케이션은 일반적으로 분산되어 있으며, 장애 발생 시 이를 조정하고 데이터 레코드 처리의 부하를 동적으로 분산하기 위해 하나 이상의 애플리케이션 인스턴스가 동시에 실행됩니다.
-
작업자 — KCL 소비자 애플리케이션 인스턴스가 데이터 처리를 시작하는 데 사용하는 상위 수준 클래스입니다.
중요
각 KCL 소비자 애플리케이션 인스턴스에는 워커가 한 명씩 있습니다.
워커는 샤드 및 리스 정보 동기화, 샤드 할당 추적, 샤드의 데이터 처리 등 다양한 작업을 초기화하고 감독합니다. 워커는 소비자 애플리케이션에 대한 구성 정보 (예: 이 소비자 애플리케이션이 처리할 데이터 레코드의 데이터 스트림 이름, 이 KCL 데이터 스트림에 액세스하는 데 필요한 AWS 자격 증명) 를 제공합니다KCL. 또한 워커는 해당 특정 KCL 소비자 애플리케이션 인스턴스를 킥시작하여 데이터 스트림의 데이터 레코드를 레코드 프로세서로 전달합니다.
중요
KCL1.x에서는 이 클래스를 Worker라고 합니다. (이들은 자바 KCL 리포지토리입니다) 에 대한 자세한 내용은 /blob/v1.x/src/main/java/com/AmazonAWS/Services/Kinesis/ClientLibrary/lib/Worker/Worker.java를 참조하십시오. https://github.com/awslabs/ amazon-kinesis-client
2.x에서는 이 클래스를 스케줄러라고 합니다. KCL KCL2.x의 스케줄러 용도는 1.x의 작업자 목적과 동일합니다. KCL KCL2.x의 스케줄러 클래스에 대한 자세한 내용은 /blob/master/ /src/main/java/software/amazon/kinesis/coordinator/Scheduler.java 를 참조하십시오. https://github.com/awslabs/ amazon-kinesis-client amazon-kinesis-client -
리스 - 워커와 샤드 간의 바인딩을 정의하는 데이터입니다. 분산된 KCL 소비자 애플리케이션은 임대를 사용하여 여러 작업자 간에 데이터 레코드 처리를 분할합니다. 언제든지 각 데이터 레코드 분할은 변수로 식별되는 임대 계약을 통해 특정 작업자에게 바인딩됩니다. leaseKey
기본적으로 작업자는 작업자 변수 값에 따라 하나 이상의 임대를 동시에 보유할 수 있습니다. maxLeasesFor
중요
모든 워커는 데이터 스트림에서 사용 가능한 모든 샤드에 대해 사용 가능한 모든 리스를 보유하고자 경쟁합니다. 하지만 한 번에 하나의 워커만 각 리스를 성공적으로 보유할 수 있습니다.
예를 들어, 소비자 애플리케이션 인스턴스 A에 워커 A가 있고 이 인스턴스가 4개의 샤드로 구성된 데이터 스트림을 처리하는 경우 워커 A는 샤드 1, 2, 3, 4에 대한 리스를 동시에 보유할 수 있습니다. 그러나 A와 B라는 2개의 소비자 애플리케이션 인스턴스에 각각 워커 A와 워커 B가 있고 이들 인스턴스가 4개의 샤드로 구성된 데이터 스트림을 처리하는 경우 워커 A와 워커 B는 샤드 1에 대한 리스를 동시에 보유할 수 없습니다. 한 워커가 특정 샤드의 데이터 레코드 처리를 중지할 준비가 되거나 실패할 때까지 이 샤드에 대한 리스를 보유합니다. 한 워커가 리스 보유를 중지하면 다른 워커가 리스를 인수하여 보유합니다.
자바 KCL 리포지토리에 대한 자세한 내용은 1.x의 경우 https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/AmazonAWS/services/kinesis/leases/impl/lease.java를, 2.x의 경우 /blob/master/ /src/main/java/software/amazon/kinesis/leases/Lease.java 를
참조하십시오. KCL https://github.com/awslabs/ amazon-kinesis-client amazon-kinesis-client KCL -
임대 테이블 - 소비자 애플리케이션 작업자가 임대하고 처리 중인 데이터 스트림의 샤드를 추적하는 데 사용되는 고유한 Amazon DynamoDB 테이블입니다. KDS KCL 소비자 애플리케이션이 실행되는 동안 임대 테이블은 데이터 스트림의 최신 샤드 정보와 동기화 상태를 유지해야 합니다 (작업자 내부 및 모든 작업자 간에). KCL 자세한 내용은 임대 테이블을 사용하여 소비자 애플리케이션에서 처리한 샤드를 추적할 수 있습니다. KCL 단원을 참조하십시오.
-
레코드 프로세서 — KCL 소비자 애플리케이션이 데이터 스트림에서 가져온 데이터를 처리하는 방법을 정의하는 로직입니다. 런타임 시 KCL 소비자 애플리케이션 인스턴스는 워커를 인스턴스화하고, 이 워커는 임대를 보유한 샤드당 레코드 프로세서 하나를 인스턴스화합니다.
임대 테이블을 사용하여 소비자 애플리케이션에서 처리한 샤드를 추적할 수 있습니다. KCL
리스 테이블이란 무엇입니까?
각 Amazon Kinesis Data Streams 애플리케이션에 대해 고유한 임대 테이블 (Amazon DynamoDB 테이블에 저장됨) 을 사용하여 소비자 애플리케이션 KCL 작업자가 임대하고 처리 중인 데이터 스트림의 샤드를 KDS 추적합니다. KCL
중요
KCL소비자 애플리케이션의 이름을 사용하여 이 소비자 애플리케이션이 사용하는 임대 테이블의 이름을 생성하므로 각 소비자 애플리케이션 이름은 고유해야 합니다.
소비자 애플리케이션이 실행되는 동안 Amazon DynamoDB 콘솔을 사용하여 리스 테이블을 볼 수 있습니다.
응용 프로그램이 시작될 때 KCL 소비자 응용 프로그램의 임대 테이블이 없는 경우 작업자 중 한 명이 이 응용 프로그램에 대한 임대 테이블을 생성합니다.
중요
Kinesis Data Streams 자체와 관련된 비용 외에도 DynamoDB 테이블 관련 비용이 계정에 청구됩니다.
리스 테이블의 각 행은 소비자 애플리케이션의 워커가 처리 중인 샤드를 나타냅니다. KCL소비자 애플리케이션이 데이터 스트림을 하나만 처리하는 경우 임대 테이블의 해시 키는 샤드 ID입니다. leaseKey
Java 소비자 애플리케이션용 동일한 KCL 2.x로 여러 데이터 스트림을 처리합니다.그렇다면 의 구조는 leaseKey 다음과 같습니다. account-id:StreamName:streamCreationTimestamp:ShardId
예: 111111111:multiStreamTest-1:12345:shardId-000000000336
.
각 행에는 샤드 ID 외에도 다음 데이터가 포함됩니다.
-
checkpoint: 샤드의 가장 최근 체크포인트 시퀀스 번호입니다. 이 값은 데이터 스트림의 모든 샤드에서 고유합니다.
-
checkpointSubSequence번호: Kinesis 프로듀서 라이브러리의 집계 기능을 사용할 때 이는 Kinesis 레코드 내의 개별 사용자 레코드를 추적하는 체크포인트의 확장입니다.
-
leaseCounter: 임대 버전 관리에 사용되므로 작업자가 다른 근로자가 자신의 임대를 도용했음을 감지할 수 있습니다.
-
leaseKey: 임대의 고유 식별자. 각 리스는 데이터 스트림의 샤드에 특정적이며 한 번에 한 워커가 리스를 보유합니다.
-
leaseOwner: 이 임대차 계약을 체결한 근로자.
-
ownerSwitchesSince체크포인트: 마지막으로 체크포인트를 작성한 이후 이 임대 계약으로 인해 근로자가 몇 번이나 변경되었나요?
-
parentShardId: 하위 샤드에서 처리가 시작되기 전에 상위 샤드가 완전히 처리되었는지 확인하는 데 사용됩니다. 그러면 스트림에 입력된 순서대로 레코드가 처리됩니다.
-
hashrange:
PeriodicShardSyncManager
에서 주기적 동기화를 실행하여 리스 테이블에서 누락된 샤드를 찾고 필요한 경우 리스를 생성하는 데 사용됩니다.참고
이 데이터는 KCL 1.14 및 2.3부터 시작하는 모든 샤드의 임대 테이블에 있습니다. KCL
PeriodicShardSyncManager
및 리스와 샤드 간의 주기적 동기화에 대한 자세한 내용은 임대 테이블을 Kinesis 데이터 스트림의 샤드와 동기화하는 방법 섹션을 참조하세요. -
childshards:
LeaseCleanupManager
에서 하위 샤드의 처리 상태를 검토하고 리스 테이블에서 상위 샤드를 삭제할 수 있는지 여부를 결정하는 데 사용됩니다.참고
이 데이터는 KCL 1.14 및 2.3부터 시작하는 모든 샤드의 임대 테이블에 있습니다. KCL
-
shardID: 샤드의 ID입니다.
참고
이 데이터는 사용자가 Java 소비자 애플리케이션용 동일한 KCL 2.x로 여러 데이터 스트림을 처리합니다.를 수행하는 경우에만 리스 테이블에 있습니다. 이 기능은 Java의 경우 KCL 2.x에서만 지원되며, Java의 경우 KCL 2.3 이상부터 지원됩니다.
-
stream name
account-id:StreamName:streamCreationTimestamp
형식의 데이터 스트림 식별자입니다.참고
이 데이터는 사용자가 Java 소비자 애플리케이션용 동일한 KCL 2.x로 여러 데이터 스트림을 처리합니다.를 수행하는 경우에만 리스 테이블에 있습니다. 이 기능은 Java용 KCL 2.x에서만 지원되며, Java의 경우 KCL 2.3 이상부터 지원됩니다.
처리량
Amazon Kinesis Data Streams 애플리케이션이 프로비저닝된 처리량을 예외를 수신하는 경우 DynamoDB 테이블의 프로비저닝된 처리량을 늘려야 합니다. 에서는 초당 읽기 10회, 초당 쓰기 10회의 프로비저닝된 처리량으로 테이블을 KCL 생성하지만, 애플리케이션에 이 정도로는 충분하지 않을 수 있습니다. 예를 들어, Amazon Kinesis Data Streams 애플리케이션이 체크포인트를 자주 수행하거나 여러 샤드로 구성된 스트림에서 작동하는 경우 처리량이 더 필요할 수 있습니다.
DynamoDB의 프로비저닝된 처리량에 대한 자세한 내용은 Amazon DynamoDB 개발자 안내서의 읽기/쓰기 용량 모드 및 DynamoDB의 테이블 및 데이터 작업을 참조하세요.
임대 테이블을 Kinesis 데이터 스트림의 샤드와 동기화하는 방법
KCL소비자 애플리케이션 작업자는 임대를 사용하여 지정된 데이터 스트림의 샤드를 처리합니다. 특정 시간에 어떤 워커가 어떤 샤드를 리스하는지에 대한 정보는 리스 테이블에 저장됩니다. KCL소비자 애플리케이션이 실행되는 동안 임대 테이블은 데이터 스트림의 최신 샤드 정보와 동기화된 상태를 유지해야 합니다. KCL소비자 애플리케이션 부트스트래핑 중 (소비자 애플리케이션이 초기화 또는 재시작될 때) 및 처리 중인 샤드가 종료 (리샤딩) 에 도달할 때마다 Kinesis Data Streams 서비스에서 획득한 샤드 정보와 임대 테이블을 동기화합니다. 즉, 작업자 또는 KCL 소비자 애플리케이션은 초기 소비자 애플리케이션 부트스트랩 동안 그리고 소비자 애플리케이션에서 데이터 스트림 재공유 이벤트가 발생할 때마다 처리 중인 데이터 스트림과 동기화됩니다.
주제
KCL1.0 - 1.13 및 2.0 - 2.2에서의 동기화 KCL
KCL1.0 - 1.13 및 KCL 2.0 - 2.2에서는 소비자 애플리케이션의 부트스트래핑 중 및 각 데이터 스트림 재샤드 이벤트 중에 또는 검색을 호출하여 Kinesis Data Streams 서비스에서 획득한 샤드 정보와 임대 테이블을 KCL 동기화합니다. ListShards
DescribeStream
APIs 위에 나열된 모든 KCL 버전에서 소비자 애플리케이션의 각 워커는 KCL 소비자 애플리케이션의 부트스트래핑 중 및 각 스트림 reshard 이벤트에서 다음 단계를 완료하여 임대/샤드 동기화 프로세스를 수행합니다.
-
처리 중인 데이터 스트림에 대한 모든 샤드를 가져옵니다.
-
리스 테이블에서 모든 샤드 리스를 가져옵니다.
-
리스 테이블에 리스가 없는 각 열린 샤드를 필터링합니다.
-
발견된 모든 열린 샤드와 열린 상위 항목이 없는 각 열린 샤드를 반복합니다.
-
상위 항목 경로를 통해 계층 트리를 탐색하여 샤드가 하위 항목인지 확인합니다. 상위 샤드가 처리 중이거나(상위 샤드에 대한 리스 항목이 리스 테이블에 있음) 상위 샤드를 처리해야 하는 경우(예: 초기 위치가
TRIM_HORIZON
또는AT_TIMESTAMP
인 경우) 샤드는 하위 항목으로 간주됩니다. -
컨텍스트에 열려 있는 샤드가 하위 항목인 경우 초기 위치를 기준으로 샤드를 KCL 체크포인팅하고 필요한 경우 상위 데이터베이스를 위한 임대를 생성합니다.
-
KCL2.x에서의 동기화 (2.3 이상부터 시작) KCL
지원되는 최신 버전의 KCL 2.x (KCL2.3) 이상부터 라이브러리는 이제 다음과 같은 동기화 프로세스 변경 사항을 지원합니다. 이러한 리스/샤드 동기화 변경은 소비자 애플리케이션이 Kinesis Data Streams 서비스를 API 호출하는 횟수를 크게 줄이고 KCL 소비자 애플리케이션의 리스 관리를 최적화합니다. KCL
-
애플리케이션을 부트스트래핑하는 동안 리스 테이블이 비어 있는 경우
ListShard
API 의 필터링 옵션 (ShardFilter
선택적 요청 파라미터) 을 KCL 사용하여 파라미터로 지정된 시간에 열린 샤드의 스냅샷에 대해서만 임대를 검색하고 생성합니다.ShardFilter
ShardFilter
파라미터를 사용하면 의 응답을 필터링할 수 있습니다.ListShards
APIShardFilter
파라미터의 유일한 필수 속성은Type
입니다. KCLType
필터 속성과 다음과 같은 유효한 값을 사용하여 새 임대가 필요할 수 있는 오픈 샤드의 스냅샷을 식별하고 반환합니다.-
AT_TRIM_HORIZON
-TRIM_HORIZON
에서 열려 있던 모든 샤드가 응답에 포함됩니다. -
AT_LATEST
- 데이터 스트림의 현재 열려 있는 샤드만 응답에 포함됩니다. -
AT_TIMESTAMP
- 시작 타임스탬프가 지정된 타임스탬프보다 작거나 같고 종료 타임스탬프가 지정된 타임스탬프보다 크거나 같거나 여전히 열려 있는 모든 샤드가 응답에 포함됩니다.
ShardFilter
는RetrievalConfig#initialPositionInStreamExtended
에 지정된 샤드의 스냅샷에 대한 리스를 초기화하기 위해 빈 리스 테이블에 대한 리스를 생성할 때 사용됩니다.ShardFilter
에 대한 자세한 정보는 https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html을 참조하세요. -
-
모든 워커가 리스/하드 동기화를 수행하여 데이터 스트림의 최신 샤드로 리스 테이블을 최신 상태로 유지하는 대신 선출된 단일 워커 리더가 리스/샤드 동기화를 수행합니다.
-
KCL2.3은 의
ChildShards
반환 매개 변수를 사용하여 폐쇄된SubscribeToShard
APIs 샤드의SHARD_END
경우 발생하는 임대/샤드 동기화를 수행하므로 KCL 작업자는 처리가 완료된 샤드의 하위 샤드에 대해서만 임대를 생성할 수 있습니다.GetRecords
소비자 애플리케이션 전체에서 공유되는 경우 이 리스/샤드 동기화 최적화에서는 의 매개변수를 사용합니다.ChildShards
GetRecords
API 전용 처리량 (향상된 팬아웃) 소비자 애플리케이션의 경우 이러한 임대/샤드 동기화 최적화에는 의 매개 변수가 사용됩니다.ChildShards
SubscribeToShard
API 자세한 내용은, 및 을 참조하십시오. GetRecordsSubscribeToShardsChildShard -
위의 변경으로 모든 작업자가 기존의 모든 샤드에 대해 학습하는 모델에서 작업자가 각 작업자가 소유한 샤드의 하위 샤드에 대해서만 학습하는 모델로 이동하고 있습니다. KCL 따라서 소비자 애플리케이션 부트스트래핑 및 리샤드 이벤트 중에 발생하는 동기화 외에도 KCL 이제는 추가 주기적인 샤드/리스 스캔을 수행하여 리스 테이블의 잠재적 허점을 식별 (즉, 모든 새 샤드에 대해 파악) 하여 데이터 스트림의 전체 해시 범위가 처리되고 있는지 확인하고 필요한 경우 리스를 생성합니다.
PeriodicShardSyncManager
주기적인 리스/샤드 스캔 실행을 담당하는 구성 요소입니다.PeriodicShardSyncManager
IN KCL 2.3에 대한 자세한 내용은 /blob/master/ /src/main/java/software/amazon/kinesis/leases/ .java #L201 -L213을 참조하십시오 https://github.com/awslabs/amazon-kinesis-client. amazon-kinesis-client LeaseManagementConfig2.3에서는 다음과 같은 새 구성 옵션을 구성할 수 있습니다. KCL
PeriodicShardSyncManager
LeaseManagementConfig
명칭 기본값 설명 leasesRecoveryAuditorExecutionFrequencyMillis 12만(2분)
리스 테이블에서 부분 리스를 검색하는 감사자 작업의 빈도(밀리초)입니다. 감사자가 스트림의 리스에서 구멍을 발견하면
leasesRecoveryAuditorInconsistencyConfidenceThreshold
를 기반으로 샤드 동기화를 트리거합니다.leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
리스 테이블의 데이터 스트림에 대한 리스가 불일치하는지 확인하기 위한 정기 감사 작업의 신뢰도 임곗값입니다. 감사자가 데이터 스트림에 대해 동일한 불일치 세트를 여러 번 연속해서 발견하면 샤드 동기화가 트리거됩니다.
이제 의 상태를 모니터링하기 위한 새 CloudWatch 메트릭도 생성됩니다.
PeriodicShardSyncManager
자세한 내용은 PeriodicShardSyncManager 단원을 참조하십시오. -
하나의 샤드 계층에 대해서만 리스를 생성하도록
HierarchicalShardSyncer
에 대한 최적화를 포함합니다.
KCL1.x에서의 동기화 (1.14 이상부터 시작KCL)
지원되는 최신 버전의 KCL 1.x (KCL1.14) 이상부터 라이브러리는 이제 동기화 프로세스에 대한 다음과 같은 변경 사항을 지원합니다. 이러한 리스/샤드 동기화 변경은 소비자 애플리케이션이 Kinesis Data Streams 서비스를 API 호출하는 횟수를 크게 줄이고 KCL 소비자 애플리케이션의 리스 관리를 최적화합니다. KCL
-
애플리케이션을 부트스트래핑하는 동안 리스 테이블이 비어 있는 경우
ListShard
API 의 필터링 옵션 (ShardFilter
선택적 요청 파라미터) 을 KCL 사용하여 파라미터로 지정된 시간에 열린 샤드의 스냅샷에 대해서만 임대를 검색하고 생성합니다.ShardFilter
ShardFilter
파라미터를 사용하면 의 응답을 필터링할 수 있습니다.ListShards
APIShardFilter
파라미터의 유일한 필수 속성은Type
입니다. KCLType
필터 속성과 다음과 같은 유효한 값을 사용하여 새 임대가 필요할 수 있는 오픈 샤드의 스냅샷을 식별하고 반환합니다.-
AT_TRIM_HORIZON
-TRIM_HORIZON
에서 열려 있던 모든 샤드가 응답에 포함됩니다. -
AT_LATEST
- 데이터 스트림의 현재 열려 있는 샤드만 응답에 포함됩니다. -
AT_TIMESTAMP
- 시작 타임스탬프가 지정된 타임스탬프보다 작거나 같고 종료 타임스탬프가 지정된 타임스탬프보다 크거나 같거나 여전히 열려 있는 모든 샤드가 응답에 포함됩니다.
ShardFilter
는KinesisClientLibConfiguration#initialPositionInStreamExtended
에 지정된 샤드의 스냅샷에 대한 리스를 초기화하기 위해 빈 리스 테이블에 대한 리스를 생성할 때 사용됩니다.ShardFilter
에 대한 자세한 정보는 https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html을 참조하세요. -
-
모든 워커가 리스/하드 동기화를 수행하여 데이터 스트림의 최신 샤드로 리스 테이블을 최신 상태로 유지하는 대신 선출된 단일 워커 리더가 리스/샤드 동기화를 수행합니다.
-
KCL1.14는 의
ChildShards
반환 매개 변수를 사용하여 폐쇄된SubscribeToShard
APIs 샤드의SHARD_END
경우 발생하는 임대/샤드 동기화를 수행하므로 KCL 작업자는 처리가 완료된 샤드의 하위 샤드에 대해서만 임대를 생성할 수 있습니다.GetRecords
자세한 내용은 및 GetRecordsChildShard을 참조하십시오. -
위의 변경으로 모든 작업자가 기존의 모든 샤드에 대해 학습하는 모델에서 작업자가 각 작업자가 소유한 샤드의 하위 샤드에 대해서만 학습하는 모델로 이동하고 있습니다. KCL 따라서 소비자 애플리케이션 부트스트래핑 및 리샤드 이벤트 중에 발생하는 동기화 외에도 KCL 이제는 추가 주기적인 샤드/리스 스캔을 수행하여 리스 테이블의 잠재적 허점을 식별 (즉, 모든 새 샤드에 대해 파악) 하여 데이터 스트림의 전체 해시 범위가 처리되고 있는지 확인하고 필요한 경우 리스를 생성합니다.
PeriodicShardSyncManager
주기적인 리스/샤드 스캔 실행을 담당하는 구성 요소입니다.KinesisClientLibConfiguration#shardSyncStrategyType
이ShardSyncStrategyType.SHARD_END
로 설정된 경우PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold
는 샤드 동기화 시행을 위한 리스 테이블에 구멍이 포함된 연속 스캔 수에 대한 임곗값을 결정하는 데 사용됩니다.KinesisClientLibConfiguration#shardSyncStrategyType
이ShardSyncStrategyType.PERIODIC
으로 설정된 경우leasesRecoveryAuditorInconsistencyConfidenceThreshold
은 무시됩니다.1.14에서는 다음과 같은 새 구성 옵션을 사용하여 구성할 수 있습니다. KCL
PeriodicShardSyncManager
LeaseManagementConfig
명칭 기본값 설명 leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
리스 테이블의 데이터 스트림에 대한 리스가 불일치하는지 확인하기 위한 정기 감사 작업의 신뢰도 임곗값입니다. 감사자가 데이터 스트림에 대해 동일한 불일치 세트를 여러 번 연속해서 발견하면 샤드 동기화가 트리거됩니다.
이제 의 상태를 모니터링하기 위한 새 CloudWatch 메트릭도 생성됩니다.
PeriodicShardSyncManager
자세한 내용은 PeriodicShardSyncManager 단원을 참조하십시오. -
KCL1.14는 이제 지연 임대 정리도 지원합니다. 샤드가 데이터 스트림의 보존 기간을 지나 만료되었거나 리샤딩 작업의 결과로 닫힌 경우
SHARD_END
에 도달하면LeaseCleanupManager
가 리스를 비동기식으로 삭제합니다.새로운 구성 옵션을 사용하여
LeaseCleanupManager
를 구성할 수 있습니다.명칭 기본값 설명 leaseCleanupInterval밀리스 1분
리스 정리 스레드를 실행하는 간격입니다.
completedLeaseCleanupIntervalMillis 5분 리스가 완료되었는지 여부를 확인하는 간격입니다.
garbageLeaseCleanupIntervalMillis 30 분 리스가 가비지(즉, 데이터 스트림의 보존 기간을 지나 트리밍됨)인지 여부를 확인하는 간격입니다.
-
하나의 샤드 계층에 대해서만 리스를 생성하도록
KinesisShardSyncer
에 대한 최적화를 포함합니다.
Java 소비자 애플리케이션용 동일한 KCL 2.x로 여러 데이터 스트림을 처리합니다.
이 섹션에서는 두 개 이상의 데이터 스트림을 동시에 처리할 수 있는 KCL 소비자 응용 프로그램을 생성할 수 있도록 하는 Java KCL 2.x의 다음 변경 사항에 대해 설명합니다.
중요
멀티스트림 처리는 Java용 KCL 2.x에서만 지원되며 Java의 경우 KCL 2.3 이상부터 지원됩니다.
멀티스트림 처리는 KCL 2.x를 구현할 수 있는 다른 모든 언어에 대해 NOT 지원됩니다.
멀티스트림 처리는 모든 버전의 NOT 1.x에서 지원됩니다. KCL
-
MultistreamTracker 인터페이스
여러 스트림을 동시에 처리할 수 있는 소비자 애플리케이션을 구축하려면 라는 새 인터페이스를 구현해야 합니다 MultistreamTracker
. 이 인터페이스에는 KCL 소비자 응용 프로그램에서 처리할 데이터 스트림 및 해당 구성 목록을 반환하는 streamConfigList
메서드가 포함되어 있습니다. 처리되는 데이터 스트림은 소비자 애플리케이션 런타임 중에 변경될 수 있다는 점에 유의하십시오.streamConfigList
는 처리할 데이터 스트림의 변경 사항을 KCL 파악하기 위해 주기적으로 호출합니다.streamConfigList
메서드가 StreamConfig목록을 채웁니다. package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }
StreamIdentifier
및InitialPositionInStreamExtended
는 필수 필드이고consumerArn
은 선택 사항입니다. KCL2.x를 사용하여 향상된 팬아웃 소비자 애플리케이션을 구현하는consumerArn
경우에만 를 제공해야 합니다.에 대한 자세한 내용은 https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/
StreamIdentifier
/src/main/java/software/amazon/kinesis/common/ .java #L129 를 amazon-kinesis-client참조하십시오. StreamIdentifier 를 생성하려면 v2.5.0 이상에서 사용할 수 있는 및 에서 멀티스트림 인스턴스를 생성하는 것이 좋습니다. StreamIdentifier
streamArn
streamCreationEpoch
지원하지 않는 KCL v2.3 및 v2.4에서는 다음 형식을 사용하여streamArm
멀티스트림 인스턴스를 생성합니다.account-id:StreamName:streamCreationTimestamp
이 형식은 더 이상 사용되지 않으며 다음 주요 릴리스부터 더 이상 지원되지 않습니다.MultistreamTracker
에는 리스 테이블(formerStreamsLeasesDeletionStrategy
)에서 오래된 스트림의 리스를 삭제하기 위한 전략도 포함되어 있습니다. 소비자 애플리케이션 런타임 중에 전략이 CANNOT 변경된다는 점에 유의하십시오. 자세한 내용은 https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/ /src/main/java/software/amazon/kinesis/processor/.java를 amazon-kinesis-client참조하십시오. FormerStreamsLeasesDeletionStrategy -
ConfigsBuilder
소비자 애플리케이션을 구축할 때 사용할 모든 2.x 구성 설정을 지정하는 데 사용할 수 있는 애플리케이션 전반의 클래스입니다. KCL KCL ConfigsBuilder
클래스는 이제 인터페이스를 지원합니다.MultistreamTracker
ConfigsBuilder둘 중 하나를 데이터 스트림의 이름으로 초기화하여 다음 데이터 스트림의 레코드를 소비할 수 있습니다./** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
또는 여러 스트림을 동시에 처리하는 KCL 소비자 애플리케이션을 구현하려는
MultiStreamTracker
경우 를 사용하여 초기화할 ConfigsBuilder 수 있습니다.* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
KCL소비자 응용 프로그램에 대한 멀티스트림 지원이 구현됨에 따라 이제 응용 프로그램 임대 테이블의 각 행에 이 응용 프로그램이 처리하는 여러 데이터 스트림의 샤드 ID와 스트림 이름이 포함됩니다.
-
KCL소비자 애플리케이션에 대한 멀티스트림 지원이 구현되면 는 다음과 같은 구조를 leaseKey 취합니다.
account-id:StreamName:streamCreationTimestamp:ShardId
예:111111111:multiStreamTest-1:12345:shardId-000000000336
.중요
기존 KCL 소비자 애플리케이션이 하나의 데이터 스트림만 처리하도록 구성된 경우 leaseKey (임대 테이블의 해시 키) 는 샤드 ID입니다. 멀티스트림 지원의 경우 leaseKey 구조가 다음과 같아야 하므로 여러 데이터 스트림을 처리하도록 기존 KCL 소비자 애플리케이션을 재구성하면 임대 테이블이 깨집니다.
account-id:StreamName:StreamCreationTimestamp:ShardId
스키마 KCL 레지스트리와 함께 사용하십시오. AWS Glue
Kinesis 데이터 스트림을 AWS Glue 스키마 레지스트리와 통합할 수 있습니다. AWS Glue 스키마 레지스트리를 사용하면 스키마를 중앙에서 검색, 제어 및 발전시키는 동시에 생성된 데이터가 등록된 스키마에 의해 지속적으로 검증되도록 할 수 있습니다. 스키마는 데이터 레코드의 구조와 포맷을 정의합니다. 스키마는 신뢰할 수 있는 데이터 게시, 소비 또는 저장을 위한 버전 지정 사양입니다. AWS Glue스키마 레지스트리를 사용하면 스트리밍 애플리케이션 내에서 end-to-end 데이터 품질과 데이터 거버넌스를 개선할 수 있습니다. 자세한 내용은 AWS Glue Schema Registry를 참조하세요. 이 통합을 설정하는 방법 중 하나는 Java를 KCL 사용하는 것입니다.
중요
현재 Kinesis Data Streams AWS Glue 및 스키마 레지스트리 통합은 Java로 구현된 2.3 컨슈머를 KCL 사용하는 Kinesis 데이터 스트림에서만 지원됩니다. 다국어 지원은 제공되지 않습니다. KCL1.0 소비자는 지원되지 않습니다. KCLKCL2.3 이전의 2.x 소비자는 지원되지 않습니다.
를 사용하여 KCL Kinesis Data Streams와 스키마 레지스트리의 통합을 설정하는 방법에 대한 자세한 지침은 사용 사례: Amazon Kinesis Data Streams를 Glue 스키마 레지스트리와 통합의 “KCL/라이브러리를 사용하여 KPL 데이터와 상호 작용하기” 섹션을 참조하십시오. AWS