Step Functions 익스프레스 워크플로를 SQS 사용하여 Amazon의 대용량 메시지를 처리합니다. - AWS Step Functions

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Step Functions 익스프레스 워크플로를 SQS 사용하여 Amazon의 대용량 메시지를 처리합니다.

이 샘플 프로젝트는 사용 방법을 보여줍니다. AWS Step Functions 익스프레스 워크플로는 Amazon 심플 큐 서비스 (AmazonSQS) 와 같은 대용량 이벤트 소스의 메시지 또는 데이터를 처리합니다. Express 워크플로는 매우 빠른 속도로 시작할 수 있기 때문에 대용량 이벤트 처리 또는 스트리밍 데이터 워크로드에 적합합니다.

다음은 이벤트 소스에서 상태 머신을 실행하는 데 일반적으로 사용되는 두 가지 방법입니다.

  • 이벤트 소스에서 CloudWatch 이벤트가 발생할 때마다 상태 머신 실행을 시작하도록 Amazon Events 규칙을 구성합니다. 자세한 내용은 이벤트를 트리거하는 CloudWatch 이벤트 규칙 생성을 참조하십시오.

  • 이벤트 소스를 Lambda 함수에 매핑하고 함수 코드를 작성하여 상태 머신을 실행합니다. The AWS Lambda 이벤트 소스가 이벤트를 내보낼 때마다 함수가 호출되어 스테이트 머신 실행이 시작됩니다. 자세한 내용은 사용을 참조하십시오. AWS Lambda 아마존과 함께 SQS.

이 샘플 프로젝트는 Amazon SQS 대기열이 메시지를 전송할 때마다 두 번째 방법을 사용하여 실행을 시작합니다. 유사한 구성을 사용하여 Amazon Simple Storage Service(S3), Amazon DynamoDB 및 Amazon Kinesis와 같은 다른 이벤트 소스에서 Express 워크플로 실행을 트리거할 수 있습니다.

Express 워크플로 및 Step Functions 서비스 통합에 대한 자세한 내용은 다음을 참조하세요.

1단계: 상태 시스템 만들기

  1. Step Functions 콘솔을 열고 상태 시스템 생성을 선택합니다.

  2. 검색 Process high-volume messages from SQS 상자에 내용을 입력한 다음 반환된 검색 SQS결과에서 대용량 메시지 처리를 선택합니다.

  3. 다음을 선택하여 계속 진행합니다.

  4. [Run a demo] 를 선택하여 읽기 전용 및 ready-to-deploy 워크플로를 만들거나 [Build on it] 를 선택하여 빌드하고 나중에 배포할 수 있는 편집 가능한 상태 머신 정의를 생성합니다.

    이 샘플 프로젝트는 다음 리소스를 배포합니다.

    • Lambda 함수 4개

    • 아마존 SQS 대기열

    • 원래 요청 ping에 대한 AWS Step Functions 상태 시스템

    • 관련 AWS Identity and Access Management (IAM) 역할

    다음 이미지는 SQS 샘플 프로젝트의 대용량 메시지 처리의 워크플로 그래프를 보여줍니다.

    SQS샘플 프로젝트의 대용량 메시지 처리의 워크플로 그래프.
  5. 템플릿 사용을 선택하여 계속 선택합니다.

다음 단계는 이전 선택에 따라 달라집니다.

  1. 데모 실행 — 에서 배포한 리소스로 읽기 전용 프로젝트를 만들기 전에 상태 머신을 검토할 수 있습니다. AWS CloudFormation 다음 주소로 AWS 계정.

    상태 머신 정의를 볼 수 있으며 준비가 되면 Deploy and run을 선택하여 프로젝트를 배포하고 리소스를 생성할 수 있습니다.

    배포하는 데 리소스 및 권한을 생성하는 데 최대 10분이 걸릴 수 있습니다. Stack ID 링크를 사용하여 진행 상황을 모니터링할 수 있습니다. AWS CloudFormation.

    배포가 완료되면 콘솔에서 새 상태 머신을 확인할 수 있습니다.

  2. 기반 구축 — 워크플로 정의를 검토하고 편집할 수 있습니다. 사용자 지정 워크플로를 실행하기 전에 샘플 프로젝트에서 자리 표시자의 값을 설정해야 할 수도 있습니다.

참고

계정에 배포된 서비스에는 표준 요금이 적용될 수 있습니다.

2단계: 상태 시스템 실행 트리거

  1. Amazon SQS 콘솔을 엽니다.

  2. 샘플 프로젝트에서 생성한 대기열을 선택합니다.

    이름은 예시- SQSQueue -와 비슷할 것입니다 wJalrXUtnFEMI.

  3. 대기열 작업 목록에서 메시지 전송을 선택합니다.

  4. 복사 버튼을 사용하여 다음 메시지를 복사한 다음 메시지 전송 창에 입력하고 메시지 전송을 선택합니다.

    참고

    이 샘플 메시지에서 input: 줄은 페이지에 맞게 줄바꿈을 사용하여 서식이 지정되었습니다. [복사] 버튼을 사용합니다. 또는 줄바꿈 없이 한 줄로 입력되었는지 확인합니다.

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. 닫기를 선택하세요.

  6. Step Functions 콘솔을 엽니다.

  7. Amazon CloudWatch Logs 로그 그룹으로 이동하여 로그를 검사하십시오. 로그 그룹 이름은 예시- ExpressLogGroup -와 같습니다 wJalrXUtnFEMI.

Lambda 함수 코드 예제

다음은 시작하는 Lambda 함수가 다음을 사용하여 상태 시스템 실행을 시작하는 방법을 보여주는 Lambda 함수 코드입니다. AWS SDK.

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

예제 상태 머신 코드

이 샘플 프로젝트의 Express 워크플로는 텍스트 처리를 위한 Lambda 함수 집합으로 구성됩니다.

방법에 대한 자세한 내용은 AWS Step Functions 다른 사람을 제어할 수 있습니다. AWS 서비스, 참조Step Functions와 서비스 통합.

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAM예시

이 예에서는 AWS Identity and Access Management 샘플 프로젝트에서 생성된 (IAM) 정책에는 상태 머신 및 관련 리소스를 실행하는 데 필요한 최소 권한이 포함되어 있습니다. IAM정책에 필요한 권한만 포함하는 것이 좋습니다.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

다음 정책은 CloudWatch 로그에 대한 충분한 권한이 있는지 확인합니다.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

Step Functions를 다른 기능과 함께 사용할 IAM 때 구성하는 방법에 대한 자세한 내용은 AWS 서비스는 을 참조하십시오Step Functions가 통합 서비스를 위한 IAM 정책을 생성하는 방법.