

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Menggunakan AWS Lambda dengan Amazon DynamoDB
<a name="with-ddb"></a>

**catatan**  
[Jika Anda ingin mengirim data ke target selain fungsi Lambda atau memperkaya data sebelum mengirimnya, lihat Amazon Pipes. EventBridge ](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html)

Anda dapat menggunakan AWS Lambda fungsi untuk memproses catatan dalam aliran [Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html). Dengan DynamoDB Streams, Anda dapat memicu fungsi Lambda untuk melakukan pekerjaan tambahan setiap kali tabel DynamoDB diperbarui.

Saat memproses aliran DynamoDB, Anda perlu menerapkan logika respons batch sebagian untuk mencegah catatan yang berhasil diproses dicoba ulang ketika beberapa catatan dalam batch gagal. [Utilitas Batch Processor](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) dari Powertools for AWS Lambda tersedia dalam Python TypeScript,, .NET, dan Java dan menyederhanakan implementasi ini dengan secara otomatis menangani logika respons batch sebagian, mengurangi waktu pengembangan dan meningkatkan keandalan.

**Topics**
+ [Polling dan batching stream](#dynamodb-polling-and-batching)
+ [Posisi awal polling dan streaming](#dyanmo-db-stream-poll)
+ [Pembaca serpihan secara bersamaan di DynamoDB Streams](#events-dynamodb-simultaneous-readers)
+ [Contoh peristiwa](#events-sample-dynamodb)
+ [Memproses catatan DynamoDB dengan Lambda](services-dynamodb-eventsourcemapping.md)
+ [Mengkonfigurasi respons batch sebagian dengan DynamoDB dan Lambda](services-ddb-batchfailurereporting.md)
+ [Menyimpan catatan yang dibuang untuk sumber peristiwa DynamoDB di Lambda](services-dynamodb-errors.md)
+ [Menerapkan pemrosesan aliran DynamoDB stateful di Lambda](services-ddb-windows.md)
+ [Parameter Lambda untuk pemetaan sumber peristiwa Amazon DynamoDB](services-ddb-params.md)
+ [Menggunakan pemfilteran acara dengan sumber acara DynamoDB](with-ddb-filtering.md)
+ [Tutorial: Menggunakan AWS Lambda dengan aliran Amazon DynamoDB](with-ddb-example.md)

## Polling dan batching stream
<a name="dynamodb-polling-and-batching"></a>

Lambda melakukan polling shard dalam aliran DynamoDB untuk catatan dengan tingkat dasar sebanyak 4 kali per detik. Saat rekaman tersedia, Lambda memanggil fungsi Anda dan menunggu hasilnya. Jika pemrosesan berhasil, Lambda melanjutkan polling sampai menerima lebih banyak catatan.

Secara default, Lambda memanggil fungsi Anda segera setelah catatan tersedia. Jika batch yang dibaca Lambda dari sumber peristiwa hanya memiliki satu catatan di dalamnya, Lambda hanya mengirimkan satu catatan ke fungsi tersebut. *Untuk menghindari menjalankan fungsi dengan sejumlah kecil catatan, Anda dapat memberi tahu sumber acara untuk menyangga catatan hingga 5 menit dengan mengonfigurasi jendela batching.* Sebelum menjalankan fungsi, Lambda terus membaca catatan dari sumber acara hingga mengumpulkan batch penuh, jendela batching kedaluwarsa, atau batch mencapai batas muatan 6 MB. Untuk informasi selengkapnya, lihat [Perilaku batching](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching).

**Awas**  
Pemetaan sumber peristiwa Lambda memproses setiap peristiwa setidaknya sekali, dan pemrosesan duplikat catatan dapat terjadi. Untuk menghindari potensi masalah yang terkait dengan duplikat peristiwa, kami sangat menyarankan agar Anda membuat kode fungsi Anda idempoten. Untuk mempelajari lebih lanjut, lihat [Bagaimana cara membuat fungsi Lambda saya idempoten](https://repost.aws/knowledge-center/lambda-function-idempotent) di Pusat Pengetahuan. AWS 

Lambda tidak menunggu [ekstensi](lambda-extensions.md) yang dikonfigurasi selesai sebelum mengirim batch berikutnya untuk diproses. Dengan kata lain, ekstensi Anda dapat terus berjalan saat Lambda memproses kumpulan catatan berikutnya. Hal ini dapat menyebabkan masalah pembatasan jika Anda melanggar pengaturan atau [batasan konkurensi](lambda-concurrency.md) akun Anda. Untuk mendeteksi apakah ini merupakan masalah potensial, pantau fungsi Anda dan periksa apakah Anda melihat [metrik konkurensi](monitoring-concurrency.md#general-concurrency-metrics) yang lebih tinggi dari yang diharapkan untuk pemetaan sumber peristiwa Anda. Karena waktu yang singkat di antara pemanggilan, Lambda mungkin secara singkat melaporkan penggunaan konkurensi yang lebih tinggi daripada jumlah pecahan. Ini bisa benar bahkan untuk fungsi Lambda tanpa ekstensi.

Konfigurasikan [ ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor)pengaturan untuk memproses satu pecahan aliran DynamoDB dengan lebih dari satu pemanggilan Lambda secara bersamaan. Anda dapat menentukan jumlah batch bersamaan yang polling-nya dibuat Lambda dari shard melalui faktor paralelisasi mulai dari 1 (default) hingga 10. Misalnya, saat Anda menyetel `ParallelizationFactor` ke 2, Anda dapat memiliki maksimum 200 pemanggilan Lambda bersamaan untuk memproses 100 pecahan aliran DynamoDB (meskipun dalam praktiknya, Anda mungkin melihat nilai yang berbeda untuk metrik). `ConcurrentExecutions` Hal ini membantu meningkatkan skala throughput pemrosesan ketika volume data tidak stabil dan [IteratorAge](monitoring-metrics-types.md#performance-metrics) tinggi. Saat Anda meningkatkan jumlah batch bersamaan per pecahan, Lambda masih memastikan pemrosesan dalam urutan pada level item (partisi dan kunci sortir).

## Posisi awal polling dan streaming
<a name="dyanmo-db-stream-poll"></a>

Ketahuilah bahwa polling streaming selama pembuatan dan pembaruan pemetaan sumber acara pada akhirnya konsisten.
+ Selama pembuatan pemetaan sumber acara, mungkin diperlukan beberapa menit untuk memulai acara polling dari aliran.
+ Selama pembaruan pemetaan sumber acara, mungkin diperlukan beberapa menit untuk menghentikan dan memulai kembali acara pemungutan suara dari aliran.

Perilaku ini berarti bahwa jika Anda menentukan `LATEST` sebagai posisi awal untuk aliran, pemetaan sumber peristiwa dapat melewatkan peristiwa selama pembuatan atau pembaruan. Untuk memastikan tidak ada peristiwa yang terlewatkan, tentukan posisi awal aliran sebagai`TRIM_HORIZON`.

## Pembaca serpihan secara bersamaan di DynamoDB Streams
<a name="events-dynamodb-simultaneous-readers"></a>

Untuk tabel Single-region yang bukan tabel global, Anda dapat mendesain hingga dua fungsi Lambda untuk dibaca dari pecahan DynamoDB Streams yang sama secara bersamaan. Melebihi batas ini dapat mengakibatkan throttling permintaan. Untuk tabel global, kami sarankan Anda membatasi jumlah fungsi simultan menjadi satu untuk menghindari pembatasan permintaan.

## Contoh peristiwa
<a name="events-sample-dynamodb"></a>

**Example**  

```
{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "SequenceNumber": "222",
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 59,
        "NewImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "MODIFY",
      "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525",
      "eventSource": "aws:dynamodb"
    }
  ]}
```

# Memproses catatan DynamoDB dengan Lambda
<a name="services-dynamodb-eventsourcemapping"></a>

Buat pemetaan sumber kejadian untuk memberi tahu Lambda agar mengirim rekaman dari aliran Anda ke fungsi Lambda. Anda dapat membuat beberapa pemetaan sumber kejadian untuk memproses data yang sama dengan beberapa fungsi Lambda, atau untuk memproses item dari beberapa aliran dengan satu fungsi.

Anda dapat mengonfigurasi pemetaan sumber peristiwa untuk memproses catatan dari aliran yang berbeda. Akun AWS Untuk mempelajari selengkapnya, lihat [Membuat pemetaan sumber peristiwa lintas akun](#services-dynamodb-eventsourcemapping-cross-account).

**Untuk mengonfigurasi fungsi agar dibaca dari DynamoDB Streams, lampirkan AWSLambda kebijakan terkelola AWS Peran [DBExecutionDynamo ke peran eksekusi](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaDynamoDBExecutionRole.html), lalu buat pemicu DynamoDB.**

**Untuk menambahkan izin dan membuat pemicu**

1. Buka [Halaman fungsi](https://console.aws.amazon.com/lambda/home#/functions) di konsol Lambda.

1. Pilih nama sebuah fungsi.

1. Pilih tab **Konfigurasi**, lalu pilih **Izin**.

1. Di bawah **Nama peran**, pilih tautan ke peran eksekusi Anda. Tautan ini membuka peran di konsol IAM.  
![\[\]](http://docs.aws.amazon.com/id_id/lambda/latest/dg/images/execution-role.png)

1. Pilih **Tambahkan izin**, lalu pilih **Lampirkan kebijakan**.  
![\[\]](http://docs.aws.amazon.com/id_id/lambda/latest/dg/images/attach-policies.png)

1. Di bidang pencarian, masukkan`AWSLambdaDynamoDBExecutionRole`. Tambahkan kebijakan ini ke peran eksekusi Anda. Ini adalah kebijakan AWS terkelola yang berisi izin yang perlu dibaca fungsi Anda dari aliran DynamoDB. Untuk informasi selengkapnya tentang kebijakan ini, lihat [DBExecutionPeran AWSLambda Dynamo](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaDynamoDBExecutionRole.html) di *Referensi Kebijakan AWS Terkelola*.

1. Kembali ke fungsi Anda di konsol Lambda. Di bagian **Gambaran umum fungsi**, pilih **Tambah pemicu**.  
![\[\]](http://docs.aws.amazon.com/id_id/lambda/latest/dg/images/add-trigger.png)

1. Pilih jenis pemicu.

1. Konfigurasikan opsi yang diperlukan, lalu pilih **Tambah**.

Lambda mendukung opsi berikut untuk sumber acara DynamoDB:

**Opsi sumber kejadian**
+ **Tabel DynamoDB** – Tabel DynamoDB tempat untuk membaca catatan.
+ **Ukuran batch** – Jumlah rekaman yang akan dikirimkan ke fungsi dalam setiap batch, paling banyak 10.000. Lambda meneruskan semua rekaman dalam batch ke fungsi dalam satu panggilan, selama ukuran total kejadian tidak melebihi [batas payload](gettingstarted-limits.md) untuk invokasi sinkron (6 MB).
+ **Jendela batch** – Tentukan jumlah waktu maksimum untuk mengumpulkan rekaman sebelum memanggil fungsi, dalam hitungan detik.
+ **Posisi mulai** – Hanya memproses rekaman baru, atau semua rekaman yang ada.
  + **Terbaru** – Memproses rekaman baru yang ditambahkan ke stream.
  + **Trim horizon** – Memproses semua rekaman dalam aliran.

  Setelah memproses rekaman yang sudah ada, fungsi berhenti dan melanjutkan memproses rekaman baru.
+ **Tujuan kegagalan** — Antrian SQS standar atau topik SNS standar untuk catatan yang tidak dapat diproses. Ketika Lambda membuang sekumpulan catatan yang terlalu tua atau telah kehabisan semua percobaan ulang, Lambda mengirimkan detail tentang batch ke antrian atau topik.
+ **Percobaan ulang** – Jumlah maksimum percobaan ulang Lambda saat fungsi memunculkan kesalahan. Hal ini tidak berlaku untuk kesalahan layanan atau pembatasan di mana batch tidak mencapai fungsi.
+ **Maximum age of record** – Usia maksimum rekaman yang dikirimkan Lambda ke fungsi Anda.
+ **Split batch on error** – Ketika fungsi mengembalikan kesalahan, bagi batch menjadi dua bagian sebelum mencoba kembali. Pengaturan ukuran batch asli Anda tetap tidak berubah.
+ **Batch bersamaan per pecahan** — Secara bersamaan memproses beberapa batch dari pecahan yang sama.
+ **Diaktifkan** – Atur ke true untuk mengaktifkan pemetaan sumber kejadian. Atur ke false untuk menghentikan pemrosesan rekaman. Lambda mencatat rekaman terakhir yang diproses dan melanjutkan pemrosesan dari titik tersebut saat pemetaan diaktifkan kembali.

**catatan**  
Anda tidak dikenakan biaya untuk panggilan GetRecords API yang dipanggil oleh Lambda sebagai bagian dari pemicu DynamoDB.

Untuk mengelola konfigurasi sumber kejadian nanti, pilih pemicu di desainer.

## Membuat pemetaan sumber peristiwa lintas akun
<a name="services-dynamodb-eventsourcemapping-cross-account"></a>

[Amazon DynamoDB sekarang mendukung kebijakan berbasis sumber daya.](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/access-control-resource-based.html) Dengan kemampuan ini, Anda dapat memproses data dari aliran DynamoDB menjadi satu Akun AWS dengan fungsi Lambda di akun lain.

Untuk membuat pemetaan sumber peristiwa untuk fungsi Lambda menggunakan aliran DynamoDB di tempat Akun AWS lain, Anda harus mengonfigurasi aliran menggunakan kebijakan berbasis sumber daya untuk memberikan izin fungsi Lambda Anda untuk membaca catatan. *Untuk mempelajari cara mengonfigurasi streaming untuk akses lintas akun, lihat [Berbagi akses dengan fungsi Lambda lintas](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/rbac-cross-account-access.html#rbac-analyze-cross-account-lambda-access) akun di Panduan Pengembang Amazon DynamoDB.*

Setelah mengonfigurasi streaming Anda dengan kebijakan berbasis sumber daya yang memberi fungsi Lambda Anda izin yang diperlukan, buat pemetaan sumber peristiwa dengan ARN aliran lintas akun Anda. Anda dapat menemukan ARN streaming di bawah tab **Ekspor dan aliran** tabel di konsol DynamoDB lintas akun. 

Saat menggunakan konsol Lambda, rekatkan ARN aliran langsung ke bidang input tabel DynamoDB di halaman pembuatan pemetaan sumber peristiwa.

 **Catatan:** Pemicu lintas wilayah tidak didukung. 

# Mengkonfigurasi respons batch sebagian dengan DynamoDB dan Lambda
<a name="services-ddb-batchfailurereporting"></a>

Ketika menggunakan dan memproses data streaming dari sumber peristiwa, secara default Lambda memeriksa urutan nomor tertinggi batch hanya ketika batch berhasil diselesaikan. Lambda memperlakukan semua hasil lain sebagai sepenuhnya gagal dan mencoba lagi pemrosesan batch hingga batas coba lagi. Untuk memungkinkan keberhasilan parsial saat memproses batch dari aliran, aktifkan `ReportBatchItemFailures`. Mengizinkan keberhasilan parsial dapat membantu mengurangi jumlah percobaan ulang pada catatan, meskipun tidak sepenuhnya mencegah kemungkinan percobaan ulang dalam catatan yang sukses.

Untuk mengaktifkan `ReportBatchItemFailures`, masukkan nilai enum **ReportBatchItemFailures** dalam daftar [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes). Daftar ini menunjukkan tipe respon yang diaktifkan untuk fungsi Anda. Anda dapat mengonfigurasi daftar ini saat [membuat](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) atau [memperbarui](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) pemetaan sumber peristiwa.

**catatan**  
Bahkan ketika kode fungsi Anda mengembalikan respons kegagalan sebagian batch, respons ini tidak akan diproses oleh Lambda kecuali `ReportBatchItemFailures` fitur tersebut diaktifkan secara eksplisit untuk pemetaan sumber acara Anda.

## Sintaks laporan
<a name="streams-batchfailurereporting-syntax"></a>

Saat mengonfigurasi pelaporan pada kegagalan item batch, kelas `StreamsEventResponse` dikembalikan dengan daftar kegagalan item batch. Anda dapat menggunakan objek `StreamsEventResponse` untuk mengembalikan nomor urutan catatan gagal pertama dalam batch. Anda juga dapat membuat kelas kustom Anda sendiri menggunakan sintaks respons yang benar. Struktur JSON berikut menunjukkan sintaks respons yang diperlukan:

```
{ 
  "batchItemFailures": [ 
        {
            "itemIdentifier": "<SequenceNumber>"
        }
    ]
}
```

**catatan**  
Jika `batchItemFailures` array berisi beberapa item, Lambda menggunakan catatan dengan nomor urut terendah sebagai pos pemeriksaan. Lambda kemudian mencoba kembali semua catatan mulai dari pos pemeriksaan itu.

## Status berhasil dan gagal
<a name="streams-batchfailurereporting-conditions"></a>

Lambda memperlakukan batch sebagai sepenuhnya berhasil jika Anda mengembalikan salah satu dari berikut:
+ Daftar `batchItemFailure` kosong
+ Daftar `batchItemFailure` nol
+ `EventResponse` kosong
+ `EventResponse` nol

Lambda memperlakukan batch sebagai sepenuhnya gagal jika Anda mengembalikan salah satu dari berikut:
+ String `itemIdentifier` kosong
+ `itemIdentifier` nol
+ `itemIdentifier` dengan nama kunci yang buruk

Lambda mencoba kembali kegagalan berdasarkan strategi coba lagi Anda.

## Membagi batch
<a name="streams-batchfailurereporting-bisect"></a>

Jika invokasi Anda gagal dan `BisectBatchOnFunctionError` diaktifkan, batch dibagi terlepas dari pengaturan `ReportBatchItemFailures` Anda.

Ketika respons berhasil batch parsial diterima dan kedua `BisectBatchOnFunctionError` dan `ReportBatchItemFailures` diaktifkan, batch dibagi dua di nomor urutan yang dikembalikan dan Lambda mencoba hanya catatan yang tersisa.

Untuk menyederhanakan implementasi logika respons batch sebagian, pertimbangkan untuk menggunakan [utilitas Prosesor Batch](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) dari Powertools untuk AWS Lambda, yang secara otomatis menangani kompleksitas ini untuk Anda.

Berikut adalah beberapa contoh kode fungsi yang mengembalikan daftar pesan gagal IDs dalam batch:

------
#### [ .NET ]

**SDK untuk .NET**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Melaporkan kegagalan item batch DynamoDB dengan Lambda menggunakan.NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)

    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");
        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>();
        StreamsEventResponse streamsEventResponse = new StreamsEventResponse();

        foreach (var record in dynamoEvent.Records)
        {
            try
            {
                var sequenceNumber = record.Dynamodb.SequenceNumber;
                context.Logger.LogInformation(sequenceNumber);
            }
            catch (Exception ex)
            {
                context.Logger.LogError(ex.Message);
                batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber });
            }
        }

        if (batchItemFailures.Count > 0)
        {
            streamsEventResponse.BatchItemFailures = batchItemFailures;
        }

        context.Logger.LogInformation("Stream processing complete.");
        return streamsEventResponse;
    }
}
```

------
#### [ Go ]

**SDK untuk Go V2**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Melaporkan kegagalan item batch DynamoDB dengan Lambda menggunakan Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type BatchItemFailure struct {
	ItemIdentifier string `json:"ItemIdentifier"`
}

type BatchResult struct {
	BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"`
}

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) {
	var batchItemFailures []BatchItemFailure
	curRecordSequenceNumber := ""

	for _, record := range event.Records {
		// Process your record
		curRecordSequenceNumber = record.Change.SequenceNumber
	}

	if curRecordSequenceNumber != "" {
		batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber})
	}
	
	batchResult := BatchResult{
		BatchItemFailures: batchItemFailures,
	}

	return &batchResult, nil
}

func main() {
	lambda.Start(HandleRequest)
}
```

------
#### [ Java ]

**SDK untuk Java 2.x**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Melaporkan kegagalan item batch DynamoDB dengan Lambda menggunakan Java.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord;

import java.util.ArrayList;
import java.util.List;

public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) {

        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        String curRecordSequenceNumber = "";

        for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) {
          try {
                //Process your record
                StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb();
                curRecordSequenceNumber = dynamodbRecord.getSequenceNumber();
                
            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }
       
       return new StreamsEventResponse();   
    }
}
```

------
#### [ JavaScript ]

**SDK untuk JavaScript (v3)**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Melaporkan kegagalan item batch DynamoDB dengan penggunaan Lambda. JavaScript  

```
export const handler = async (event) => {
  const records = event.Records;
  let curRecordSequenceNumber = "";

  for (const record of records) {
    try {
      // Process your record
      curRecordSequenceNumber = record.dynamodb.SequenceNumber;
    } catch (e) {
      // Return failed record's sequence number
      return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] };
    }
  }

  return { batchItemFailures: [] };
};
```
Melaporkan kegagalan item batch DynamoDB dengan penggunaan Lambda. TypeScript  

```
import {
  DynamoDBBatchResponse,
  DynamoDBBatchItemFailure,
  DynamoDBStreamEvent,
} from "aws-lambda";

export const handler = async (
  event: DynamoDBStreamEvent
): Promise<DynamoDBBatchResponse> => {
  const batchItemFailures: DynamoDBBatchItemFailure[] = [];
  let curRecordSequenceNumber;

  for (const record of event.Records) {
    curRecordSequenceNumber = record.dynamodb?.SequenceNumber;

    if (curRecordSequenceNumber) {
      batchItemFailures.push({
        itemIdentifier: curRecordSequenceNumber,
      });
    }
  }

  return { batchItemFailures: batchItemFailures };
};
```

------
#### [ PHP ]

**SDK untuk PHP**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Melaporkan kegagalan item batch DynamoDB dengan Lambda menggunakan PHP.  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): array
    {
        $dynamoDbEvent = new DynamoDbEvent($event);
        $this->logger->info("Processing records");

        $records = $dynamoDbEvent->getRecords();
        $failedRecords = [];
        foreach ($records as $record) {
            try {
                $data = $record->getData();
                $this->logger->info(json_encode($data));
                // TODO: Do interesting work based on the new data
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $failedRecords[] = $record->getSequenceNumber();
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");

        // change format for the response
        $failures = array_map(
            fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
            $failedRecords
        );

        return [
            'batchItemFailures' => $failures
        ];
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK untuk Python (Boto3)**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Melaporkan kegagalan item batch DynamoDB dengan Lambda menggunakan Python.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records")
    curRecordSequenceNumber = ""
    
    for record in records:
        try:
            # Process your record
            curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"]
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures":[]}
```

------
#### [ Ruby ]

**SDK untuk Ruby**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Melaporkan kegagalan item batch DynamoDB dengan Lambda menggunakan Ruby.  

```
def lambda_handler(event:, context:)
    records = event["Records"]
    cur_record_sequence_number = ""
  
    records.each do |record|
      begin
        # Process your record
        cur_record_sequence_number = record["dynamodb"]["SequenceNumber"]
      rescue StandardError => e
        # Return failed record's sequence number
        return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]}
      end
    end
  
    {"batchItemFailures" => []}
  end
```

------
#### [ Rust ]

**SDK for Rust**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Melaporkan kegagalan item batch DynamoDB dengan Lambda menggunakan Rust.  

```
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord, StreamRecord},
    streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

/// Process the stream record
fn process_record(record: &EventRecord) -> Result<(), Error> {
    let stream_record: &StreamRecord = &record.change;

    // process your stream record here...
    tracing::info!("Data: {:?}", stream_record);

    Ok(())
}

/// Main Lambda handler here...
async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> {
    let mut response = DynamoDbEventResponse {
        batch_item_failures: vec![],
    };

    let records = &event.payload.records;

    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in records {
        tracing::info!("EventId: {}", record.event_id);

        // Couldn't find a sequence number
        if record.change.sequence_number.is_none() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: Some("".to_string()),
            });
            return Ok(response);
        }

        // Process your record here...
        if process_record(record).is_err() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: record.change.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!("Successfully processed {} record(s)", records.len());

    Ok(response)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

## Menggunakan Powertools untuk prosesor AWS Lambda batch
<a name="services-ddb-batchfailurereporting-powertools"></a>

Utilitas prosesor batch dari Powertools untuk AWS Lambda secara otomatis menangani logika respons batch paral, mengurangi kompleksitas penerapan pelaporan kegagalan batch. Berikut adalah contoh menggunakan prosesor batch:

**Python**  
Untuk contoh lengkap dan petunjuk penyiapan, lihat [dokumentasi prosesor batch](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/).
Memproses rekaman AWS Lambda aliran DynamoDB dengan prosesor batch.  

```
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
logger = Logger()

def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed
    
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event, 
        record_handler=record_handler, 
        processor=processor,
        context=context
    )
```

**TypeScript**  
Untuk contoh lengkap dan petunjuk penyiapan, lihat [dokumentasi prosesor batch](https://docs.aws.amazon.com/powertools/typescript/latest/features/batch/).
Memproses rekaman AWS Lambda aliran DynamoDB dengan prosesor batch.  

```
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { DynamoDBStreamEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.DynamoDBStreams);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
    logger.info('Processing record', { record });
    // Your business logic here
    // Throw an error to mark this record as failed
};

export const handler = async (event: DynamoDBStreamEvent, context: Context) => {
    return processPartialResponse(event, recordHandler, processor, {
        context,
    });
};
```

**Java**  
Untuk contoh lengkap dan petunjuk penyiapan, lihat [dokumentasi prosesor batch](https://docs.powertools.aws.dev/lambda/java/latest/utilities/batch/).
Memproses rekaman AWS Lambda aliran DynamoDB dengan prosesor batch.  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class DynamoDBStreamBatchHandler implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    private final BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler;

    public DynamoDBStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withDynamoDbBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent ddbEvent, Context context) {
        return handler.processBatch(ddbEvent, context);
    }

    private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) {
        // Process the change record
    }
}
```

**.NET**  
Untuk contoh lengkap dan petunjuk penyiapan, lihat [dokumentasi prosesor batch](https://docs.aws.amazon.com/powertools/dotnet/utilities/batch-processing/).
Memproses rekaman AWS Lambda aliran DynamoDB dengan prosesor batch.  

```
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;

[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]

namespace HelloWorld;

public class Customer
{
    public string? CustomerId { get; set; }
    public string? Name { get; set; }
    public string? Email { get; set; }
    public DateTime CreatedAt { get; set; }
}

internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer> 
{
    public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(customer.Email)) 
        {
            throw new ArgumentException("Customer email is required");
        }

        return await Task.FromResult(RecordHandlerResult.None); 
    }
}

