Tutorial #2: Menggunakan filter untuk memproses beberapa peristiwa dengan DynamoDB dan Lambda - Amazon DynamoDB

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Tutorial #2: Menggunakan filter untuk memproses beberapa peristiwa dengan DynamoDB dan Lambda

Dalam tutorial ini, Anda akan membuat AWS Lambda pemicu untuk memproses hanya beberapa peristiwa dalam aliran dari tabel DynamoDB.

Dengan pemfilteran peristiwa Lambda, Anda dapat menggunakan ekspresi filter untuk mengontrol peristiwa mana yang dikirim Lambda ke fungsi Anda untuk diproses. Anda dapat mengonfigurasi hingga 5 filter berbeda per aliran DynamoDB. Jika Anda menggunakan jendela batching, Lambda menerapkan kriteria filter untuk setiap acara baru untuk melihat apakah itu harus disertakan dalam batch saat ini.

Filter diterapkan melalui struktur yang disebut FilterCriteria. 3 atribut utama FilterCriteria adalahmetadata properties, data properties, dan filter patterns.

Berikut adalah contoh struktur dari acara DynamoDB Streams:

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

metadata properties adalah bidang objek peristiwa. Dalam kasus DynamoDB Streams, metadata properties adalah bidang seperti dynamodb atau eventName.

data properties adalah bidang badan peristiwa. Untuk memfilter data properties, pastikan untuk memasukkannya ke FilterCriteria dalam kunci yang tepat. Untuk sumber peristiwa DynamoDB, kunci data adalah NewImage atau OldImage.

Akhirnya, aturan filter akan menentukan ekspresi filter yang ingin Anda terapkan ke properti tertentu. Berikut ini adalah beberapa contohnya:

Operator perbandingan Contoh Sintaks aturan (Sebagian)

Null

Jenis Produk adalah null

{ "product_type": { "S": null } }

Kosong

Nama produk kosong

{ "product_name": { "S": [ ""] } }

Setara

Negara bagian sama dengan Florida

{ "state": { "S": ["FL"] } }

Dan

Negara bagian produk sama dengan Florida dan kategori produk Cokelat

{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } }

Atau

Negara bagian produk adalah Florida atau California

{ "state": { "S": ["FL","CA"] } }

Bukan

Negara bagian produk bukan Florida

{"state": {"S": [{"anything-but": ["FL"]}]}}

Exists

Produk Rumahan ada

{"homemade": {"S": [{"exists": true}]}}

Tidak ada

Produk Rumahan tidak ada

{"homemade": {"S": [{"exists": false}]}}

Dimulai dengan

PK dimulai dengan PERUSAHAAN

{"PK": {"S": [{"prefix": "COMPANY"}]}}

Anda dapat menentukan hingga 5 pola penyaringan peristiwa untuk fungsi Lambda. Perhatikan bahwa masing-masing dari 5 peristiwa tersebut akan dievaluasi sebagai OR logis. Jadi jika Anda mengkonfigurasi dua filter bernama Filter_One danFilter_Two, fungsi Lambda akan mengeksekusi Filter_One OR Filter_Two.

catatan

Di halaman pemfilteran acara Lambda ada beberapa opsi untuk memfilter dan membandingkan nilai numerik, namun dalam kasus peristiwa filter DynamoDB itu tidak berlaku karena angka di DynamoDB disimpan sebagai string. Misalnya "quantity": { "N": "50" }, kita tahu itu nomor karena properti "N".

Menyatukan semuanya - AWS CloudFormation

Untuk menampilkan fungsionalitas pemfilteran acara dalam praktiknya, berikut adalah contoh CloudFormation template. Templat ini akan menghasilkan tabel DynamoDB Sederhana dengan PK Kunci Partisi dan SK Kunci Urutan dengan Amazon DynamoDB Streams diaktifkan. Ini akan membuat fungsi lambda dan peran Eksekusi Lambda sederhana yang memungkinkan penulisan log ke Amazon Cloudwatch, dan membaca peristiwa dari Amazon DynamoDB Stream. Ini juga akan menambahkan pemetaan sumber peristiwa antara DynamoDB Streams dan fungsi Lambda, sehingga fungsi tersebut dapat dijalankan setiap kali ada kejadian di Amazon DynamoDB Stream.

