소비자 구현 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

소비자 구현

튜토리얼: KPL 및 KCL 1.x를 사용하여 실시간 주식 데이터 처리의 소비자 애플리케이션은 프로듀서 구현에서 만든 주식 거래 스트림을 계속 처리합니다. 그런 다음 1분마다 매매된 가장 인기 있는 주식들을 출력합니다. 이 애플리케이션은 Kinesis Client Library () 를 기반으로 구축되었으며, Kinesis Client Library (KCL) 는 소비자 앱에서 흔히 볼 수 있는 어려운 작업을 대부분 수행합니다. 자세한 내용은 KCL1.x 소비자 개발 단원을 참조하십시오.

소스 코드를 참조하여 다음 정보를 검토하십시오.

StockTradesProcessor 클래스

다음 작업을 수행하는 소비자의 기본 클래스가 제공됩니다.

  • 인수로 전달된 애플리케이션, 스트림 및 리전 이름을 읽습니다.

  • ~/.aws/credentials에서 자격 증명을 읽습니다.

  • RecordProcessorFactory 인스턴스에 의해 구현된 RecordProcessor의 서버 인스턴스를 제공하는 StockTradeRecordProcessor 인스턴스를 생성합니다.

  • RecordProcessorFactory인스턴스와 스트림 이름, 자격 증명, 애플리케이션 이름을 포함한 표준 구성을 사용하여 KCL 작업자를 만듭니다.

  • 워커는 각 샤드(이 소비자 인스턴스에 할당된 샤드)에 대해 새 스레드를 생성하며, 이 스레드는 Kinesis Data Streams에서 계속 반복적으로 레코드를 읽습니다. 그런 다음 RecordProcessor 인스턴스를 호출하여 수신한 각 일괄 레코드를 처리합니다.

StockTradeRecordProcessor 클래스

RecordProcessor 인스턴스를 구현하고 initialize, processRecordsshutdown의 세 가지 필수 메서드를 구현합니다.

Kinesis Client Library는 initializeshutdown을 사용하여 레코드 프로세서에 레코드 수신을 시작할 준비가 될 시점과 레코드 수신을 중지해야 할 시점을 각각 알리므로, 레코드 프로세스가 모든 애플리케이션별 설정 및 종료 작업을 수행할 수 있습니다. 이에 대한 코드가 제공됩니다. processRecords 메서드에서 기본 처리가 발생하며, 각 레코드에 대해 processRecord를 사용합니다. 이 후자의 메서드는 사용자에 대해 대부분의 빈 스켈레톤 코드로 제공되어 향후 설명할 다음 단계에서 구현됩니다.

원래 소스 코드에서 비어 있는 processRecord: reportStats, and resetStats에 대한 지원 메서드의 구현에 유의하십시오.

processRecords 메서드가 구현되며 다음 단계를 수행합니다.

  • 전달된 각 레코드의 경우 processRecord를 호출합니다.

  • 마지막 보고 이후 1분 이상이 경과된 경우 최신 통계를 인쇄하는 reportStats()를 호출한 후 통계를 지우는 resetStats()를 호출하여 다음 간격에 새 레코드만 포함되도록 합니다.

  • 다음 보고 시간을 설정합니다.

  • 마지막 체크포인트 이후 1분 이상이 경과된 경우 checkpoint()를 호출합니다.

  • 다음 검사 시간을 설정합니다.

이 메서드는 보고 및 검사 속도에 대해 60초 간격을 사용합니다. 검사에 대한 자세한 내용은 소비자에 대한 추가 정보 단원을 참조하세요.

StockStats 클래스

이 클래스는 시간에 따른 가장 인기 있는 주식에 대한 통계 추적 및 데이터 보존을 제공합니다. 다음 메서드가 포함된 이 코드가 제공됩니다.

  • addStockTrade(StockTrade): 지정된 StockTrade를 실행 중인 통계에 삽입합니다.

  • toString(): 형식이 지정된 문자열로 통계를 반환합니다.

이 클래스는 각 주식의 총 거래 수와 최대 거래 수를 누적하여 가장 인기 있는 주식을 추적합니다. 그리고 주식 거래가 발생할 때마다 이러한 계수가 업데이트됩니다.

다음 단계에 표시된 대로 StockTradeRecordProcessor 클래스의 메서드에 코드를 추가합니다.

소비자를 구현하려면
  1. 정확한 크기의 processRecord 객체를 인스턴스화하고, 해당 객체에 레코드 데이터를 추가하고, 문제가 있는 경우 경고를 기록하여 StockTrade 메서드를 구현합니다.

    StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array()); if (trade == null) { LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey()); return; } stockStats.addStockTrade(trade);
  2. 간단한 reportStats 메서드를 구현합니다. 기본 설정에 대한 출력 형식을 자유롭게 수정합니다.

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. 마지막으로 resetStats 메서드를 구현합니다. 그러면 새 stockStats 인스턴스가 생성됩니다.

    stockStats = new StockStats();