public class Function
{
    [BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _)
    {
        return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; 
    }
}
```

# Menyimpan catatan yang dibuang untuk sumber peristiwa DynamoDB di Lambda
<a name="services-dynamodb-errors"></a>

Penanganan kesalahan untuk pemetaan sumber peristiwa DynamoDB tergantung pada apakah kesalahan terjadi sebelum fungsi dipanggil atau selama pemanggilan fungsi:
+ **Sebelum pemanggilan:** Jika pemetaan sumber peristiwa Lambda tidak dapat menjalankan fungsi karena pelambatan atau masalah lain, ia akan mencoba lagi hingga catatan kedaluwarsa atau melebihi usia maksimum yang dikonfigurasi pada pemetaan sumber peristiwa (). [MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)
+ **Selama pemanggilan:** Jika fungsi dipanggil tetapi menampilkan kesalahan, Lambda mencoba ulang hingga catatan kedaluwarsa, melebihi usia maksimum ([MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)), atau mencapai kuota coba ulang yang dikonfigurasi (). [MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts) Untuk kesalahan fungsi, Anda juga dapat mengonfigurasi [BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-response-BisectBatchOnFunctionError), yang membagi batch yang gagal menjadi dua batch yang lebih kecil, mengisolasi catatan buruk dan menghindari batas waktu. Memisahkan batch tidak menghabiskan kuota coba lagi.

Jika tindakan penanganan kesalahan gagal, Lambda membuang rekaman dan melanjutkan pemrosesan batch dari aliran. Dengan pengaturan default, ini berarti rekaman yang buruk dapat memblokir pemrosesan pada shard yang terpengaruh hingga selama satu hari. Untuk menghindari hal ini, konfigurasikan pemetaan sumber kejadian fungsi Anda dengan jumlah percobaan ulang yang wajar dan usia maksimum rekaman yang sesuai dengan kasus penggunaan Anda.

## Mengonfigurasi tujuan untuk pemanggilan yang gagal
<a name="dynamodb-on-failure-destination-console"></a>

Untuk menyimpan catatan pemanggilan pemetaan sumber peristiwa yang gagal, tambahkan tujuan ke pemetaan sumber peristiwa fungsi Anda. Setiap catatan yang dikirim ke tujuan adalah dokumen JSON yang berisi metadata tentang pemanggilan yang gagal. Untuk tujuan Amazon S3, Lambda juga mengirimkan seluruh catatan pemanggilan bersama dengan metadata. Anda dapat mengonfigurasi topik Amazon SNS, antrian Amazon SQS, bucket Amazon S3, atau Kafka sebagai tujuan.

Dengan tujuan Amazon S3, Anda dapat menggunakan fitur Pemberitahuan [Acara Amazon S3](https://docs.aws.amazon.com/) untuk menerima notifikasi saat objek diunggah ke bucket S3 tujuan Anda. Anda juga dapat mengonfigurasi Pemberitahuan Acara S3 untuk menjalankan fungsi Lambda lain untuk melakukan pemrosesan otomatis pada batch yang gagal.

Peran eksekusi Anda harus memiliki izin untuk tujuan:
+ **Untuk tujuan SQS: [sqs](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html):** SendMessage
+ **[Untuk tujuan SNS: SNS: Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html)**
+ **Untuk tujuan S3: s3: PutObject** [dan [s3:](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/ListObjectsV2.html)
+ **Untuk tujuan Kafka: [kafka-cluster](https://docs.aws.amazon.com/msk/latest/developerguide/kafka-actions.html):** WriteData

Anda dapat mengonfigurasi topik Kafka sebagai tujuan gagal untuk pemetaan sumber acara Kafka Anda. Ketika Lambda tidak dapat memproses catatan setelah percobaan ulang yang melelahkan atau ketika catatan melebihi usia maksimum, Lambda mengirimkan catatan yang gagal ke topik Kafka yang ditentukan untuk diproses nanti. Lihat [Menggunakan topik Kafka sebagai tujuan kegagalan](kafka-on-failure-destination.md).

Jika Anda telah mengaktifkan enkripsi dengan kunci KMS Anda sendiri untuk tujuan S3, peran eksekusi fungsi Anda juga harus memiliki izin untuk memanggil [kms](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html):. GenerateDataKey Jika kunci KMS dan tujuan bucket S3 berada di akun yang berbeda dari fungsi Lambda dan peran eksekusi, konfigurasikan kunci KMS untuk mempercayai peran eksekusi yang diizinkan. kms: GenerateDataKey

Untuk mengonfigurasi tujuan saat gagal menggunakan konsol, ikuti langkah-langkah berikut:

1. Buka [halaman Fungsi](https://console.aws.amazon.com/lambda/home#/functions) di konsol Lambda.

1. Pilih fungsi.

1. Di bagian **Gambaran umum fungsi**, pilih **Tambahkan tujuan**.

1. Untuk **Sumber**, pilih **Pemanggilan pemetaan sumber acara**.

1. Untuk **pemetaan sumber peristiwa**, pilih sumber peristiwa yang dikonfigurasi untuk fungsi ini.

1. Untuk **Kondisi**, pilih **On failure**. Untuk pemanggilan pemetaan sumber peristiwa, ini adalah satu-satunya kondisi yang diterima.

1. Untuk **tipe Tujuan**, pilih tipe tujuan yang Lambda kirimkan catatan pemanggilan.

1. Untuk **Tujuan**, pilih sumber daya.

1. Pilih **Simpan**.

Anda juga dapat mengonfigurasi tujuan pada kegagalan menggunakan AWS Command Line Interface (AWS CLI). Misalnya, [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html)perintah berikut menambahkan pemetaan sumber peristiwa dengan tujuan kegagalan SQS ke: `MyFunction`

```
aws lambda create-event-source-mapping \
--function-name "MyFunction" \
--event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
```

[update-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html)Perintah berikut memperbarui pemetaan sumber peristiwa untuk mengirim catatan pemanggilan yang gagal ke tujuan SNS setelah dua kali mencoba lagi, atau jika catatan berusia lebih dari satu jam.

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--maximum-retry-attempts 2 \
--maximum-record-age-in-seconds 3600 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
```

