Memproses item individual dengan fungsi Lambda di Step Functions - AWS Step Functions

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Memproses item individual dengan fungsi Lambda di Step Functions

Dalam tutorial ini, Anda menggunakan ItemBatcher (Peta) bidang status Peta Terdistribusi untuk mengulangi item individual yang ada dalam batch menggunakan fungsi Lambda. Status Peta Terdistribusi memulai empat eksekusi alur kerja anak. Masing-masing alur kerja anak ini menjalankan status Peta Inline. Untuk setiap iterasinya, status Peta Inline memanggil fungsi Lambda dan meneruskan satu item dari batch ke fungsi. Fungsi Lambda kemudian memproses item dan mengembalikan hasilnya.

Anda akan membuat mesin status yang melakukan perkalian pada array bilangan bulat. Katakanlah bahwa array integer yang Anda berikan sebagai input adalah [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] dan faktor perkalian adalah. 7 Kemudian, array yang dihasilkan terbentuk setelah mengalikan bilangan bulat ini dengan faktor 7, akan menjadi. [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]

Langkah 1: Buat mesin negara

Pada langkah ini, Anda membuat prototipe alur kerja mesin status yang meneruskan satu item dari sekumpulan item ke setiap pemanggilan fungsi Lambda yang akan Anda buat di Langkah 2.

  • Gunakan definisi berikut untuk membuat mesin status menggunakan konsol Step Functions. Untuk informasi tentang membuat mesin status, lihat Langkah 1: Buat prototipe alur kerja di tutorial Memulai dengan menggunakan status Peta Terdistribusi.

    Di mesin status ini, Anda menentukan status Peta Terdistribusi yang menerima larik 10 bilangan bulat sebagai input dan meneruskan item array ini ke eksekusi alur kerja anak dalam batch. Setiap eksekusi alur kerja anak menerima batch tiga item sebagai input dan menjalankan status Peta Inline. Setiap iterasi status Peta Sebaris memanggil fungsi Lambda dan meneruskan item dari batch ke fungsi. Fungsi ini kemudian mengalikan item dengan faktor 7 dan mengembalikan hasilnya.

    Output dari setiap eksekusi alur kerja anak adalah JSON array yang berisi hasil perkalian untuk setiap item yang dilewatkan.

    penting

    Pastikan untuk mengganti Amazon Resource Name (ARN) dari fungsi Lambda dalam kode berikut dengan fungsi ARN yang akan Anda buat di Langkah 2.

    { "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "InnerMap", "States": { "InnerMap": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "INLINE" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:functionName" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "ItemsPath": "$.Items", "ItemSelector": { "MyMultiplicationFactor.$": "$.BatchInput.MyMultiplicationFactor", "MyItem.$": "$$.Map.Item.Value" } } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemsPath": "$.MyItems", "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } } } } }

Langkah 2: Buat fungsi Lambda

Pada langkah ini, Anda membuat fungsi Lambda yang memproses setiap item yang diteruskan dari batch.

penting

Pastikan fungsi Lambda Anda berada di bawah yang sama Wilayah AWS sebagai mesin negara Anda.

Untuk membuat fungsi Lambda
  1. Gunakan konsol Lambda untuk membuat fungsi Lambda Python bernama. ProcessSingleItem Untuk informasi tentang membuat fungsi Lambda, lihat Langkah 4: Mengkonfigurasi fungsi Lambda dalam Memulai dengan menggunakan Distributed Map state tutorial.

  2. Salin kode berikut untuk fungsi Lambda dan tempelkan ke bagian sumber Kode fungsi Lambda Anda.

    import json def lambda_handler(event, context): multiplication_factor = event['MyMultiplicationFactor'] item = event['MyItem'] result = multiplication_factor * item return { 'statusCode': 200, 'multiplied': result }
  3. Setelah Anda membuat fungsi Lambda Anda, salin fungsi yang ARN ditampilkan di sudut kanan atas halaman. Berikut ini adalah contohARN, di mana nama-fungsi adalah nama fungsi Lambda (dalam hal ini,ProcessSingleItem):

    arn:aws:lambda:us-east-1:123456789012:function:function-name

    Anda harus menyediakan fungsi ARN di mesin status yang Anda buat di Langkah 1.

  4. Pilih Terapkan untuk menyebarkan perubahan.

Langkah 3: Jalankan mesin negara

Saat Anda menjalankan mesin status, status Peta Terdistribusi memulai empat eksekusi alur kerja anak, di mana setiap eksekusi memproses tiga item, sementara satu eksekusi memproses satu item.

Contoh berikut menunjukkan data yang diteruskan ke salah satu pemanggilan ProcessSingleItemfungsi di dalam eksekusi alur kerja anak.

{ "MyMultiplicationFactor": 7, "MyItem": 1 }

Dengan masukan ini, contoh berikut menunjukkan output yang dikembalikan oleh fungsi Lambda.

{ "statusCode": 200, "multiplied": 7 }

Contoh berikut menunjukkan JSON array output untuk salah satu eksekusi alur kerja anak.

[ { "statusCode": 200, "multiplied": 7 }, { "statusCode": 200, "multiplied": 14 }, { "statusCode": 200, "multiplied": 21 } ]

Mesin state mengembalikan output berikut yang berisi empat array untuk empat eksekusi alur kerja anak. Array ini berisi hasil perkalian dari masing-masing item input.

Akhirnya, output mesin state adalah array bernama multiplied yang menggabungkan semua hasil perkalian yang dikembalikan untuk empat eksekusi alur kerja anak.

[ [ { "statusCode": 200, "multiplied": 7 }, { "statusCode": 200, "multiplied": 14 }, { "statusCode": 200, "multiplied": 21 } ], [ { "statusCode": 200, "multiplied": 28 }, { "statusCode": 200, "multiplied": 35 }, { "statusCode": 200, "multiplied": 42 } ], [ { "statusCode": 200, "multiplied": 49 }, { "statusCode": 200, "multiplied": 56 }, { "statusCode": 200, "multiplied": 63 } ], [ { "statusCode": 200, "multiplied": 70 } ] ]

Untuk menggabungkan semua hasil perkalian yang dikembalikan oleh eksekusi alur kerja anak ke dalam larik keluaran tunggal, Anda dapat menggunakan bidang tersebut. ResultSelector Tentukan bidang ini di dalam status Peta Terdistribusi untuk menemukan semua hasil, ekstrak hasil individual, dan kemudian menggabungkannya ke dalam satu array keluaran bernamamultiplied.

Untuk menggunakan ResultSelector bidang, perbarui definisi mesin status Anda seperti yang ditunjukkan pada contoh berikut.

{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied" } } } }

Mesin status diperbarui mengembalikan array output konsolidasi seperti yang ditunjukkan pada contoh berikut.

{ "multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] }