Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Menerapkan pemrosesan aliran DynamoDB stateful di Lambda
Fungsi Lambda dapat menjalankan aplikasi pemrosesan aliran berkelanjutan. Aliran merupakan data tidak terbatas yang mengalir terus-menerus melalui aplikasi Anda. Untuk menganalisis informasi dari input yang terus diperbarui ini, Anda dapat mengikat catatan yang disertakan menggunakan jendela yang didefinisikan dalam hal waktu.
Jatuh jendela adalah jendela waktu yang berbeda yang membuka dan menutup secara berkala. Secara default, pemanggilan Lambda tanpa status — Anda tidak dapat menggunakannya untuk memproses data di beberapa pemanggilan berkelanjutan tanpa database eksternal. Namun, dengan jendela yang jatuh, Anda dapat mempertahankan status Anda di seluruh pemanggilan. Status ini berisi hasil agregat pesan yang sebelumnya diproses untuk jendela saat ini. Status Anda maksimal bisa sebesar 1 MB per shard. Jika melebihi ukuran tersebut, Lambda mengakhiri jendela lebih awal.
Setiap rekaman dalam aliran milik jendela tertentu. Lambda akan memproses setiap rekaman setidaknya sekali, tetapi tidak menjamin bahwa setiap rekaman akan diproses hanya sekali. Dalam kasus yang jarang terjadi, seperti penanganan kesalahan, beberapa catatan mungkin diproses lebih dari sekali. Catatan selalu diproses secara berurutan pertama kali. Jika catatan diproses lebih dari satu kali, mereka mungkin diproses rusak.
Agregasi dan pemrosesan
Fungsi yang dikelola pengguna Anda dipanggil baik untuk agregasi maupun untuk memproses hasil akhir agregasi tersebut. Lambda mengumpulkan semua catatan yang diterima di jendela. Anda dapat menerima catatan ini dalam beberapa batch, masing-masing sebagai invokasi terpisah. Setiap invokasi menerima status. Jadi, saat menggunakan jendela tumbling, respons fungsi Lambda Anda harus berisi state
properti. Jika respons tidak berisi state
properti, Lambda menganggap ini sebagai pemanggilan yang gagal. Untuk memenuhi kondisi ini, fungsi Anda dapat mengembalikan TimeWindowEventResponse
objek, yang memiliki JSON bentuk sebagai berikut:
contoh Nilai TimeWindowEventResponse
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
catatan
Untuk fungsi Java, sebaiknya gunakan a Map<String, String>
untuk mewakili status.
Di akhir jendela, tanda isFinalInvokeForWindow
diatur ke true
untuk menunjukkan bahwa ini adalah status akhir dan bahwa itu siap untuk diproses. Setelah pemrosesan, jendela selesai, dan invokasi akhir Anda selesai, lalu status dihapus.
Di akhir jendela Anda, Lambda menggunakan pemrosesan akhir untuk tindakan pada hasil agregasi. Pemrosesan akhir Anda dipanggil secara sinkron. Setelah pemanggilan berhasil, fungsi Anda memeriksa nomor urut dan pemrosesan aliran berlanjut. Jika invokasi tidak berhasil, fungsi Lambda Anda menunda pemrosesan lebih lanjut sampai invokasi sukses.
contoh DynamodbTimeWindowEvent
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ], "window": { "start": "2020-07-30T17:00:00Z", "end": "2020-07-30T17:05:00Z" }, "state": { "1": "state1" }, "shardId": "shard123456789", "eventSourceARN": "stream-ARN", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }
Konfigurasi
Anda dapat mengonfigurasi jendela berguling saat membuat atau memperbarui pemetaan sumber peristiwa. Untuk mengkonfigurasi jendela tumbling, tentukan jendela dalam hitungan detik (TumblingWindowInSeconds). Contoh berikut AWS Command Line Interface (AWS CLI) perintah membuat pemetaan sumber acara streaming yang memiliki jendela jatuh 120 detik. Fungsi Lambda yang didefinisikan untuk agregasi dan pemrosesan diberi nama tumbling-window-example-function
.
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \ --function-name tumbling-window-example-function \ --starting-position TRIM_HORIZON \ --tumbling-window-in-seconds
120
Lambda menentukan jatuh batas jendela berguling berdasarkan waktu ketika catatan dimasukkan ke dalam aliran. Semua catatan memiliki stempel waktu perkiraan yang tersedia yang digunakan Lambda dalam penentuan batas.
Agregasi jendela berguling tidak mendukung shard ulang. Ketika shard berakhir, Lambda menganggap jendela ditutup, dan shard anak memulai jendela mereka sendiri dalam status baru.
Jendela berguling sepenuhnya mendukung kebijakan coba lagi yang ada maxRetryAttempts
dan maxRecordAge
.
contoh Handler.py - Agregasi dan pemrosesan
Fungsi Python berikut menunjukkan cara untuk menggabungkan, lalu memproses status akhir Anda:
def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1 print('Returning state: ', state) return {'state': state}