Pengaturan yang diperbarui diterapkan secara asinkron dan tidak muncul dalam output hingga proses selesai. Gunakan perintah [get-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html) untuk melihat status saat ini.

Untuk menghapus tujuan, berikan string kosong sebagai argumen ke `destination-config` parameter:

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": ""}}'
```

### Praktik terbaik keamanan untuk tujuan Amazon S3
<a name="ddb-s3-destination-security"></a>

Menghapus bucket S3 yang dikonfigurasi sebagai tujuan tanpa menghapus tujuan dari konfigurasi fungsi Anda dapat menimbulkan risiko keamanan. Jika pengguna lain mengetahui nama bucket tujuan Anda, mereka dapat membuat ulang bucket di bucket tersebut. Akun AWS Catatan pemanggilan yang gagal akan dikirim ke bucket mereka, yang berpotensi mengekspos data dari fungsi Anda.

**Awas**  
Untuk memastikan bahwa catatan pemanggilan dari fungsi Anda tidak dapat dikirim ke bucket S3 di bucket lain Akun AWS, tambahkan kondisi ke peran eksekusi fungsi Anda yang membatasi `s3:PutObject` izin ke bucket di akun Anda. 

Contoh berikut menunjukkan kebijakan IAM yang membatasi `s3:PutObject` izin fungsi Anda ke bucket di akun Anda. Kebijakan ini juga memberi Lambda `s3:ListBucket` izin yang dibutuhkan untuk menggunakan bucket S3 sebagai tujuan.

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3BucketResourceAccountWrite",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*/*",
                "arn:aws:s3:::*"
            ],
            "Condition": {
                "StringEquals": {
                    "s3:ResourceAccount": "111122223333"
                }
            }
        }
    ]
}
```