소비자를 실행하려면
  1. 프로듀서 구현에서 작성한 생산자를 실행하여 시뮬레이션된 주식 거래 레코드를 스트림에 첨가합니다.

  2. 이전에 (IAM사용자를 생성할 때) 검색한 액세스 키와 비밀 키 쌍이 파일에 ~/.aws/credentials 저장되어 있는지 확인합니다.

  3. 다음과 같은 인수를 사용하여 StockTradesProcessor 클래스를 실행합니다.

    StockTradesProcessor StockTradeStream us-west-2

    us-west-2 이외의 리전에 스트림을 생성한 경우 여기에 해당 리전을 대신 지정해야 합니다.

1분 후 다음과 같은 출력이 표시되어야 하며, 그 이후로 매분마다 새로 고침됩니다.

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

소비자에 대한 추가 정보

KCL1.x 소비자 개발 및 다른 부분에서 설명하는 Kinesis Client Library의 이점을 잘 알고 있는 경우 여기에서 Kinesis Client Library를 사용해야 하는 이유에 대해 궁금할 수 있습니다. 단일 샤드 스트림과 단일 소비자 인스턴스만 사용하여 처리하지만 를 사용하여 소비자를 구현하는 것이 여전히 더 쉽습니다. KCL 생산자 단원의 코드 구현 단계를 소비자와 비교하면 소비자 구현이 비교적 쉽다는 것을 알 수 있습니다. 이는 주로 에서 KCL 제공하는 서비스에 기인합니다.

이 애플리케이션에서는 개별 레코드를 처리할 수 있는 레코드 프로세서 클래스를 구현하는 것에 중점을 둡니다. Kinesis Data Streams에서 레코드를 가져오는 방법에 대해 걱정할 필요가 없습니다. KCL 새 레코드가 있을 때마다 레코드를 가져오고 레코드 프로세서를 호출합니다. 또한 얼마나 많은 샤드와 소비자 인스턴스가 있는지에 대해서도 걱정할 필요가 없습니다. 스트림이 확장되면 둘 이상의 샤드 또는 소비자 인스턴스를 처리하기 위해 애플리케이션을 다시 작성할 필요가 없습니다.

체크포인팅이라는 용어는 지금까지 소비되고 처리된 데이터 레코드까지 스트림의 지점을 기록하는 것을 의미합니다. 애플리케이션이 충돌하는 경우 스트림의 시작 부분이 아니라 해당 지점부터 스트림을 읽습니다. 검사 주체와 다양한 디자인 패턴 및 이에 대한 모범 사례는 이 장의 범위를 벗어나지만, 프로덕션 환경에서 직면할 수 있는 사항입니다.

에서 프로듀서 구현 학습한 것처럼 Kinesis Data API Streams의 put 작업은 파티션 키를 입력으로 받습니다. Kinesis Data Streams는 파티션 키를 메커니즘으로 사용하여 레코드를 여러 샤드로 분할합니다(스트림에 샤드가 여러 개 있는 경우). 동일한 파티션 키는 항상 동일한 샤드에 라우팅됩니다. 이를 통해 특정 샤드를 처리하는 소비자는 동일한 파티션 키가 있는 레코드는 해당 소비자에게만 전송되며, 다른 소비자에 전송될 수 없다는 가정에 기반하여 설계할 수 있습니다. 따라서 소비자의 작업자는 필요한 데이터가 누락될 수 있다는 걱정 없이 동일한 파티션 키가 있는 모든 레코드를 집계할 수 있습니다.

이 애플리케이션에서는 소비자의 레코드 처리가 집중적이지 않으므로 샤드 하나를 사용하여 스레드와 동일한 스레드에서 처리를 수행할 수 있습니다. KCL 하지만 실제로 먼저 샤드 수를 확장하는 것을 고려해 보십시오. 일부 경우에는 처리를 다른 스레드로 전환하거나, 레코드 처리가 집약적으로 예상될 경우 스레드 풀을 사용할 수 있습니다. 이렇게 하면 다른 스레드가 레코드를 병렬로 처리하는 동안 에서 새 레코드를 더 빠르게 가져올 KCL 수 있습니다. 멀티스레드 설계는 간단하지 않으며 고급 기술을 사용하여 접근해야 하므로 일반적으로 샤드 수를 늘리는 것이 가장 효과적인 확장 방법입니다.

다음 단계

(선택 사항) 소비자 확대