

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Kembangkan konsumen dengan KCL
<a name="develop-kcl-consumers"></a>

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi konsumen yang memproses data dari aliran data Kinesis Anda.

KCL tersedia dalam berbagai bahasa. Topik ini mencakup bagaimana mengembangkan konsumen KCL dalam bahasa Jawa dan non-Jawa.
+ [Untuk melihat referensi Javadoc Perpustakaan Klien Kinesis, lihat Javadoc Perpustakaan Klien Amazon Kinesis.](https://javadoc.io/doc/software.amazon.kinesis/amazon-kinesis-client/latest/index.html)
+ Untuk mengunduh KCL untuk Java dari GitHub, lihat [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) untuk Java.
+ Untuk menemukan KCL untuk Java di Apache Maven, lihat [KCL](https://central.sonatype.com/artifact/software.amazon.kinesis/amazon-kinesis-client) Maven Central Repository.

**Topics**
+ [Kembangkan konsumen dengan KCL di Jawa](develop-kcl-consumers-java.md)
+ [Mengembangkan konsumen dengan KCL dalam bahasa non-Jawa](develop-kcl-consumers-non-java.md)

# Kembangkan konsumen dengan KCL di Jawa
<a name="develop-kcl-consumers-java"></a>

## Prasyarat
<a name="develop-kcl-consumers-java-prerequisites"></a>

Sebelum Anda mulai menggunakan KCL 3.x, pastikan Anda memiliki yang berikut:
+ Java Development Kit (JDK) 8 atau lebih baru
+ AWS SDK untuk Java 2.x
+ Maven atau Gradle untuk manajemen ketergantungan

KCL mengumpulkan metrik pemanfaatan CPU seperti pemanfaatan CPU dari host komputasi yang dijalankan pekerja untuk menyeimbangkan beban untuk mencapai tingkat pemanfaatan sumber daya yang merata di seluruh pekerja. Untuk mengaktifkan KCL mengumpulkan metrik pemanfaatan CPU dari pekerja, Anda harus memenuhi prasyarat berikut:

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ Sistem operasi Anda harus OS Linux.
+ Anda harus mengaktifkan [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)di instans EC2 Anda.

 **Amazon Elastic Container Service (Amazon ECS) di Amazon EC2**
+ Sistem operasi Anda harus OS Linux.
+ Anda harus mengaktifkan titik [akhir metadata tugas ECS](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html) versi 4. 
+ Versi agen penampung Amazon ECS Anda harus 1.39.0 atau yang lebih baru.

 **Amazon ECS aktif AWS Fargate**
+ Anda harus mengaktifkan titik [akhir metadata tugas Fargate](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html) versi 4. Jika Anda menggunakan platform Fargate versi 1.4.0 atau yang lebih baru, ini diaktifkan secara default. 
+ Platform Fargate versi 1.4.0 atau yang lebih baru.

 **Layanan Amazon Elastic Kubernetes (Amazon EKS) di Amazon EC2** 
+ Sistem operasi Anda harus OS Linux.

 **Amazon EKS di AWS Fargate**
+ Platform Fargate 1.3.0 atau yang lebih baru.

**penting**  
Jika KCL tidak dapat mengumpulkan metrik pemanfaatan CPU dari pekerja, KCL akan kembali menggunakan throughput per pekerja untuk menetapkan sewa dan menyeimbangkan beban di seluruh pekerja di armada. Untuk informasi selengkapnya, lihat [Bagaimana KCL memberikan sewa kepada pekerja dan menyeimbangkan beban](kcl-dynamoDB.md#kcl-assign-leases).

## Instal dan tambahkan dependensi
<a name="develop-kcl-consumers-java-installation"></a>

Jika Anda menggunakan Maven, tambahkan dependensi berikut ke file Anda. `pom.xml` Pastikan Anda mengganti 3.xx ke versi KCL terbaru. 

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

Jika Anda menggunakan Gradle, tambahkan berikut ini ke `build.gradle` file Anda. Pastikan Anda mengganti 3.xx ke versi KCL terbaru. 

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

Anda dapat memeriksa versi terbaru KCL di Repositori Pusat [Maven](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client).

## Menerapkan konsumen
<a name="develop-kcl-consumers-java-implemetation"></a>

Aplikasi konsumen KCL terdiri dari komponen-komponen kunci berikut:

**Topics**
+ [RecordProcessor](#implementation-recordprocessor)
+ [RecordProcessorFactory](#implementation-recordprocessorfactory)
+ [Penjadwal](#implementation-scheduler)
+ [Aplikasi Konsumen Utama](#implementation-main)

### RecordProcessor
<a name="implementation-recordprocessor"></a>

RecordProcessor adalah komponen inti di mana logika bisnis Anda untuk memproses catatan aliran data Kinesis berada. Ini mendefinisikan bagaimana aplikasi Anda memproses data yang diterimanya dari aliran Kinesis.

Tanggung jawab utama:
+ Inisialisasi pemrosesan untuk pecahan
+ Memproses kumpulan catatan dari aliran Kinesis
+ Pemrosesan shutdown untuk pecahan (misalnya, ketika pecahan pecah atau bergabung, atau sewa diserahkan ke host lain)
+ Menangani checkpointing untuk melacak kemajuan

Berikut ini menunjukkan contoh implementasi:

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.*;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class SampleRecordProcessor implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());
            processRecordsInput.records().forEach(r -> 
                log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())
            );
            
            // Checkpoint periodically
            processRecordsInput.checkpointer().checkpoint();
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}
```

Berikut ini adalah penjelasan rinci dari setiap metode yang digunakan dalam contoh:

**inisialisasi (Inisialisasi InitializationInput Input)**
+ Tujuan: Siapkan sumber daya atau status yang diperlukan untuk memproses catatan.
+ Ketika disebut: Sekali, ketika KCL menetapkan pecahan ke prosesor rekaman ini.
+ Poin kunci:
  + `initializationInput.shardId()`: ID pecahan yang akan ditangani prosesor ini.
  + `initializationInput.extendedSequenceNumber()`: Nomor urut untuk mulai memproses dari.

**ProcessRecords () ProcessRecordsInput processRecordsInput**
+ Tujuan: Memproses catatan yang masuk dan kemajuan pos pemeriksaan opsional.
+ Ketika disebut: Berulang kali, selama prosesor rekaman memegang sewa untuk pecahan.
+ Poin kunci:
  + `processRecordsInput.records()`: Daftar catatan untuk diproses.
  + `processRecordsInput.checkpointer()`: Digunakan untuk memeriksa kemajuan.
  + Pastikan Anda menangani pengecualian apa pun selama pemrosesan untuk mencegah KCL gagal.
  + Metode ini harus idempoten, karena catatan yang sama dapat diproses lebih dari sekali dalam beberapa skenario, seperti data yang belum diperiksa sebelum pekerja yang tidak terduga mogok atau restart.
  + Selalu siram data buffer sebelum checkpointing untuk memastikan konsistensi data.

**LeaseLost () LeaseLostInput leaseLostInput**
+ Tujuan: Bersihkan sumber daya khusus untuk memproses pecahan ini.
+ Ketika itu disebut: Ketika Scheduler lain mengambil alih sewa untuk pecahan ini.
+ Poin kunci:
  + Checkpointing tidak diperbolehkan dalam metode ini.

**shardEnded () ShardEndedInput shardEndedInput**
+ Tujuan: Selesaikan pemrosesan untuk pecahan dan pos pemeriksaan ini.
+ Saat disebut: Ketika pecahan terbelah atau bergabung, menunjukkan semua data untuk pecahan ini telah diproses.
+ Poin kunci:
  + `shardEndedInput.checkpointer()`: Digunakan untuk melakukan pemeriksaan akhir.
  + Checkpointing dalam metode ini adalah wajib untuk menyelesaikan pemrosesan.
  + Gagal menyiram data dan pos pemeriksaan di sini dapat mengakibatkan hilangnya data atau pemrosesan duplikat saat pecahan dibuka kembali.

**ShutdownRequested () ShutdownRequestedInput shutdownRequestedInput**
+ Tujuan: Pos pemeriksaan dan bersihkan sumber daya saat KCL dimatikan.
+ Ketika dipanggil: Ketika KCL dimatikan, misalnya, ketika aplikasi dihentikan).
+ Poin kunci:
  + `shutdownRequestedInput.checkpointer()`: Digunakan untuk melakukan checkpointing sebelum shutdown.
  + Pastikan Anda menerapkan checkpointing dalam metode sehingga kemajuan disimpan sebelum aplikasi berhenti.
  + Kegagalan untuk menyiram data dan pos pemeriksaan di sini dapat mengakibatkan hilangnya data atau pemrosesan ulang catatan saat aplikasi dimulai ulang.

**penting**  
KCL 3.x memastikan lebih sedikit pemrosesan ulang data saat sewa diserahkan dari satu pekerja ke pekerja lain dengan pos pemeriksaan sebelum pekerja sebelumnya dimatikan. Jika Anda tidak menerapkan logika checkpointing dalam `shutdownRequested()` metode, Anda tidak akan melihat manfaat ini. Pastikan Anda telah menerapkan logika checkpointing di dalam metode. `shutdownRequested()`

### RecordProcessorFactory
<a name="implementation-recordprocessorfactory"></a>

RecordProcessorFactory bertanggung jawab untuk membuat RecordProcessor instance baru. KCL menggunakan pabrik ini untuk membuat yang baru RecordProcessor untuk setiap pecahan yang perlu diproses aplikasi.

Tanggung jawab utama:
+ Buat RecordProcessor instance baru sesuai permintaan
+ Pastikan masing-masing RecordProcessor diinisialisasi dengan benar

Berikut ini adalah contoh implementasi:

```
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new SampleRecordProcessor();
    }
}
```

Dalam contoh ini, pabrik membuat yang baru SampleRecordProcessor setiap kali shardRecordProcessor () dipanggil. Anda dapat memperluas ini untuk memasukkan logika inisialisasi yang diperlukan.

### Penjadwal
<a name="implementation-scheduler"></a>

Scheduler adalah komponen tingkat tinggi yang mengoordinasikan semua aktivitas aplikasi KCL. Ini bertanggung jawab atas orkestrasi keseluruhan pemrosesan data.

Tanggung jawab utama:
+ Mengelola siklus hidup RecordProcessors
+ Menangani manajemen sewa untuk pecahan
+ Mengkoordinasikan pos pemeriksaan
+ Menyeimbangkan beban pemrosesan pecahan di beberapa pekerja aplikasi Anda
+ Menangani sinyal shutdown dan penghentian aplikasi yang anggun

Scheduler biasanya dibuat dan dimulai di Aplikasi Utama. Anda dapat memeriksa contoh implementasi Scheduler di bagian berikut, Aplikasi Konsumen Utama. 

### Aplikasi Konsumen Utama
<a name="implementation-main"></a>

Aplikasi Konsumen Utama mengikat semua komponen bersama-sama. Ini bertanggung jawab untuk menyiapkan konsumen KCL, menciptakan klien yang diperlukan, mengonfigurasi Scheduler, dan mengelola siklus hidup aplikasi.

Tanggung jawab utama:
+ Mengatur klien AWS layanan (Kinesis, DynamoDB,) CloudWatch
+ Konfigurasikan aplikasi KCL
+ Buat dan mulai Scheduler
+ Menangani shutdown aplikasi

Berikut ini adalah contoh implementasi:

```
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import java.util.UUID;

public class SampleConsumer {
    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    public SampleConsumer(String streamName, Region region) {
        this.streamName = streamName;
        this.region = region;
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    public void run() {
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamName, 
            streamName, 
            kinesisClient, 
            dynamoDbAsyncClient,
            cloudWatchClient, 
            UUID.randomUUID().toString(), 
            new SampleRecordProcessorFactory()
        );

        Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

    public static void main(String[] args) {
        String streamName = "your-stream-name"; // replace with your stream name
        Region region = Region.US_EAST_1; // replace with your region
        new SampleConsumer(streamName, region).run();
    }
}
```

 KCL menciptakan konsumen Enhanced Fan-out (EFO) dengan throughput khusus secara default. Untuk informasi selengkapnya tentang Enhanced Fan-out, lihat. [Kembangkan konsumen fan-out yang ditingkatkan dengan throughput khusus](enhanced-consumers.md) Jika Anda memiliki kurang dari 2 konsumen atau tidak memerlukan penundaan propagasi baca di bawah 200 ms, Anda harus mengatur konfigurasi berikut di objek scheduler untuk menggunakan konsumen throughput bersama:

```
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
```

Kode berikut adalah contoh pembuatan objek penjadwal yang menggunakan konsumen throughput bersama:

**Impor**:

```
import software.amazon.kinesis.retrieval.polling.PollingConfig;
```

**Kode**:

```
Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );/
```

# Mengembangkan konsumen dengan KCL dalam bahasa non-Jawa
<a name="develop-kcl-consumers-non-java"></a>

Bagian ini mencakup implementasi konsumen menggunakan Kinesis Client Library (KCL) di Python, Node.js, .NET, dan Ruby.

KCL adalah perpustakaan Java. Support untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. `MultiLangDaemon` Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan KCL dengan bahasa selain Java. Oleh karena itu, jika Anda menginstal KCL untuk bahasa non-Java dan menulis aplikasi konsumen Anda sepenuhnya dalam bahasa non-Java, Anda masih memerlukan Java diinstal pada sistem Anda karena itu. `MultiLangDaemon` Selanjutnya, `MultiLangDaemon` memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda (misalnya, wilayah AWS yang terhubung dengannya). Untuk informasi selengkapnya tentang `MultiLangDaemon` on GitHub, lihat [ MultiLangDaemon proyek KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Sementara konsep inti tetap sama di seluruh bahasa, ada beberapa pertimbangan dan implementasi khusus bahasa. Untuk konsep inti tentang pengembangan konsumen KCL, lihat[Kembangkan konsumen dengan KCL di Jawa](develop-kcl-consumers-java.md). Untuk informasi lebih rinci tentang cara mengembangkan konsumen KCL dengan Python, Node.js, .NET, dan Ruby dan pembaruan terbaru, silakan merujuk ke repositori berikut: GitHub 
+ Python: [amazon-kinesis-client-python](https://github.com/awslabs/amazon-kinesis-client-python)
+ Node.js: [amazon-kinesis-client-nodejs](https://github.com/awslabs/amazon-kinesis-client-nodejs)
+ .NET: [amazon-kinesis-client-net](https://github.com/awslabs/amazon-kinesis-client-net)
+ Ruby: [amazon-kinesis-client-ruby](https://github.com/awslabs/amazon-kinesis-client-ruby)

**penting**  
Jangan gunakan versi pustaka KCL non-Java berikut jika Anda menggunakan JDK 8. Versi ini berisi dependensi (logback) yang tidak kompatibel dengan JDK 8.  
KCL Python 3.0.2 dan 2.2.0
KCL Node.js 2.3.0
KCL .NET 3.1.0
KCL Ruby 2.2.0
Sebaiknya gunakan versi yang dirilis sebelum atau sesudah versi yang terpengaruh ini saat bekerja dengan JDK 8.