Untuk menambahkan kebijakan izin ke peran eksekusi fungsi Anda menggunakan Konsol Manajemen AWS atau AWS CLI, lihat instruksi dalam prosedur berikut:

------
#### [ Console ]

**Untuk menambahkan kebijakan izin ke peran eksekusi fungsi (konsol)**

1. Buka [halaman Fungsi](https://console.aws.amazon.com/lambda/home#/functions) di konsol Lambda.

1. Pilih fungsi Lambda yang peran eksekusinya ingin Anda modifikasi.

1. Di tab **Konfigurasi**, pilih **Izin**.

1. Di tab **Peran eksekusi**, pilih **nama Peran** fungsi Anda untuk membuka halaman konsol IAM peran.

1. Tambahkan kebijakan izin ke peran dengan melakukan hal berikut:

   1. Di panel **Kebijakan izin**, pilih **Tambahkan izin**, lalu pilih **Buat** kebijakan sebaris.

   1. Di **Editor kebijakan**, pilih **JSON**.

   1. Rekatkan kebijakan yang ingin Anda tambahkan ke editor (ganti JSON yang ada), lalu pilih **Berikutnya**.

   1. Di bawah **Detail kebijakan**, masukkan **nama Kebijakan**.

   1. Pilih **Buat kebijakan**.

------
#### [ AWS CLI ]

**Untuk menambahkan kebijakan izin ke peran eksekusi fungsi (CLI)**

1. Buat dokumen kebijakan JSON dengan izin yang diperlukan dan simpan di direktori lokal.

1. Gunakan perintah IAM `put-role-policy` CLI untuk menambahkan izin ke peran eksekusi fungsi Anda. Jalankan perintah berikut dari direktori tempat Anda menyimpan dokumen kebijakan JSON dan ganti nama peran, nama kebijakan, dan dokumen kebijakan dengan nilai Anda sendiri.

   ```
   aws iam put-role-policy \
   --role-name my_lambda_role \
   --policy-name LambdaS3DestinationPolicy \
   --policy-document file://my_policy.json
   ```

------

### Contoh catatan pemanggilan Amazon SNS dan Amazon SQS
<a name="kinesis-on-failure-destination-example-sns-sqs"></a>

Contoh berikut menunjukkan catatan pemanggilan yang dikirim Lambda ke tujuan SQS atau SNS untuk aliran DynamoDB.

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:13:49.717Z",
    "DDBStreamBatchInfo": {
        "shardId": "shardId-00000001573689847184-864758bb",
        "startSequenceNumber": "800000000003126276362",
        "endSequenceNumber": "800000000003126276362",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z",
        "batchSize": 1,
        "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388"
    }
}
```

Anda dapat menggunakan informasi ini guna mengambil rekaman yang terpengaruh dari aliran untuk pemecahan masalah. Rekaman aktual tidak disertakan, jadi Anda harus memproses rekaman ini dan mengambilnya dari aliran sebelum kedaluwarsa dan hilang.

### Contoh catatan pemanggilan Amazon S3
<a name="kinesis-on-failure-destination-example-sns-sqs-s3"></a>

Contoh berikut menunjukkan rekaman pemanggilan yang dikirim Lambda ke bucket S3 untuk aliran DynamoDB. Selain semua bidang dari contoh sebelumnya untuk tujuan SQS dan SNS, `payload` bidang berisi catatan pemanggilan asli sebagai string JSON yang lolos.

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:13:49.717Z",
    "DDBStreamBatchInfo": {
        "shardId": "shardId-00000001573689847184-864758bb",
        "startSequenceNumber": "800000000003126276362",
        "endSequenceNumber": "800000000003126276362",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z",
        "batchSize": 1,
        "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388"
    },
    "payload": "<Whole Event>" // Only available in S3
}
```

