Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Tutorial #2: Menggunakan filter untuk memproses beberapa peristiwa dengan DynamoDB dan Lambda
Dalam tutorial ini, Anda akan membuat AWS Lambda pemicu untuk memproses hanya beberapa peristiwa dalam aliran dari tabel DynamoDB.
Dengan pemfilteran peristiwa Lambda, Anda dapat menggunakan ekspresi filter untuk mengontrol peristiwa mana yang dikirim Lambda ke fungsi Anda untuk diproses. Anda dapat mengonfigurasi hingga 5 filter berbeda per aliran DynamoDB. Jika Anda menggunakan jendela batching, Lambda menerapkan kriteria filter untuk setiap acara baru untuk melihat apakah itu harus disertakan dalam batch saat ini.
Filter diterapkan melalui struktur yang disebut FilterCriteria
. 3 atribut utama FilterCriteria
adalahmetadata properties
, data properties
, dan filter patterns
.
Berikut adalah contoh struktur dari acara DynamoDB Streams:
{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }
metadata properties
adalah bidang objek peristiwa. Dalam kasus DynamoDB Streams, metadata properties
adalah bidang seperti dynamodb
atau eventName
.
data properties
adalah bidang badan peristiwa. Untuk memfilter data properties
, pastikan untuk memasukkannya ke FilterCriteria
dalam kunci yang tepat. Untuk sumber peristiwa DynamoDB, kunci data adalah NewImage
atau OldImage
.
Akhirnya, aturan filter akan menentukan ekspresi filter yang ingin Anda terapkan ke properti tertentu. Berikut ini adalah beberapa contohnya:
Operator perbandingan | Contoh | Sintaks aturan (Sebagian) |
---|---|---|
Null |
Jenis Produk adalah null |
|
Kosong |
Nama produk kosong |
|
Setara |
Negara bagian sama dengan Florida |
|
Dan |
Negara bagian produk sama dengan Florida dan kategori produk Cokelat |
|
Atau |
Negara bagian produk adalah Florida atau California |
|
Bukan |
Negara bagian produk bukan Florida |
|
Exists |
Produk Rumahan ada |
|
Tidak ada |
Produk Rumahan tidak ada |
|
Dimulai dengan |
PK dimulai dengan COMPANY |
|
Anda dapat menentukan hingga 5 pola penyaringan peristiwa untuk fungsi Lambda. Perhatikan bahwa masing-masing dari 5 peristiwa tersebut akan dievaluasi sebagai OR logis. Jadi jika Anda mengkonfigurasi dua filter bernama Filter_One
danFilter_Two
, fungsi Lambda akan mengeksekusi Filter_One
OR Filter_Two
.
catatan
Di halaman pemfilteran acara Lambda ada beberapa opsi untuk memfilter dan membandingkan nilai numerik, namun dalam kasus peristiwa filter DynamoDB itu tidak berlaku karena angka di DynamoDB disimpan sebagai string. Misalnya "quantity": { "N": "50"
}
, kita tahu itu nomor karena properti "N"
.
Menyatukan semuanya - AWS CloudFormation
Untuk menampilkan fungsionalitas pemfilteran acara dalam praktiknya, berikut adalah contoh CloudFormation template. Templat ini akan menghasilkan tabel DynamoDB Sederhana dengan PK Kunci Partisi dan SK Kunci Urutan dengan Amazon DynamoDB Streams diaktifkan. Ini akan membuat fungsi lambda dan peran Eksekusi Lambda sederhana yang memungkinkan penulisan log ke Amazon Cloudwatch, dan membaca peristiwa dari Amazon DynamoDB Stream. Ini juga akan menambahkan pemetaan sumber peristiwa antara DynamoDB Streams dan fungsi Lambda, sehingga fungsi tersebut dapat dijalankan setiap kali ada kejadian di Amazon DynamoDB Stream.
AWSTemplateFormatVersion: "2010-09-09" Description: Sample application that presents AWS Lambda event source filtering with Amazon DynamoDB Streams. Resources: StreamsSampleDDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "PK" AttributeType: "S" - AttributeName: "SK" AttributeType: "S" KeySchema: - AttributeName: "PK" KeyType: "HASH" - AttributeName: "SK" KeyType: "RANGE" StreamSpecification: StreamViewType: "NEW_AND_OLD_IMAGES" ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: !GetAtt StreamsSampleDDBTable.StreamArn EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST ProcessEventLambda: Type: AWS::Lambda::Function Properties: Runtime: python3.7 Timeout: 300 Handler: index.handler Role: !GetAtt LambdaExecutionRole.Arn Code: ZipFile: | import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec) Outputs: StreamsSampleDDBTable: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.Arn StreamARN: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.StreamArn
Setelah Anda menerapkan templat pembentukan cloud ini, Anda dapat memasukkan Item Amazon DynamoDB berikut:
{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }
Berkat fungsi lambda sederhana yang disertakan sebaris dalam template pembentukan cloud ini, Anda akan melihat peristiwa di grup CloudWatch log Amazon untuk fungsi lambda sebagai berikut:
{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }
Contoh Filter
-
Hanya produk yang cocok dengan status tertentu
Contoh ini memodifikasi CloudFormation template untuk menyertakan filter agar sesuai dengan semua produk yang berasal dari Florida, dengan singkatan “FL”.
EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST
Setelah Anda menerapkan kembali tumpukan, Anda dapat menambahkan item DynamoDB berikut ke tabel. Perhatikan bahwa itu tidak akan muncul di log fungsi Lambda, karena produk dalam contoh ini berasal dari California.
{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK#1000", "company_id": "1000", "fabric": "Florida Chocolates", "price": 15, "product_id": "1000", "quantity": 50, "state": "CA", "stores": 5, "type": "" }
-
Hanya item yang dimulai dengan beberapa nilai di PK dan SK
Contoh ini memodifikasi CloudFormation template untuk menyertakan kondisi berikut:
EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST
Perhatikan AND kondisi mengharuskan kondisi berada di dalam pola, di mana Keys PK dan SK berada dalam ekspresi yang sama dipisahkan oleh koma.
Baik mulai dengan beberapa nilai pada PK dan SK atau dari keadaan tertentu.
Contoh ini memodifikasi CloudFormation template untuk menyertakan kondisi berikut:
EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST
Perhatikan kondisi OR ditambahkan dengan memperkenalkan pola baru di bagian filter.
Menyatukan semuanya - CDK
Contoh template pembentukan CDK proyek berikut berjalan melalui fungsionalitas penyaringan acara. Sebelum bekerja dengan CDK proyek ini, Anda perlu menginstal prasyarat termasuk menjalankan skrip persiapan.
Buat CDK proyek
Pertama buat AWS CDK proyek baru, dengan memanggil cdk init
dalam direktori kosong.
mkdir ddb_filters cd ddb_filters cdk init app --language python
Perintah cdk init
menggunakan nama folder proyek untuk memberi nama berbagai elemen proyek, termasuk kelas, subfolder, dan file. Tanda hubung apa pun dalam nama folder diubah menjadi garis bawah. Nama tersebut harus mengikuti bentuk pengenal Python. Misalnya, seharusnya tidak dimulai dengan angka atau berisi spasi.
Untuk bekerja dengan proyek baru, aktifkan lingkungan virtualnya. Ini memungkinkan dependensi proyek diinstal secara lokal di folder proyek, bukan secara global.
source .venv/bin/activate python -m pip install -r requirements.txt
catatan
Anda mungkin mengenali ini sebagai perintah Mac/Linux untuk mengaktifkan lingkungan virtual. Templat Python menyertakan file batch, source.bat
, yang memungkinkan perintah yang sama untuk digunakan pada Windows. Perintah Windows tradisional .venv\Scripts\activate.bat
juga berfungsi. Jika Anda menginisialisasi AWS CDK proyek Anda menggunakan AWS CDK Toolkit v1.70.0 atau yang lebih lama, lingkungan virtual Anda ada di direktori, bukan. .env
.venv
Infrastruktur Dasar
Buka file ./ddb_filters/ddb_filters_stack.py
dengan editor teks pilihan Anda. File ini dibuat secara otomatis saat Anda membuat AWS CDK proyek.
Selanjutnya, tambahkan fungsi _create_ddb_table
dan _set_ddb_trigger_function
. Fungsi-fungsi ini akan membuat tabel DynamoDB dengan kunci partisi PK dan mengurutkan kunci SK dalam mode penyediaan mode sesuai permintaan, dengan Amazon DynamoDB Streams diaktifkan secara default untuk menampilkan gambar Baru dan Lama.
Fungsi Lambda akan disimpan di folder lambda
di bagian file app.py
. File ini akan dibuat nanti. Ini akan mencakup variabel lingkungan APP_TABLE_NAME
, yang akan menjadi nama Tabel Amazon DynamoDB yang dibuat oleh tumpukan ini. Dalam fungsi yang sama kami akan memberikan izin baca aliran ke fungsi Lambda. Akhirnya, hal tersebut akan berlangganan DynamoDB Streams sebagai sumber acara untuk fungsi lambda.
Di akhir file dalam metode __init__
, Anda akan memanggil konstruksi masing-masing untuk menginisialisasi mereka dalam tumpukan. Untuk proyek yang lebih besar yang memerlukan komponen dan layanan tambahan, mungkin yang terbaik adalah mendefinisikan konstruksi ini di luar tumpukan dasar.
import os import json import aws_cdk as cdk from aws_cdk import ( Stack, aws_lambda as _lambda, aws_dynamodb as dynamodb, ) from constructs import Construct class DdbFiltersStack(Stack): def _create_ddb_table(self): dynamodb_table = dynamodb.Table( self, "AppTable", partition_key=dynamodb.Attribute( name="PK", type=dynamodb.AttributeType.STRING ), sort_key=dynamodb.Attribute( name="SK", type=dynamodb.AttributeType.STRING), billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, removal_policy=cdk.RemovalPolicy.DESTROY, ) cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name) return dynamodb_table def _set_ddb_trigger_function(self, ddb_table): events_lambda = _lambda.Function( self, "LambdaHandler", runtime=_lambda.Runtime.PYTHON_3_9, code=_lambda.Code.from_asset("lambda"), handler="app.handler", environment={ "APP_TABLE_NAME": ddb_table.table_name, }, ) ddb_table.grant_stream_read(events_lambda) event_subscription = _lambda.CfnEventSourceMapping( scope=self, id="companyInsertsOnlyEventSourceMapping", function_name=events_lambda.function_name, event_source_arn=ddb_table.table_stream_arn, maximum_batching_window_in_seconds=1, starting_position="LATEST", batch_size=1, ) def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: super().__init__(scope, construct_id, **kwargs) ddb_table = self._create_ddb_table() self._set_ddb_trigger_function(ddb_table)
Sekarang kita akan membuat fungsi lambda yang sangat sederhana yang akan mencetak log ke Amazon CloudWatch. Untuk melakukannya, buat folder baru bernama lambda
.
mkdir lambda touch app.py
Menggunakan editor teks favorit Anda, tambahkan konten berikut ke file app.py
:
import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec)
Memastikan Anda berada di folder /ddb_filters/
, ketikkan perintah berikut untuk membuat aplikasi sampel:
cdk deploy
Pada titik tertentu Anda akan diminta untuk mengonfirmasi apakah Anda ingin menerapkan solusi tersebut. Terima perubahan dengan mengetik Y
.
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤ │ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │ └───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘ Do you wish to deploy these changes (y/n)? y ... ✨ Deployment time: 67.73s Outputs: DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP Stack ARN: arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
Setelah perubahan diterapkan, buka AWS konsol Anda dan tambahkan satu item ke tabel Anda.
{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }
CloudWatch Log sekarang harus berisi semua informasi dari entri ini.
Contoh Filter
-
Hanya produk yang cocok dengan status tertentu
Buka file ddb_filters/ddb_filters/ddb_filters_stack.py
, dan modifikasi untuk menyertakan filter yang cocok dengan semua produk yang setara dengan “FL”. Ini dapat direvisi tepat di bawah event_subscription
di baris 45.
event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
-
Hanya item yang dimulai dengan beberapa nilai di PK dan SK
Ubah skrip python untuk menyertakan kondisi berikut:
event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, ] },
-
Baik mulai dengan beberapa nilai pada PK dan SK atau dari keadaan tertentu.
Ubah skrip python untuk menyertakan kondisi berikut:
event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
Perhatikan bahwa kondisi OR ditambahkan dengan menambahkan lebih banyak elemen ke array Filter.
Pembersihan
Temukan tumpukan filter di dasar direktori kerja Anda, dan jalankancdk destroy
. Anda akan diminta untuk mengonfirmasi penghapusan sumber daya:
cdk destroy Are you sure you want to delete: DdbFiltersStack (y/n)? y