AWSTemplateFormatVersion: "2010-09-09" Description: Sample application that presents AWS Lambda event source filtering with Amazon DynamoDB Streams. Resources: StreamsSampleDDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "PK" AttributeType: "S" - AttributeName: "SK" AttributeType: "S" KeySchema: - AttributeName: "PK" KeyType: "HASH" - AttributeName: "SK" KeyType: "RANGE" StreamSpecification: StreamViewType: "NEW_AND_OLD_IMAGES" ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: !GetAtt StreamsSampleDDBTable.StreamArn EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST ProcessEventLambda: Type: AWS::Lambda::Function Properties: Runtime: python3.7 Timeout: 300 Handler: index.handler Role: !GetAtt LambdaExecutionRole.Arn Code: ZipFile: | import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec) Outputs: StreamsSampleDDBTable: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.Arn StreamARN: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.StreamArn

Setelah Anda menerapkan templat pembentukan cloud ini, Anda dapat memasukkan Item Amazon DynamoDB berikut:

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

Berkat fungsi lambda sederhana yang disertakan sebaris dalam template pembentukan cloud ini, Anda akan melihat peristiwa di grup CloudWatch log Amazon untuk fungsi lambda sebagai berikut:

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

Contoh Filter

  • Hanya produk yang cocok dengan status tertentu

Contoh ini memodifikasi CloudFormation template untuk menyertakan filter untuk mencocokkan semua produk yang berasal dari Florida, dengan singkatan “FL”.

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Setelah Anda menerapkan kembali tumpukan, Anda dapat menambahkan item DynamoDB berikut ke tabel. Perhatikan bahwa itu tidak akan muncul di log fungsi Lambda, karena produk dalam contoh ini berasal dari California.

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK#1000", "company_id": "1000", "fabric": "Florida Chocolates", "price": 15, "product_id": "1000", "quantity": 50, "state": "CA", "stores": 5, "type": "" }
  • Hanya item yang dimulai dengan beberapa nilai di PK dan SK

Contoh ini memodifikasi CloudFormation template untuk menyertakan kondisi berikut:

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Perhatikan kondisi AND mengharuskan kondisi berada di dalam pola, di mana PK dan SK Kunci berada dalam ekspresi yang sama dipisahkan oleh koma.

Baik mulai dengan beberapa nilai pada PK dan SK atau dari keadaan tertentu.

Contoh ini memodifikasi CloudFormation template untuk menyertakan kondisi berikut:

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Perhatikan kondisi OR ditambahkan dengan memperkenalkan pola baru di bagian filter.

Menyatukan semuanya - CDK

Contoh templat pembentukan proyek CDK berikut berjalan melalui fungsionalitas penyaringan acara. Sebelum bekerja dengan proyek CDK ini, Anda perlu menginstal prasyarat termasuk menjalankan skrip persiapan.

Buat proyek CDK

Pertama buat AWS CDK proyek baru, dengan memanggil cdk init dalam direktori kosong.

mkdir ddb_filters cd ddb_filters cdk init app --language python

Perintah cdk init menggunakan nama folder proyek untuk memberi nama berbagai elemen proyek, termasuk kelas, subfolder, dan file. Tanda hubung apa pun dalam nama folder diubah menjadi garis bawah. Nama tersebut harus mengikuti bentuk pengenal Python. Misalnya, seharusnya tidak dimulai dengan angka atau berisi spasi.

Untuk bekerja dengan proyek baru, aktifkan lingkungan virtualnya. Ini memungkinkan dependensi proyek diinstal secara lokal di folder proyek, bukan secara global.

source .venv/bin/activate python -m pip install -r requirements.txt
catatan

Anda mungkin mengenali ini sebagai perintah Mac/Linux untuk mengaktifkan lingkungan virtual. Templat Python menyertakan file batch, source.bat, yang memungkinkan perintah yang sama untuk digunakan pada Windows. Perintah Windows tradisional .venv\Scripts\activate.bat juga berfungsi. Jika Anda menginisialisasi AWS CDK proyek Anda menggunakan AWS CDK Toolkit v1.70.0 atau yang lebih lama, lingkungan virtual Anda ada di direktori, bukan. .env .venv

Infrastruktur Dasar

Buka file ./ddb_filters/ddb_filters_stack.py dengan editor teks pilihan Anda. File ini dibuat secara otomatis saat Anda membuat AWS CDK proyek.

Selanjutnya, tambahkan fungsi _create_ddb_table dan _set_ddb_trigger_function. Fungsi-fungsi ini akan membuat tabel DynamoDB dengan kunci partisi PK dan mengurutkan kunci SK dalam mode penyediaan mode sesuai permintaan, dengan Amazon DynamoDB Streams diaktifkan secara default untuk menampilkan gambar Baru dan Lama.