Objek S3 yang berisi catatan pemanggilan menggunakan konvensi penamaan berikut:

```
aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>
```

# Menerapkan pemrosesan aliran DynamoDB stateful di Lambda
<a name="services-ddb-windows"></a>

Fungsi Lambda dapat menjalankan aplikasi pemrosesan aliran berkelanjutan. Aliran merupakan data tidak terbatas yang mengalir terus-menerus melalui aplikasi Anda. Untuk menganalisis informasi dari input yang terus diperbarui ini, Anda dapat mengikat catatan yang disertakan menggunakan jendela yang didefinisikan dalam hal waktu.

Jatuh jendela adalah jendela waktu yang berbeda yang membuka dan menutup secara berkala. Secara default, pemanggilan Lambda tidak memiliki status — Anda tidak dapat menggunakannya untuk memproses data di beberapa pemanggilan berkelanjutan tanpa database eksternal. Namun, dengan jendela yang jatuh, Anda dapat mempertahankan status Anda di seluruh pemanggilan. Status ini berisi hasil agregat pesan yang sebelumnya diproses untuk jendela saat ini. Status Anda maksimal bisa sebesar 1 MB per shard. Jika melebihi ukuran tersebut, Lambda mengakhiri jendela lebih awal.

Setiap rekaman dalam aliran milik jendela tertentu. Lambda akan memproses setiap rekaman setidaknya sekali, tetapi tidak menjamin bahwa setiap rekaman akan diproses hanya sekali. Dalam kasus yang jarang terjadi, seperti penanganan kesalahan, beberapa catatan mungkin diproses lebih dari sekali. Catatan selalu diproses secara berurutan pertama kali. Jika catatan diproses lebih dari satu kali, mereka mungkin diproses rusak.

## Agregasi dan pemrosesan
<a name="streams-tumbling-processing"></a>

Fungsi yang dikelola pengguna Anda dipanggil baik untuk agregasi maupun untuk memproses hasil akhir agregasi tersebut. Lambda mengumpulkan semua catatan yang diterima di jendela. Anda dapat menerima catatan ini dalam beberapa batch, masing-masing sebagai invokasi terpisah. Setiap invokasi menerima status. Jadi, saat menggunakan jendela tumbling, respons fungsi Lambda Anda harus berisi `state` properti. Jika respons tidak berisi `state` properti, Lambda menganggap ini sebagai pemanggilan yang gagal. Untuk memenuhi kondisi ini, fungsi Anda dapat mengembalikan `TimeWindowEventResponse` objek, yang memiliki bentuk JSON berikut:

**Example Nilai `TimeWindowEventResponse`**  

```
{
    "state": {
        "1": 282,
        "2": 715
    },
    "batchItemFailures": []
}
```

**catatan**  
Untuk fungsi Java, sebaiknya gunakan a `Map<String, String>` untuk mewakili status.

Di akhir jendela, tanda `isFinalInvokeForWindow` diatur ke `true` untuk menunjukkan bahwa ini adalah status akhir dan bahwa itu siap untuk diproses. Setelah pemrosesan, jendela selesai, dan invokasi akhir Anda selesai, lalu status dihapus.

Di akhir jendela Anda, Lambda menggunakan pemrosesan akhir untuk tindakan pada hasil agregasi. Pemrosesan akhir Anda dipanggil secara sinkron. Setelah pemanggilan berhasil, fungsi Anda memeriksa nomor urut dan pemrosesan aliran berlanjut. Jika invokasi tidak berhasil, fungsi Lambda Anda menunda pemrosesan lebih lanjut sampai invokasi sukses.

**Example DynamodbTimeWindowEvent**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ],
    "window": {
        "start": "2020-07-30T17:00:00Z",
        "end": "2020-07-30T17:05:00Z"
    },
    "state": {
        "1": "state1"
    },
    "shardId": "shard123456789",
    "eventSourceARN": "stream-ARN",
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}
```

## Konfigurasi
<a name="streams-tumbling-config"></a>

Anda dapat mengonfigurasi jendela berguling saat membuat atau memperbarui pemetaan sumber peristiwa. Untuk mengkonfigurasi jendela tumbling, tentukan jendela dalam hitungan detik ([TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds)). Contoh berikut AWS Command Line Interface (AWS CLI) perintah membuat pemetaan sumber acara streaming yang memiliki jendela jatuh 120 detik. Fungsi Lambda yang didefinisikan untuk agregasi dan pemrosesan diberi nama `tumbling-window-example-function`.

```
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \
--function-name tumbling-window-example-function \
--starting-position TRIM_HORIZON \
--tumbling-window-in-seconds 120
```

Lambda menentukan jatuh batas jendela berguling berdasarkan waktu ketika catatan dimasukkan ke dalam aliran. Semua catatan memiliki stempel waktu perkiraan yang tersedia yang digunakan Lambda dalam penentuan batas.

Agregasi jendela berguling tidak mendukung shard ulang. Ketika shard berakhir, Lambda menganggap jendela ditutup, dan shard anak memulai jendela mereka sendiri dalam status baru.

Jendela berguling sepenuhnya mendukung kebijakan coba lagi yang ada `maxRetryAttempts` dan `maxRecordAge`.

**Example Handler.py - Agregasi dan pemrosesan**  
Fungsi Python berikut menunjukkan cara untuk menggabungkan, lalu memproses status akhir Anda:  

```
def lambda_handler(event, context):
    print('Incoming event: ', event)
    print('Incoming state: ', event['state'])

#Check if this is the end of the window to either aggregate or process.
    if event['isFinalInvokeForWindow']:
        # logic to handle final state of the window
        print('Destination invoke')
    else:
        print('Aggregate invoke')

#Check for early terminations
    if event['isWindowTerminatedEarly']:
        print('Window terminated early')

    #Aggregation logic
    state = event['state']
    for record in event['Records']:
        state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1

    print('Returning state: ', state)
    return {'state': state}
