
Use CloudFormation with Managed Service for Apache Flink


Amazon Managed Service for Apache Flink was previously known as Amazon Kinesis Data Analytics for Apache Flink.



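The following exercise shows how to start a Flink application that was created with AWS CloudFormation, using a Lambda function in the same stack.

Before you begin

Before you begin this exercise, follow the steps for creating a Flink application with AWS CloudFormation at AWS::KinesisAnalytics::Application.

Write a Lambda function

To start a Flink application after it is created or updated, you can use the kinesisanalyticsv2 start-application API. An AWS CloudFormation event triggers the call after the Flink application is created. Later in this exercise we discuss how to set up the stack to trigger the Lambda function, but first we focus on the Lambda function declaration and its code. This example uses the Python3.8 runtime.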

StartApplicationLambda:
  Type: AWS::Lambda::Function
  DependsOn: StartApplicationLambdaRole
  Properties:
    Description: Starts an application when invoked.
    Runtime: python3.8
    Role: !GetAtt StartApplicationLambdaRole.Arn
    Handler: index.lambda_handler
    Timeout: 30
    Code:
      ZipFile: |
        import logging
        import cfnresponse
        import boto3

        logger = logging.getLogger()
        logger.setLevel(logging.INFO)

        def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))

            try:
                application_name = event['ResourceProperties']['ApplicationName']

                # filter out events other than Create or Update,
                # you can also omit Update in order to start an application on Create only.
                if event['RequestType'] not in ["Create", "Update"]:
                    logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType']))
                    cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                    return

                # use kinesisanalyticsv2 API to start an application.
                client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])

                # get application status.
                describe_response = client_kda.describe_application(ApplicationName=application_name)
                application_status = describe_response['ApplicationDetail']['ApplicationStatus']

                # an application can be started from 'READY' status only.
                if application_status != 'READY':
                    logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status))
                    cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                    return

                # create RunConfiguration.
                run_configuration = {
                    'ApplicationRestoreConfiguration': {
                        'ApplicationRestoreType': 'RESTORE_FROM_LATEST_SNAPSHOT',
                    }
                }

                logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration))

                # this call doesn't wait for an application to transfer to 'RUNNING' state.
                client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)

                logger.info('Started Application: {}'.format(application_name))
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
                logger.error(err)
                cfnresponse.send(event, context, cfnresponse.FAILED, {"Data": str(err)})

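In the preceding code, Lambda processes incoming AWS CloudFormation events, filters out everything except Create and Update, gets the application status, and starts the application if the status is READY. To get the application status, you must create the Lambda role, as described in the following section.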
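
The two filters in the handler can be isolated into a small pure function, which makes the start condition easier to reason about and to test without AWS access. The following is a minimal sketch; the function name should_start is hypothetical and does not appear in the handler.

```python
def should_start(request_type: str, application_status: str) -> bool:
    """Return True only in the case where the handler calls start_application."""
    # Only Create and Update events are acted on; any other request type is a no-op.
    if request_type not in ("Create", "Update"):
        return False
    # An application can be started from the READY status only.
    return application_status == "READY"


# Walk through a few combinations of request type and application status.
for request_type, status in [("Create", "READY"), ("Update", "RUNNING"), ("Delete", "READY")]:
    print(request_type, status, should_start(request_type, status))
```

Only the first combination results in a start. In the other two cases the handler reports SUCCESS to AWS CloudFormation without calling the API.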

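Create a Lambda role

You create a role for Lambda so that it can successfully "talk" to the application and write logs. This role uses default managed policies, but you might want to narrow it down to custom policies.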

StartApplicationLambdaRole:
  Type: AWS::IAM::Role
  DependsOn: TestFlinkApplication
  Properties:
    Description: A role for lambda to use while interacting with an application.
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Effect: Allow
          Principal:
            Service:
              - lambda.amazonaws.com
          Action:
            - sts:AssumeRole
    ManagedPolicyArns:
      - arn:aws:iam::aws:policy/AmazonKinesisAnalyticsFullAccess
      - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
    Path: /

Note that the Lambda resources are created after the Flink application in the same stack, because they depend on it.
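
As one possible way to narrow the role down, the sketch below builds a custom policy document that covers only the two API calls the handler makes, plus basic log writing. The function name and the example ARN are hypothetical, and the action list is an assumption to verify against your own requirements before use.

```python
import json


def build_start_application_policy(application_arn: str) -> dict:
    """Build a policy document limited to what the start handler appears to need."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # The handler calls describe_application and start_application only.
                "Effect": "Allow",
                "Action": [
                    "kinesisanalytics:DescribeApplication",
                    "kinesisanalytics:StartApplication",
                ],
                "Resource": application_arn,
            },
            {
                # Assumed minimum for writing Lambda logs to CloudWatch Logs.
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                ],
                "Resource": "*",
            },
        ],
    }


# Hypothetical ARN, for illustration only.
example_arn = "arn:aws:kinesisanalytics:us-east-1:123456789012:application/CFNTestFlinkApplication"
print(json.dumps(build_start_application_policy(example_arn), indent=2))
```

The printed JSON could be attached to the role as an inline policy in place of the two managed policies.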

Invoke the Lambda function

All that is left is to invoke the Lambda function. You can do this with a custom resource.

StartApplicationLambdaInvoke:
  Description: Invokes StartApplicationLambda to start an application.
  Type: AWS::CloudFormation::CustomResource
  DependsOn: StartApplicationLambda
  Version: "1.0"
  Properties:
    ServiceToken: !GetAtt StartApplicationLambda.Arn
    Region: !Ref AWS::Region
    ApplicationName: !Ref TestFlinkApplication

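That is everything you need to start a Flink application with Lambda. You can now create your own stack, or use the full example below to see how all of these steps work in practice.

The following example is a slightly extended version of the previous steps, with additional RunConfiguration adjustment done through template parameters. It is a working stack for you to try. Be sure to read the accompanying notes: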

stack.yaml

Description: 'kinesisanalyticsv2 CloudFormation Test Application'
Parameters:
  ApplicationRestoreType:
    Description: ApplicationRestoreConfiguration option, can be SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT or RESTORE_FROM_CUSTOM_SNAPSHOT.
    Type: String
    Default: SKIP_RESTORE_FROM_SNAPSHOT
    AllowedValues: [ SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT, RESTORE_FROM_CUSTOM_SNAPSHOT ]
  SnapshotName:
    Description: ApplicationRestoreConfiguration option, name of a snapshot to restore to, used with RESTORE_FROM_CUSTOM_SNAPSHOT ApplicationRestoreType.
    Type: String
    Default: ''
  AllowNonRestoredState:
    Description: FlinkRunConfiguration option, can be true or false.
    Default: true
    Type: String
    AllowedValues: [ true, false ]
  CodeContentBucketArn:
    Description: ARN of a bucket with application code.
    Type: String
  CodeContentFileKey:
    Description: A jar filename with an application code inside a bucket.
    Type: String
Conditions:
  IsSnapshotNameEmpty: !Equals [ !Ref SnapshotName, '' ]
Resources:
  TestServiceExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - kinesisanalytics.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonKinesisFullAccess
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
      Path: /
  InputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  OutputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  TestFlinkApplication:
    Type: 'AWS::KinesisAnalyticsV2::Application'
    Properties:
      ApplicationName: 'CFNTestFlinkApplication'
      ApplicationDescription: 'Test Flink Application'
      RuntimeEnvironment: 'FLINK-1_18'
      ServiceExecutionRole: !GetAtt TestServiceExecutionRole.Arn
      ApplicationConfiguration:
        EnvironmentProperties:
          PropertyGroups:
            - PropertyGroupId: 'KinesisStreams'
              PropertyMap:
                INPUT_STREAM_NAME: !Ref InputKinesisStream
                OUTPUT_STREAM_NAME: !Ref OutputKinesisStream
                AWS_REGION: !Ref AWS::Region
        FlinkApplicationConfiguration:
          CheckpointConfiguration:
            ConfigurationType: 'CUSTOM'
            CheckpointingEnabled: True
            CheckpointInterval: 1500
            MinPauseBetweenCheckpoints: 500
          MonitoringConfiguration:
            ConfigurationType: 'CUSTOM'
            MetricsLevel: 'APPLICATION'
            LogLevel: 'INFO'
          ParallelismConfiguration:
            ConfigurationType: 'CUSTOM'
            Parallelism: 1
            ParallelismPerKPU: 1
            AutoScalingEnabled: True
        ApplicationSnapshotConfiguration:
          SnapshotsEnabled: True
        ApplicationCodeConfiguration:
          CodeContent:
            S3ContentLocation:
              BucketARN: !Ref CodeContentBucketArn
              FileKey: !Ref CodeContentFileKey
          CodeContentType: 'ZIPFILE'
  StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonKinesisAnalyticsFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
  StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3

          logger = logging.getLogger()
          logger.setLevel(logging.INFO)

          def lambda_handler(event, context):
              logger.info('Incoming CFN event {}'.format(event))

              try:
                  application_name = event['ResourceProperties']['ApplicationName']

                  # filter out events other than Create or Update,
                  # you can also omit Update in order to start an application on Create only.
                  if event['RequestType'] not in ["Create", "Update"]:
                      logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType']))
                      cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                      return

                  # use kinesisanalyticsv2 API to start an application.
                  client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])

                  # get application status.
                  describe_response = client_kda.describe_application(ApplicationName=application_name)
                  application_status = describe_response['ApplicationDetail']['ApplicationStatus']

                  # an application can be started from 'READY' status only.
                  if application_status != 'READY':
                      logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status))
                      cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                      return

                  # create RunConfiguration from passed parameters.
                  run_configuration = {
                      'FlinkRunConfiguration': {
                          'AllowNonRestoredState': event['ResourceProperties']['AllowNonRestoredState'] == 'true'
                      },
                      'ApplicationRestoreConfiguration': {
                          'ApplicationRestoreType': event['ResourceProperties']['ApplicationRestoreType'],
                      }
                  }

                  # add SnapshotName to RunConfiguration if specified.
                  if event['ResourceProperties']['SnapshotName'] != '':
                      run_configuration['ApplicationRestoreConfiguration']['SnapshotName'] = event['ResourceProperties']['SnapshotName']

                  logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration))

                  # this call doesn't wait for an application to transfer to 'RUNNING' state.
                  client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)

                  logger.info('Started Application: {}'.format(application_name))
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
              except Exception as err:
                  logger.error(err)
                  cfnresponse.send(event, context, cfnresponse.FAILED, {"Data": str(err)})
  StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
      ApplicationRestoreType: !Ref ApplicationRestoreType
      SnapshotName: !Ref SnapshotName
      AllowNonRestoredState: !Ref AllowNonRestoredState

Again, you might want to adjust the roles for Lambda as well as for the application itself.
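
To see how the template parameters end up in the RunConfiguration, the mapping performed by the handler can be sketched as a standalone function. The function name build_run_configuration is hypothetical; the keys and the comparison with the string 'true' follow the handler in stack.yaml.

```python
def build_run_configuration(props: dict) -> dict:
    """Map custom resource properties to a RunConfiguration, as the handler does."""
    run_configuration = {
        "FlinkRunConfiguration": {
            # The handler compares against the string 'true', so do the same here.
            "AllowNonRestoredState": props["AllowNonRestoredState"] == "true"
        },
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": props["ApplicationRestoreType"]
        },
    }
    # SnapshotName is added only when a non-empty value is supplied.
    if props.get("SnapshotName", "") != "":
        run_configuration["ApplicationRestoreConfiguration"]["SnapshotName"] = props["SnapshotName"]
    return run_configuration


# Properties matching the template defaults.
defaults = {
    "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT",
    "SnapshotName": "",
    "AllowNonRestoredState": "true",
}
print(build_run_configuration(defaults))

# A custom snapshot; the snapshot name here is made up for illustration.
custom = {
    "ApplicationRestoreType": "RESTORE_FROM_CUSTOM_SNAPSHOT",
    "SnapshotName": "my-snapshot",
    "AllowNonRestoredState": "false",
}
print(build_run_configuration(custom))
```

With the defaults, no SnapshotName key is sent. With RESTORE_FROM_CUSTOM_SNAPSHOT, the snapshot name is passed through unchanged.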

Before you create the stack above, don't forget to specify your parameters.

parameters.json

[
  {
    "ParameterKey": "CodeContentBucketArn",
    "ParameterValue": "YOUR_BUCKET_ARN"
  },
  {
    "ParameterKey": "CodeContentFileKey",
    "ParameterValue": "YOUR_JAR"
  },
  {
    "ParameterKey": "ApplicationRestoreType",
    "ParameterValue": "SKIP_RESTORE_FROM_SNAPSHOT"
  },
  {
    "ParameterKey": "AllowNonRestoredState",
    "ParameterValue": "true"
  }
]

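Replace YOUR_BUCKET_ARN and YOUR_JAR with your own values. You can follow this guide to create an Amazon S3 bucket and an application jar.

Now create the stack (replace YOUR_REGION with a Region of your choice, for example us-east-1):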

aws cloudformation create-stack --region YOUR_REGION --template-body "file://stack.yaml" --parameters "file://parameters.json" --stack-name "TestManagedServiceForApacheFlinkStack" --capabilities CAPABILITY_NAMED_IAM

You can now navigate to https://console.aws.amazon.com/cloudformation and view the progress. After the Flink application is created, you should see it in Starting status. It might take a few minutes before it reaches Running.
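
Because start_application does not wait for the application to reach Running, you might want to poll the status yourself instead of watching the console. The following is a minimal sketch. The function wait_for_status and the FakeClient stand-in are hypothetical; the only API call used is describe_application, the same call the Lambda handler makes.

```python
import time


def wait_for_status(client, application_name, target="RUNNING",
                    timeout_seconds=600, poll_seconds=15, sleep=time.sleep):
    """Poll describe_application until the application reaches the target status."""
    waited = 0
    while True:
        response = client.describe_application(ApplicationName=application_name)
        status = response["ApplicationDetail"]["ApplicationStatus"]
        if status == target:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(
                "Application {} is still {} after {} seconds".format(application_name, status, waited)
            )
        sleep(poll_seconds)
        waited += poll_seconds


class FakeClient:
    """Stand-in for a kinesisanalyticsv2 client, used only so this demo runs offline."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    def describe_application(self, ApplicationName):
        return {"ApplicationDetail": {"ApplicationStatus": next(self._statuses)}}


# Simulate an application that needs two polls before it is running.
fake = FakeClient(["STARTING", "STARTING", "RUNNING"])
print(wait_for_status(fake, "CFNTestFlinkApplication", sleep=lambda seconds: None))
```

Against a real account, you would pass boto3.client('kinesisanalyticsv2', region_name=...) in place of FakeClient and keep the default sleep.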