Fungsi Lambda akan disimpan di folder lambda di bagian file app.py. File ini akan dibuat nanti. Ini akan mencakup variabel lingkungan APP_TABLE_NAME, yang akan menjadi nama Tabel Amazon DynamoDB yang dibuat oleh tumpukan ini. Dalam fungsi yang sama kami akan memberikan izin baca aliran ke fungsi Lambda. Akhirnya, hal tersebut akan berlangganan DynamoDB Streams sebagai sumber acara untuk fungsi lambda.

Di akhir file dalam metode __init__, Anda akan memanggil konstruksi masing-masing untuk menginisialisasi mereka dalam tumpukan. Untuk proyek yang lebih besar yang memerlukan komponen dan layanan tambahan, mungkin yang terbaik adalah mendefinisikan konstruksi ini di luar tumpukan dasar.

import os import json import aws_cdk as cdk from aws_cdk import ( Stack, aws_lambda as _lambda, aws_dynamodb as dynamodb, ) from constructs import Construct class DdbFiltersStack(Stack): def _create_ddb_table(self): dynamodb_table = dynamodb.Table( self, "AppTable", partition_key=dynamodb.Attribute( name="PK", type=dynamodb.AttributeType.STRING ), sort_key=dynamodb.Attribute( name="SK", type=dynamodb.AttributeType.STRING), billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, removal_policy=cdk.RemovalPolicy.DESTROY, ) cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name) return dynamodb_table def _set_ddb_trigger_function(self, ddb_table): events_lambda = _lambda.Function( self, "LambdaHandler", runtime=_lambda.Runtime.PYTHON_3_9, code=_lambda.Code.from_asset("lambda"), handler="app.handler", environment={ "APP_TABLE_NAME": ddb_table.table_name, }, ) ddb_table.grant_stream_read(events_lambda) event_subscription = _lambda.CfnEventSourceMapping( scope=self, id="companyInsertsOnlyEventSourceMapping", function_name=events_lambda.function_name, event_source_arn=ddb_table.table_stream_arn, maximum_batching_window_in_seconds=1, starting_position="LATEST", batch_size=1, ) def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: super().__init__(scope, construct_id, **kwargs) ddb_table = self._create_ddb_table() self._set_ddb_trigger_function(ddb_table)

Sekarang kita akan membuat fungsi lambda yang sangat sederhana yang akan mencetak log ke Amazon CloudWatch. Untuk melakukannya, buat folder baru bernama lambda.

mkdir lambda touch app.py

Menggunakan editor teks favorit Anda, tambahkan konten berikut ke file app.py:

import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec)

Memastikan Anda berada di folder /ddb_filters/, ketikkan perintah berikut untuk membuat aplikasi sampel:

cdk deploy

Pada titik tertentu Anda akan diminta untuk mengonfirmasi apakah Anda ingin menerapkan solusi tersebut. Terima perubahan dengan mengetik Y.

├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤ │ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │ └───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘ Do you wish to deploy these changes (y/n)? y ... ✨ Deployment time: 67.73s Outputs: DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP Stack ARN: arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6

Setelah perubahan diterapkan, buka AWS konsol Anda dan tambahkan satu item ke tabel Anda.

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

CloudWatch Log sekarang harus berisi semua informasi dari entri ini.

Contoh Filter

  • Hanya produk yang cocok dengan status tertentu

Buka file ddb_filters/ddb_filters/ddb_filters_stack.py, dan modifikasi untuk menyertakan filter yang cocok dengan semua produk yang setara dengan “FL”. Ini dapat direvisi tepat di bawah event_subscription di baris 45.

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
  • Hanya item yang dimulai dengan beberapa nilai di PK dan SK

Ubah skrip python untuk menyertakan kondisi berikut:

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, ] },
  • Baik mulai dengan beberapa nilai pada PK dan SK atau dari keadaan tertentu.

Ubah skrip python untuk menyertakan kondisi berikut:

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )

Perhatikan bahwa kondisi OR ditambahkan dengan menambahkan lebih banyak elemen ke array Filter.

Pembersihan

Temukan tumpukan filter di dasar direktori kerja Anda, dan jalankancdk destroy. Anda akan diminta untuk mengonfirmasi penghapusan sumber daya:

cdk destroy Are you sure you want to delete: DdbFiltersStack (y/n)? y