```

# Parameter Lambda untuk pemetaan sumber peristiwa Amazon DynamoDB
<a name="services-ddb-params"></a>

Semua jenis sumber peristiwa Lambda berbagi operasi yang sama [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html)dan [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html)API. Namun, hanya beberapa parameter yang berlaku untuk DynamoDB Streams.


| Parameter | Diperlukan | Default | Catatan | 
| --- | --- | --- | --- | 
|  BatchSize  |  T  |  100  |  Maksimum: 10.000.  | 
|  BisectBatchOnFunctionError  |  T  |  false  | none  | 
|  DestinationConfig  |  T  | N/A  |  Antrian Amazon SQS standar atau tujuan topik Amazon SNS standar untuk catatan yang dibuang  | 
|  Diaktifkan  |  T  |  true  | none  | 
|  EventSourceArn  |  Y  | N/A |  ARN dari aliran data atau konsumen aliran  | 
|  FilterCriteria  |  T  | N/A  |  [Kontrol peristiwa mana yang dikirim Lambda ke fungsi Anda](invocation-eventfiltering.md)  | 
|  FunctionName  |  Y  | N/A  | none  | 
|  FunctionResponseTypes  |  T  | N/A |  Agar fungsi Anda melaporkan kegagalan tertentu dalam satu batch, sertakan nilainya `ReportBatchItemFailures``FunctionResponseTypes`. Untuk informasi selengkapnya, lihat [Mengkonfigurasi respons batch sebagian dengan DynamoDB dan Lambda](services-ddb-batchfailurereporting.md).  | 
|  MaximumBatchingWindowInSeconds  |  T  |  0  | none  | 
|  MaximumRecordAgeInSeconds  |  T  |  -1  |  -1 berarti tak terbatas: catatan gagal dicoba ulang sampai catatan kedaluwarsa. [Batas retensi data untuk DynamoDB Streams adalah 24](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.DataRetention) jam. Minimal: -1 Maksimal: 604.800  | 
|  MaximumRetryAttempts  |  T  |  -1  |  -1 berarti tak terbatas: catatan gagal dicoba ulang sampai catatan kedaluwarsa Minimal: 0 Maksimum: 10.000.  | 
|  ParallelizationFactor  |  T  |  1  |  Maksimal: 10  | 
|  StartingPosition  |  Y  | N/A  |  TRIM\$1HORIZON  | 
|  TumblingWindowInSeconds  |  T  | N/A  |  Minimal: 0 Maksimal: 900  | 

# Menggunakan pemfilteran acara dengan sumber acara DynamoDB
<a name="with-ddb-filtering"></a>

Anda dapat menggunakan pemfilteran peristiwa untuk mengontrol rekaman mana dari aliran atau antrian yang dikirim Lambda ke fungsi Anda. Untuk informasi umum tentang cara kerja penyaringan acara, lihat[Kontrol peristiwa mana yang dikirim Lambda ke fungsi Anda](invocation-eventfiltering.md).

Bagian ini berfokus pada penyaringan acara untuk sumber acara DynamoDB.

**catatan**  
Pemetaan sumber peristiwa DynamoDB hanya mendukung pemfilteran pada kunci. `dynamodb`

**Topics**
+ [Acara DynamoDB](#filtering-ddb)
+ [Pemfilteran dengan atribut tabel](#filtering-ddb-attributes)
+ [Memfilter dengan ekspresi Boolean](#filtering-ddb-boolean)
+ [Menggunakan operator Exists](#filtering-ddb-exists)
+ [Format JSON untuk pemfilteran DynamoDB](#filtering-ddb-JSON-format)

## Acara DynamoDB
<a name="filtering-ddb"></a>

Misalkan Anda memiliki tabel DynamoDB dengan `CustomerName` kunci utama dan atribut dan. `AccountManager` `PaymentTerms` Berikut ini menunjukkan contoh catatan dari aliran tabel DynamoDB Anda.

```
{
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
          "ApproximateCreationDateTime": "1678831218.0",
          "Keys": {
              "CustomerName": {
                  "S": "AnyCompany Industries"
              }
          },
          "NewImage": {
              "AccountManager": {
                  "S": "Pat Candella"
              },
              "PaymentTerms": {
                  "S": "60 days"
              },
              "CustomerName": {
                  "S": "AnyCompany Industries"
              }
          },
          "SequenceNumber": "111",
          "SizeBytes": 26,
          "StreamViewType": "NEW_IMAGE"
      }
  }
```

Untuk memfilter berdasarkan nilai kunci dan atribut dalam tabel DynamoDB Anda, gunakan kunci `dynamodb` dalam catatan. Bagian berikut memberikan contoh untuk berbagai jenis filter.

### Memfilter dengan tombol tabel
<a name="filtering-ddb-keys"></a>

Misalkan Anda ingin fungsi Anda memproses hanya catatan-catatan di mana kunci utamanya `CustomerName` adalah “AnyCompany Industri.” `FilterCriteria`Objeknya adalah sebagai berikut.

```
{
     "Filters": [
          {
              "Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"
          }
      ]
 }
```

Untuk kejelasan tambahan, berikut adalah nilai filter yang `Pattern` diperluas di JSON biasa. 

```
{
     "dynamodb": {
          "Keys": {
              "CustomerName": {
                  "S": [ "AnyCompany Industries" ]
                  }
              }
          }
 }
```

Anda dapat menambahkan filter menggunakan konsol, AWS CLI atau AWS SAM templat.

------
#### [ Console ]

Untuk menambahkan filter ini menggunakan konsol, ikuti instruksi [Melampirkan kriteria filter ke pemetaan sumber peristiwa (konsol)](invocation-eventfiltering.md#filtering-console) dan masukkan string berikut untuk **kriteria Filter**.

```
{ "dynamodb" : { "Keys" : { "CustomerName" : { "S" : [ "AnyCompany Industries" ] } } } }
```

------
#### [ AWS CLI ]

Untuk membuat pemetaan sumber peristiwa baru dengan kriteria filter ini menggunakan AWS Command Line Interface (AWS CLI), jalankan perintah berikut.

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"}]}'
```

Untuk menambahkan kriteria filter ini ke pemetaan sumber peristiwa yang ada, jalankan perintah berikut.

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"}]}'
```

------
#### [ AWS SAM ]

Untuk menambahkan filter ini menggunakan AWS SAM, tambahkan cuplikan berikut ke template YAMB untuk sumber acara Anda.

```
FilterCriteria:
   Filters:
     - Pattern: '{ "dynamodb" : { "Keys" : { "CustomerName" : { "S" : [ "AnyCompany Industries" ] } } } }'
```

------

## Pemfilteran dengan atribut tabel
<a name="filtering-ddb-attributes"></a>

Dengan DynamoDB, Anda juga dapat menggunakan `NewImage` tombol `OldImage` and untuk memfilter nilai atribut. Misalkan Anda ingin memfilter catatan di mana `AccountManager` atribut dalam gambar tabel terbaru adalah “Pat Candella” atau “Shirley Rodriguez.” `FilterCriteria`Objeknya adalah sebagai berikut.

```
{
    "Filters": [
        {
            "Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"
        }
    ]
}
```

Untuk kejelasan tambahan, berikut adalah nilai filter yang `Pattern` diperluas di JSON biasa.

```
{
    "dynamodb": {
        "NewImage": {
            "AccountManager": {
                "S": [ "Pat Candella", "Shirley Rodriguez" ]
            }
        }
    }
}
```

Anda dapat menambahkan filter menggunakan konsol, AWS CLI atau AWS SAM templat.

------
#### [ Console ]

Untuk menambahkan filter ini menggunakan konsol, ikuti instruksi [Melampirkan kriteria filter ke pemetaan sumber peristiwa (konsol)](invocation-eventfiltering.md#filtering-console) dan masukkan string berikut untuk **kriteria Filter**.

```
{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella", "Shirley Rodriguez" ] } } } }
```

------
#### [ AWS CLI ]

Untuk membuat pemetaan sumber peristiwa baru dengan kriteria filter ini menggunakan AWS Command Line Interface (AWS CLI), jalankan perintah berikut.

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"}]}'
```

Untuk menambahkan kriteria filter ini ke pemetaan sumber peristiwa yang ada, jalankan perintah berikut.

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"}]}'
```

------
#### [ AWS SAM ]

Untuk menambahkan filter ini menggunakan AWS SAM, tambahkan cuplikan berikut ke template YAMB untuk sumber acara Anda.

```
FilterCriteria:
  Filters:
    - Pattern: '{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella", "Shirley Rodriguez" ] } } } }'
```

------

## Memfilter dengan ekspresi Boolean
<a name="filtering-ddb-boolean"></a>

Anda juga dapat membuat filter menggunakan ekspresi Boolean AND. Ekspresi ini dapat mencakup parameter kunci dan atribut tabel Anda. Misalkan Anda ingin memfilter catatan di mana `NewImage` nilainya `AccountManager` adalah “Pat Candella” dan `OldImage` nilainya adalah “Terry Whitlock”. `FilterCriteria`Objeknya adalah sebagai berikut.

```
{
    "Filters": [
        {
            "Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } }"
        }
    ]
}
```

Untuk kejelasan tambahan, berikut adalah nilai filter yang `Pattern` diperluas di JSON biasa.

```
{ 
    "dynamodb" : { 
        "NewImage" : { 
            "AccountManager" : { 
                "S" : [ 
                    "Pat Candella" 
                ] 
            } 
        } 
    }, 
    "dynamodb": { 
        "OldImage": { 
            "AccountManager": { 
                "S": [ 
                    "Terry Whitlock" 
                ] 
            } 
        } 
    } 
}
```

Anda dapat menambahkan filter menggunakan konsol, AWS CLI atau AWS SAM templat.

------
#### [ Console ]

Untuk menambahkan filter ini menggunakan konsol, ikuti instruksi [Melampirkan kriteria filter ke pemetaan sumber peristiwa (konsol)](invocation-eventfiltering.md#filtering-console) dan masukkan string berikut untuk **kriteria Filter**.

```
{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella" ] } } } , "dynamodb" : { "OldImage" : { "AccountManager" : { "S" : [ "Terry Whitlock" ] } } } }
```

------
#### [ AWS CLI ]

Untuk membuat pemetaan sumber peristiwa baru dengan kriteria filter ini menggunakan AWS Command Line Interface (AWS CLI), jalankan perintah berikut.

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } } "}]}'
```

Untuk menambahkan kriteria filter ini ke pemetaan sumber peristiwa yang ada, jalankan perintah berikut.

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } } "}]}'
```

------
#### [ AWS SAM ]

Untuk menambahkan filter ini menggunakan AWS SAM, tambahkan cuplikan berikut ke template YAMB untuk sumber acara Anda.

```
FilterCriteria:
  Filters:
    - Pattern: '{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella" ] } } } , "dynamodb" : { "OldImage" : { "AccountManager" : { "S" : [ "Terry Whitlock" ] } } } }'
```

------

**catatan**  
DynamoDB event filtering tidak mendukung penggunaan operator numerik (sama numerik dan rentang numerik). Bahkan jika item dalam tabel Anda disimpan sebagai angka, parameter ini dikonversi ke string di objek catatan JSON.

## Menggunakan operator Exists
<a name="filtering-ddb-exists"></a>

Karena cara objek peristiwa JSON dari DynamoDB terstruktur, menggunakan operator Exists memerlukan perhatian khusus. Operator Exists hanya bekerja pada node daun di acara JSON, jadi jika pola filter Anda menggunakan Exists untuk menguji node perantara, itu tidak akan berfungsi. Pertimbangkan item tabel DynamoDB berikut:

```
{
  "UserID": {"S": "12345"},
  "Name": {"S": "John Doe"},
  "Organizations": {"L": [
      {"S":"Sales"},
      {"S":"Marketing"},
      {"S":"Support"}
    ]
  }
}
```

Anda mungkin ingin membuat pola filter seperti berikut yang akan menguji peristiwa yang berisi`"Organizations"`:

```
{ "dynamodb" : { "NewImage" : { "Organizations" : [ { "exists": true } ] } } }
```

Namun, pola filter ini tidak akan pernah mengembalikan kecocokan karena `"Organizations"` bukan simpul daun. Contoh berikut menunjukkan bagaimana benar menggunakan operator Exists untuk membangun pola filter yang diinginkan:

```
{ "dynamodb" : { "NewImage" : {"Organizations": {"L": {"S": [ {"exists": true } ] } } } } }
```

## Format JSON untuk pemfilteran DynamoDB
<a name="filtering-ddb-JSON-format"></a>

Untuk memfilter peristiwa dengan benar dari sumber DynamoDB, bidang data dan kriteria filter Anda untuk bidang data `dynamodb` () harus dalam format JSON yang valid. Jika salah satu bidang tidak dalam format JSON yang valid, Lambda akan menghapus pesan atau melempar pengecualian. Tabel berikut merangkum perilaku spesifik: 


| Format data masuk | Format pola filter untuk properti data | Tindakan yang dihasilkan | 
| --- | --- | --- | 
|  JSON yang valid  |  JSON yang valid  |  Filter Lambda berdasarkan kriteria filter Anda.  | 
|  JSON yang valid  |  Tidak ada pola filter untuk properti data  |  Filter Lambda (hanya pada properti metadata lainnya) berdasarkan kriteria filter Anda.  | 
|  JSON yang valid  |  Non-JSON  |  Lambda melempar pengecualian pada saat pembuatan atau pembaruan pemetaan sumber acara. Pola filter untuk properti data harus dalam format JSON yang valid.  | 
|  Non-JSON  |  JSON yang valid  |  Lambda menjatuhkan rekor.  | 
|  Non-JSON  |  Tidak ada pola filter untuk properti data  |  Filter Lambda (hanya pada properti metadata lainnya) berdasarkan kriteria filter Anda.  | 
|  Non-JSON  |  Non-JSON  |  Lambda melempar pengecualian pada saat pembuatan atau pembaruan pemetaan sumber acara. Pola filter untuk properti data harus dalam format JSON yang valid.  | 

# Tutorial: Menggunakan AWS Lambda dengan aliran Amazon DynamoDB
<a name="with-ddb-example"></a>

 Dalam tutorial ini, Anda membuat fungsi Lambda untuk menggunakan kejadian dari aliran Amazon DynamoDB.

## Prasyarat
<a name="with-ddb-prepare"></a>

### Instal AWS Command Line Interface
<a name="install_aws_cli"></a>

Jika Anda belum menginstal AWS Command Line Interface, ikuti langkah-langkah di [Menginstal atau memperbarui versi terbaru AWS CLI untuk menginstalnya](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html).

Tutorial ini membutuhkan terminal baris perintah atau shell untuk menjalankan perintah. Di Linux dan macOS, gunakan shell dan manajer paket pilihan Anda.

**catatan**  
Di Windows, beberapa perintah Bash CLI yang biasa Anda gunakan dengan Lambda (`zip`seperti) tidak didukung oleh terminal bawaan sistem operasi. Untuk mendapatkan versi terintegrasi Windows dari Ubuntu dan Bash, [instal Windows Subsystem untuk](https://docs.microsoft.com/en-us/windows/wsl/install-win10) Linux. 

## Buat peran eksekusi
<a name="with-ddb-create-execution-role"></a>

Buat [peran eksekusi](lambda-intro-execution-role.md) yang memberikan izin fungsi Anda untuk mengakses AWS sumber daya.

**Untuk membuat peran eksekusi**

1. Buka [halaman peran](https://console.aws.amazon.com/iam/home#/roles) di konsol IAM.

1. Pilih **Buat peran**.

1. Buat peran dengan properti berikut.
   + **Entitas tepercaya** – Lambda.
   + **Izin - Peran AWSLambda** **Dinamo DBExecution**.
   + **Nama peran** – **lambda-dynamodb-role**.

**DBExecutionPeran AWSLambda Dynamo** memiliki izin yang diperlukan fungsi untuk membaca item dari DynamoDB dan menulis log ke Log. CloudWatch 

## Buat fungsi
<a name="with-ddb-example-create-function"></a>

Buat fungsi Lambda yang memproses peristiwa DynamoDB Anda. Kode fungsi menulis beberapa data peristiwa yang masuk ke CloudWatch Log.

------
#### [ .NET ]

**SDK untuk .NET**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Mengkonsumsi acara DynamoDB dengan Lambda menggunakan.NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");

        foreach (var record in dynamoEvent.Records)
        {
            context.Logger.LogInformation($"Event ID: {record.EventID}");
            context.Logger.LogInformation($"Event Name: {record.EventName}");

            context.Logger.LogInformation(JsonSerializer.Serialize(record));
        }

        context.Logger.LogInformation("Stream processing complete.");
    }
}
```

------
#### [ Go ]

**SDK untuk Go V2**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Mengkonsumsi acara DynamoDB dengan Lambda menggunakan Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-lambda-go/events"
	"fmt"
)

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*string, error) {
	if len(event.Records) == 0 {
		return nil, fmt.Errorf("received empty event")
	}

	for _, record := range event.Records {
	 	LogDynamoDBRecord(record)
	}

	message := fmt.Sprintf("Records processed: %d", len(event.Records))
	return &message, nil
}

func main() {
	lambda.Start(HandleRequest)
}

func LogDynamoDBRecord(record events.DynamoDBEventRecord){
	fmt.Println(record.EventID)
	fmt.Println(record.EventName)
	fmt.Printf("%+v\n", record.Change)
}
```

------
#### [ Java ]

**SDK untuk Java 2.x**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Mengkonsumsi acara DynamoDB dengan Lambda menggunakan Java.  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class example implements RequestHandler<DynamodbEvent, Void> {

    private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create();

    @Override
    public Void handleRequest(DynamodbEvent event, Context context) {
        System.out.println(GSON.toJson(event));
        event.getRecords().forEach(this::logDynamoDBRecord);
        return null;
    }

    private void logDynamoDBRecord(DynamodbStreamRecord record) {
        System.out.println(record.getEventID());
        System.out.println(record.getEventName());
        System.out.println("DynamoDB Record: " + GSON.toJson(record.getDynamodb()));
    }
}
```

------
#### [ JavaScript ]

**SDK untuk JavaScript (v3)**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Mengkonsumsi acara DynamoDB dengan Lambda menggunakan. JavaScript  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
};

const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```
Mengkonsumsi acara DynamoDB dengan Lambda menggunakan. TypeScript  

```
export const handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
}
const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```

------
#### [ PHP ]

**SDK untuk PHP**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Mengkonsumsi acara DynamoDB dengan Lambda menggunakan PHP.  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\DynamoDb\DynamoDbHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends DynamoDbHandler
{
    private StderrLogger $logger;

    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleDynamoDb(DynamoDbEvent $event, Context $context): void
    {
        $this->logger->info("Processing DynamoDb table items");
        $records = $event->getRecords();

        foreach ($records as $record) {
            $eventName = $record->getEventName();
            $keys = $record->getKeys();
            $old = $record->getOldImage();
            $new = $record->getNewImage();
            
            $this->logger->info("Event Name:".$eventName."\n");
            $this->logger->info("Keys:". json_encode($keys)."\n");
            $this->logger->info("Old Image:". json_encode($old)."\n");
            $this->logger->info("New Image:". json_encode($new));
            
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }

        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords items");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK untuk Python (Boto3)**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Mengkonsumsi acara DynamoDB dengan Lambda menggunakan Python.  

```
import json

def lambda_handler(event, context):
    print(json.dumps(event, indent=2))

    for record in event['Records']:
        log_dynamodb_record(record)

def log_dynamodb_record(record):
    print(record['eventID'])
    print(record['eventName'])
    print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
```

------
#### [ Ruby ]

**SDK untuk Ruby**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Mengkonsumsi acara DynamoDB dengan Lambda menggunakan Ruby.  

```
def lambda_handler(event:, context:)
    return 'received empty event' if event['Records'].empty?
  
    event['Records'].each do |record|
      log_dynamodb_record(record)
    end
  
    "Records processed: #{event['Records'].length}"
  end
  
  def log_dynamodb_record(record)
    puts record['eventID']
    puts record['eventName']
    puts "DynamoDB Record: #{JSON.generate(record['dynamodb'])}"
  end
```

------
#### [ Rust ]

**SDK for Rust**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Mengkonsumsi acara DynamoDB dengan Lambda menggunakan Rust.  

```
use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord},
   };


// Built with the following dependencies:
//lambda_runtime = "0.11.1"
//serde_json = "1.0"
//tokio = { version = "1", features = ["macros"] }
//tracing = { version = "0.1", features = ["log"] }
//tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
//aws_lambda_events = "0.15.0"

async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> {
    
    let records = &event.payload.records;
    tracing::info!("event payload: {:?}",records);
    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    for record in records{
        log_dynamo_dbrecord(record);
    }

    tracing::info!("Dynamo db records processed");

    // Prepare the response
    Ok(())

}

fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{
    tracing::info!("EventId: {}", record.event_id);
    tracing::info!("EventName: {}", record.event_name);
    tracing::info!("DynamoDB Record: {:?}", record.change );
    Ok(())

}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
    .with_max_level(tracing::Level::INFO)
    .with_target(false)
    .without_time()
    .init();

    let func = service_fn(function_handler);
    lambda_runtime::run(func).await?;
    Ok(())
    
}
```

------

**Untuk membuat fungsi**

1. Salin kode sampel ke file dengan nama `example.js`.

1. Buat paket deployment.

   ```
   zip function.zip example.js
   ```

1. Buat fungsi Lambda dengan perintah `create-function`.

   ```
   aws lambda create-function --function-name ProcessDynamoDBRecords \
       --zip-file fileb://function.zip --handler example.handler --runtime nodejs24.x \
       --role arn:aws:iam::111122223333:role/lambda-dynamodb-role
   ```

## Uji fungsi Lambda
<a name="with-dbb-invoke-manually"></a>

Pada langkah ini, Anda menjalankan fungsi Lambda Anda secara manual menggunakan perintah `invoke` AWS Lambda CLI dan contoh peristiwa DynamoDB berikut. Salin berikut ini ke dalam file bernama`input.txt`.

**Example input.txt**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ]
}
```

Jalankan perintah `invoke` berikut. 

```
aws lambda invoke --function-name ProcessDynamoDBRecords \
    --cli-binary-format raw-in-base64-out \
    --payload file://input.txt outputfile.txt
```

**cli-binary-format**Opsi ini diperlukan jika Anda menggunakan AWS CLI versi 2. Untuk menjadikan ini pengaturan default, jalankan`aws configure set cli-binary-format raw-in-base64-out`. Untuk informasi selengkapnya, lihat [opsi baris perintah global yang AWS CLI didukung](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list) di *Panduan AWS Command Line Interface Pengguna untuk Versi 2*.

Fungsi mengembalikan `message` string dalam badan respons. 

Verifikasikan output dalam `outputfile.txt` file.

## Buat tabel DynamoDB dengan aliran yang diaktifkan
<a name="with-ddb-create-buckets"></a>

Buat tabel Amazon DynamoDB dengan aliran yang diaktifkan.

**Untuk membuat tabel DynamoDB**

1. Buka [Konsol DynamoDB](https://console.aws.amazon.com/dynamodb).

1. Pilih **Buat tabel**.

1. Buat tabel dengan pengaturan berikut.
   + **Nama tabel** – **lambda-dynamodb-stream**
   + **Kunci utama** – **id** (string)

1. Pilih **Buat**.

**Untuk mengaktifkan stream**

1. Buka [Konsol DynamoDB](https://console.aws.amazon.com/dynamodb).

1. Pilih **Tables**.

1. Pilih tabel **lambda-dynamodb-stream**.

1. **Di bawah **Ekspor dan aliran, pilih detail aliran** DynamoDB.**

1. Pilih **Nyalakan**.

1. Untuk **tipe Tampilan**, pilih **Atribut kunci saja**.

1. Pilih **Aktifkan aliran**.

Tulis ARN stream. Anda memerlukan ini di langkah berikutnya ketika Anda mengaitkan aliran dengan fungsi Lambda Anda. Untuk informasi selengkapnya tentang mengaktifkan aliran, lihat [Menangkap aktivitas tabel dengan DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html).

## Tambahkan sumber acara di AWS Lambda
<a name="with-ddb-attach-notification-configuration"></a>

Buat pemetaan sumber peristiwa di AWS Lambda. Pemetaan sumber peristiwa ini mengaitkan aliran DynamoDB dengan fungsi Lambda Anda. Setelah Anda membuat pemetaan sumber acara ini, AWS Lambda mulai polling aliran.

Jalankan perintah AWS CLI `create-event-source-mapping` berikut. Setelah perintah dijalankan, catat UUID. Anda akan memerlukan UUID ini untuk merujuk ke pemetaan sumber kejadian dalam perintah apa pun, misalnya, saat menghapus pemetaan sumber kejadian.

```
aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords \
    --batch-size 100 --starting-position LATEST --event-source DynamoDB-stream-arn
```

 Ini membuat pemetaan di antara aliran DynamoDB yang ditentukan dan fungsi Lambda. Anda dapat mengaitkan aliran DynamoDB dengan beberapa fungsi Lambda, dan mengaitkan fungsi Lambda yang sama dengan beberapa aliran. Namun, fungsi Lambda akan membagikan throughput pembacaan untuk aliran yang mereka bagikan. 

Anda bisa mendapatkan daftar pemetaan sumber kejadian dengan menjalankan perintah berikut.

```
aws lambda list-event-source-mappings
```

Daftar tersebut mengembalikan semua pemetaan sumber kejadian yang Anda buat, dan antara lain menampilkan `LastProcessingResult` untuk setiap pemetaan. Bidang ini digunakan untuk memberikan pesan informatif jika terjadi masalah. Nilai seperti `No records processed` (menunjukkan bahwa AWS Lambda belum memulai polling atau bahwa tidak ada catatan dalam aliran) dan `OK` (menunjukkan AWS Lambda berhasil membaca catatan dari aliran dan memanggil fungsi Lambda Anda) menunjukkan bahwa tidak ada masalah. Jika ada masalah, Anda akan menerima pesan kesalahan.

Jika Anda memiliki banyak pemetaan sumber kejadian, gunakan parameter nama fungsi untuk mempersempit hasil.

```
aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords
```

## Uji penyiapan
<a name="with-ddb-final-integration-test-no-iam"></a>

Uji end-to-end pengalaman. Saat Anda melakukan pembaruan tabel, DynamoDB menuliskan catatan peristiwa ke aliran. Saat AWS Lambda melakukan polling terhadap aliran, ini mendeteksi catatan baru dalam aliran dan memanggil fungsi Lambda Anda atas nama Anda dengan mengirimkan peristiwa ke fungsi tersebut. 

1. Di konsol DynamoDB, tambahkan, perbarui, dan hapus item di tabel. DynamoDB menuliskan catatan tindakan ini ke aliran.

1. AWS Lambda polling aliran dan ketika mendeteksi pembaruan ke aliran, ia memanggil fungsi Lambda Anda dengan meneruskan data peristiwa yang ditemukannya di aliran.

1. Fungsi Anda berjalan dan membuat log di Amazon CloudWatch. Anda dapat memverifikasi log yang dilaporkan di CloudWatch konsol Amazon.

## Langkah selanjutnya
<a name="with-ddb-next-steps"></a>

Tutorial ini menunjukkan dasar-dasar pemrosesan peristiwa aliran DynamoDB dengan Lambda. Untuk beban kerja produksi, pertimbangkan untuk menerapkan logika respons batch sebagian untuk menangani kegagalan rekaman individu dengan lebih efisien. [Utilitas prosesor batch](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) dari Powertools for AWS Lambda tersedia dalam Python TypeScript,, .NET, dan Java dan memberikan solusi yang kuat untuk ini, secara otomatis menangani kompleksitas respons batch sebagian dan mengurangi jumlah percobaan ulang untuk catatan yang berhasil diproses.

## Bersihkan sumber daya Anda
<a name="cleanup"></a>

Sekarang Anda dapat menghapus sumber daya yang Anda buat untuk tutorial ini, kecuali Anda ingin mempertahankannya. Dengan menghapus AWS sumber daya yang tidak lagi Anda gunakan, Anda mencegah tagihan yang tidak perlu ke Anda Akun AWS.

**Untuk menghapus fungsi Lambda**

1. Buka [halaman Fungsi](https://console.aws.amazon.com/lambda/home#/functions) di konsol Lambda.

1. Pilih fungsi yang Anda buat.

1. Pilih **Tindakan**, **Hapus**.

1. Ketik **confirm** kolom input teks dan pilih **Hapus**.

**Untuk menghapus peran eksekusi**

1. Buka [halaman Peran](https://console.aws.amazon.com/iam/home#/roles) dari konsol IAM.

1. Pilih peran eksekusi yang Anda buat.

1. Pilih **Hapus**.

1. Masukkan nama peran di bidang input teks dan pilih **Hapus**.

**Untuk menghapus tabel DynamoDB**

1. Buka [halaman Tabel](https://console.aws.amazon.com//dynamodb/home#tables:) di konsol DynamoDB.

1. Pilih tabel yang Anda buat.

1. Pilih **Hapus**.

1. Masukkan **delete** di kotak teks.

1. Pilih **Hapus tabel**.