Amazon Managed Service for Apache Flink was previously known as Amazon Kinesis Data Analytics for Apache Flink.
The following exercise shows how to start a Flink application created with AWS CloudFormation by using a Lambda function in the same stack.
Before you begin
Before you begin this exercise, follow the steps for creating a Flink application with AWS CloudFormation at AWS::KinesisAnalytics::Application.
Write the Lambda function
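To start a Flink application after it is created or updated, use the kinesisanalyticsv2 start-application API. The call is triggered by an AWS CloudFormation event after the Flink application is created. Later in this exercise we discuss how to set up the stack to trigger the Lambda function, but first we focus on the Lambda function declaration and its code. This example uses the Python 3.8 runtime.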
StartApplicationLambda:
  Type: AWS::Lambda::Function
  DependsOn: StartApplicationLambdaRole
  Properties:
    Description: Starts an application when invoked.
    Runtime: python3.8
    Role: !GetAtt StartApplicationLambdaRole.Arn
    Handler: index.lambda_handler
    Timeout: 30
    Code:
      ZipFile: |
        import logging
        import cfnresponse
        import boto3

        logger = logging.getLogger()
        logger.setLevel(logging.INFO)

        def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            try:
                application_name = event['ResourceProperties']['ApplicationName']
                # filter out events other than Create or Update,
                # you can also omit Update in order to start an application on Create only.
                if event['RequestType'] not in ["Create", "Update"]:
                    logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType']))
                    cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                    return
                # use kinesisanalyticsv2 API to start an application.
                client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
                # get application status.
                describe_response = client_kda.describe_application(ApplicationName=application_name)
                application_status = describe_response['ApplicationDetail']['ApplicationStatus']
                # an application can be started from 'READY' status only.
                if application_status != 'READY':
                    logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status))
                    cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                    return
                # create RunConfiguration.
                run_configuration = {
                    'ApplicationRestoreConfiguration': {
                        'ApplicationRestoreType': 'RESTORE_FROM_LATEST_SNAPSHOT',
                    }
                }
                logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration))
                # this call doesn't wait for an application to transfer to 'RUNNING' state.
                client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
                logger.info('Started Application: {}'.format(application_name))
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
                logger.error(err)
                cfnresponse.send(event, context, cfnresponse.FAILED, {"Data": str(err)})
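In the code above, the Lambda function processes the incoming AWS CloudFormation event, filters out everything except Create and Update, gets the application status, and starts the application if its status is READY. To get the application status, you must create a Lambda role, as described in the next section.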
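The two filters in the handler can be checked locally, without AWS, by pulling them into a pure function. This is a minimal sketch; `should_start` is a hypothetical helper name that the Lambda code above does not define.

```python
# Hypothetical helper (not part of the Lambda code above) that mirrors the
# two checks the handler performs before it calls start_application.
START_REQUEST_TYPES = {"Create", "Update"}


def should_start(request_type: str, application_status: str) -> bool:
    """Return True only when the handler would call start_application."""
    # Delete (and any other request type) is acknowledged as a no-op.
    if request_type not in START_REQUEST_TYPES:
        return False
    # The handler only starts an application that is in the READY status.
    return application_status == "READY"


if __name__ == "__main__":
    print(should_start("Create", "READY"))    # True
    print(should_start("Update", "RUNNING"))  # False: not READY
    print(should_start("Delete", "READY"))    # False: Delete is filtered out
```

Removing "Update" from `START_REQUEST_TYPES` corresponds to the handler comment about starting the application on Create only.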
Create a Lambda role
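Create a role for the Lambda function so that it can communicate with the application and write logs. This role uses default managed policies, but you might want to narrow it down to a custom policy.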
StartApplicationLambdaRole:
  Type: AWS::IAM::Role
  DependsOn: TestFlinkApplication
  Properties:
    Description: A role for lambda to use while interacting with an application.
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Effect: Allow
          Principal:
            Service:
              - lambda.amazonaws.com
          Action:
            - sts:AssumeRole
    ManagedPolicyArns:
      - arn:aws:iam::aws:policy/AmazonKinesisAnalyticsFullAccess
      - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
    Path: /
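If you replace the managed policies with a custom one, note that the Lambda function in this exercise only calls DescribeApplication and StartApplication and writes its own logs. The sketch below builds such a policy document in Python; the action list is an assumption based on the calls in the handler, `build_start_lambda_policy` is a hypothetical helper, and the ARNs are placeholders to replace with your own.

```python
import json


def build_start_lambda_policy(application_arn: str, log_group_arn: str) -> dict:
    """Hypothetical least-privilege policy for StartApplicationLambda.

    Assumption: the function only describes and starts one application and
    writes to its own CloudWatch Logs log group.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # The kinesisanalyticsv2 API uses the "kinesisanalytics" IAM prefix.
                "Effect": "Allow",
                "Action": [
                    "kinesisanalytics:DescribeApplication",
                    "kinesisanalytics:StartApplication",
                ],
                "Resource": application_arn,
            },
            {
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                ],
                # The function's log group and the log streams inside it.
                "Resource": [log_group_arn, log_group_arn + ":*"],
            },
        ],
    }


if __name__ == "__main__":
    # Placeholder account ID, Region, and names.
    policy = build_start_lambda_policy(
        "arn:aws:kinesisanalytics:us-east-1:123456789012:application/CFNTestFlinkApplication",
        "arn:aws:logs:us-east-1:123456789012:log-group:/aws/lambda/StartApplicationLambda",
    )
    print(json.dumps(policy, indent=2))
```

The printed document could go under the role's `Policies` property (each entry takes a `PolicyName` and a `PolicyDocument`) in place of `ManagedPolicyArns`.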
Note that the Lambda resources are created after the Flink application in the same stack, because they depend on it.
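The `DependsOn` chain in these snippets fixes the creation order. The sketch below models only the explicit `DependsOn` edges with Python's standard `graphlib` module (Python 3.9+); CloudFormation also infers dependencies from `!Ref` and `!GetAtt`, and it may create unrelated resources in parallel.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Explicit DependsOn edges from this exercise: each resource maps to the
# set of resources it depends on.
depends_on = {
    "StartApplicationLambdaRole": {"TestFlinkApplication"},
    "StartApplicationLambda": {"StartApplicationLambdaRole"},
    "StartApplicationLambdaInvoke": {"StartApplicationLambda"},
}

# static_order() emits every dependency before the resources that need it.
creation_order = list(TopologicalSorter(depends_on).static_order())
print(creation_order)
# ['TestFlinkApplication', 'StartApplicationLambdaRole',
#  'StartApplicationLambda', 'StartApplicationLambdaInvoke']
```

Stack deletion walks the same chain in reverse, so the custom resource's Delete request (which the handler treats as a no-op) is handled before the Lambda function and its role are removed.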
Invoke the Lambda function
Now all that's left is to invoke the Lambda function. You can do this with a custom resource.
StartApplicationLambdaInvoke:
  Description: Invokes StartApplicationLambda to start an application.
  Type: AWS::CloudFormation::CustomResource
  DependsOn: StartApplicationLambda
  Version: "1.0"
  Properties:
    ServiceToken: !GetAtt StartApplicationLambda.Arn
    Region: !Ref AWS::Region
    ApplicationName: !Ref TestFlinkApplication
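Behind the custom resource, CloudFormation invokes the function named by `ServiceToken` with a request event whose `ResourceProperties` carry the other properties (here `Region` and `ApplicationName`), and then waits for a response at the pre-signed `ResponseURL`. The sketch below shows the shape of that exchange; `build_response_body` is a hypothetical helper modeled on the fields that `cfnresponse.send` writes, and every identifier in `sample_event` is a placeholder.

```python
import json


def build_response_body(event, log_stream_name, status, data, reason=None):
    """Hypothetical helper mirroring the body cfnresponse.send PUTs to ResponseURL."""
    return {
        "Status": status,  # "SUCCESS" or "FAILED"
        "Reason": reason or "See the details in CloudWatch Log Stream: " + log_stream_name,
        # Without an explicit ID, cfnresponse uses the log stream name.
        "PhysicalResourceId": log_stream_name,
        # These values echo the request so CloudFormation can match the response.
        "StackId": event["StackId"],
        "RequestId": event["RequestId"],
        "LogicalResourceId": event["LogicalResourceId"],
        "NoEcho": False,
        "Data": data,
    }


# Placeholder Create request for StartApplicationLambdaInvoke.
sample_event = {
    "RequestType": "Create",
    "ResponseURL": "https://example.com/presigned-response-url",
    "StackId": "arn:aws:cloudformation:us-east-1:123456789012:stack/TestManagedFlinkStack/example-id",
    "RequestId": "example-request-id",
    "ResourceType": "AWS::CloudFormation::CustomResource",
    "LogicalResourceId": "StartApplicationLambdaInvoke",
    "ResourceProperties": {
        "ServiceToken": "arn:aws:lambda:us-east-1:123456789012:function:example",
        "Region": "us-east-1",
        "ApplicationName": "CFNTestFlinkApplication",
    },
}

if __name__ == "__main__":
    body = build_response_body(sample_event, "example-log-stream", "SUCCESS", {})
    print(json.dumps(body, indent=2))
```

Because the handler sends a response on every path, including the `except` branch, the stack does not hang on the custom resource; a handler that returned without responding would leave CloudFormation waiting until the request timed out.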
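That's everything you need to start a Flink application with Lambda. You can now create your own stack, or use the full example below to see how all of these steps work in practice.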
Review the extended example
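The following example is a slightly extended version of the previous steps, with additional RunConfiguration tuning exposed through template parameters. It is a working stack for you to try. Be sure to read the notes that accompany it.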
stack.yaml
Description: 'kinesisanalyticsv2 CloudFormation Test Application'
Parameters:
  ApplicationRestoreType:
    Description: ApplicationRestoreConfiguration option, can be SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT or RESTORE_FROM_CUSTOM_SNAPSHOT.
    Type: String
    Default: SKIP_RESTORE_FROM_SNAPSHOT
    AllowedValues: [ SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT, RESTORE_FROM_CUSTOM_SNAPSHOT ]
  SnapshotName:
    Description: ApplicationRestoreConfiguration option, name of a snapshot to restore to, used with RESTORE_FROM_CUSTOM_SNAPSHOT ApplicationRestoreType.
    Type: String
    Default: ''
  AllowNonRestoredState:
    Description: FlinkRunConfiguration option, can be true or false.
    Default: 'true'
    Type: String
    AllowedValues: [ 'true', 'false' ]
  CodeContentBucketArn:
    Description: ARN of a bucket with application code.
    Type: String
  CodeContentFileKey:
    Description: A jar filename with an application code inside a bucket.
    Type: String
Conditions:
  IsSnapshotNameEmpty: !Equals [ !Ref SnapshotName, '' ]
Resources:
  TestServiceExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - kinesisanalytics.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonKinesisFullAccess
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
      Path: /
  InputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  OutputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  TestFlinkApplication:
    Type: 'AWS::KinesisAnalyticsV2::Application'
    Properties:
      ApplicationName: 'CFNTestFlinkApplication'
      ApplicationDescription: 'Test Flink Application'
      RuntimeEnvironment: 'FLINK-1_18'
      ServiceExecutionRole: !GetAtt TestServiceExecutionRole.Arn
      ApplicationConfiguration:
        EnvironmentProperties:
          PropertyGroups:
            - PropertyGroupId: 'KinesisStreams'
              PropertyMap:
                INPUT_STREAM_NAME: !Ref InputKinesisStream
                OUTPUT_STREAM_NAME: !Ref OutputKinesisStream
                AWS_REGION: !Ref AWS::Region
        FlinkApplicationConfiguration:
          CheckpointConfiguration:
            ConfigurationType: 'CUSTOM'
            CheckpointingEnabled: True
            CheckpointInterval: 1500
            MinPauseBetweenCheckpoints: 500
          MonitoringConfiguration:
            ConfigurationType: 'CUSTOM'
            MetricsLevel: 'APPLICATION'
            LogLevel: 'INFO'
          ParallelismConfiguration:
            ConfigurationType: 'CUSTOM'
            Parallelism: 1
            ParallelismPerKPU: 1
            AutoScalingEnabled: True
        ApplicationSnapshotConfiguration:
          SnapshotsEnabled: True
        ApplicationCodeConfiguration:
          CodeContent:
            S3ContentLocation:
              BucketARN: !Ref CodeContentBucketArn
              FileKey: !Ref CodeContentFileKey
          CodeContentType: 'ZIPFILE'
  StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonKinesisAnalyticsFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
  StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3

          logger = logging.getLogger()
          logger.setLevel(logging.INFO)

          def lambda_handler(event, context):
              logger.info('Incoming CFN event {}'.format(event))
              try:
                  application_name = event['ResourceProperties']['ApplicationName']
                  # filter out events other than Create or Update,
                  # you can also omit Update in order to start an application on Create only.
                  if event['RequestType'] not in ["Create", "Update"]:
                      logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType']))
                      cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                      return
                  # use kinesisanalyticsv2 API to start an application.
                  client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
                  # get application status.
                  describe_response = client_kda.describe_application(ApplicationName=application_name)
                  application_status = describe_response['ApplicationDetail']['ApplicationStatus']
                  # an application can be started from 'READY' status only.
                  if application_status != 'READY':
                      logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status))
                      cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                      return
                  # create RunConfiguration from passed parameters.
                  run_configuration = {
                      'FlinkRunConfiguration': {
                          'AllowNonRestoredState': event['ResourceProperties']['AllowNonRestoredState'] == 'true'
                      },
                      'ApplicationRestoreConfiguration': {
                          'ApplicationRestoreType': event['ResourceProperties']['ApplicationRestoreType'],
                      }
                  }
                  # add SnapshotName to RunConfiguration if specified.
                  if event['ResourceProperties']['SnapshotName'] != '':
                      run_configuration['ApplicationRestoreConfiguration']['SnapshotName'] = event['ResourceProperties']['SnapshotName']
                  logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration))
                  # this call doesn't wait for an application to transfer to 'RUNNING' state.
                  client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
                  logger.info('Started Application: {}'.format(application_name))
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
              except Exception as err:
                  logger.error(err)
                  cfnresponse.send(event, context, cfnresponse.FAILED, {"Data": str(err)})
  StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
      ApplicationRestoreType: !Ref ApplicationRestoreType
      SnapshotName: !Ref SnapshotName
      AllowNonRestoredState: !Ref AllowNonRestoredState
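The extended handler builds its RunConfiguration from the custom resource properties, which here are all `!Ref`s to String parameters. To check which configuration a given set of parameter values produces, the same logic can be run locally. `build_run_configuration` is a hypothetical helper that copies the handler's logic, and `my-snapshot` is a placeholder snapshot name.

```python
# Hypothetical helper that copies how the extended Lambda turns the custom
# resource properties into the RunConfiguration passed to start_application.
def build_run_configuration(props: dict) -> dict:
    run_configuration = {
        "FlinkRunConfiguration": {
            # The String parameter arrives as 'true' or 'false'.
            "AllowNonRestoredState": props["AllowNonRestoredState"] == "true",
        },
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": props["ApplicationRestoreType"],
        },
    }
    # SnapshotName is only added when set (used with RESTORE_FROM_CUSTOM_SNAPSHOT).
    if props["SnapshotName"] != "":
        run_configuration["ApplicationRestoreConfiguration"]["SnapshotName"] = props["SnapshotName"]
    return run_configuration


if __name__ == "__main__":
    print(build_run_configuration({
        "AllowNonRestoredState": "true",
        "ApplicationRestoreType": "RESTORE_FROM_CUSTOM_SNAPSHOT",
        "SnapshotName": "my-snapshot",
    }))
```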
You might also want to narrow the roles for the Lambda function and for the application itself.
Before you create the stack above, don't forget to specify your parameters.
parameters.json
[
  {
    "ParameterKey": "CodeContentBucketArn",
    "ParameterValue": "YOUR_BUCKET_ARN"
  },
  {
    "ParameterKey": "CodeContentFileKey",
    "ParameterValue": "YOUR_JAR"
  },
  {
    "ParameterKey": "ApplicationRestoreType",
    "ParameterValue": "SKIP_RESTORE_FROM_SNAPSHOT"
  },
  {
    "ParameterKey": "AllowNonRestoredState",
    "ParameterValue": "true"
  }
]
Replace YOUR_BUCKET_ARN and YOUR_JAR with your own values. You can follow this guide to create an Amazon S3 bucket and an application jar.
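If you prefer to generate parameters.json rather than edit it by hand, a small script can write the same list of `ParameterKey`/`ParameterValue` pairs. This is a sketch: `build_parameters` is a hypothetical helper, and its check that RESTORE_FROM_CUSTOM_SNAPSHOT comes with a snapshot name is an assumption drawn from the SnapshotName parameter's description in the template.

```python
import json

# Mirrors the template's AllowedValues for ApplicationRestoreType.
RESTORE_TYPES = {
    "SKIP_RESTORE_FROM_SNAPSHOT",
    "RESTORE_FROM_LATEST_SNAPSHOT",
    "RESTORE_FROM_CUSTOM_SNAPSHOT",
}


def build_parameters(bucket_arn, jar_key,
                     restore_type="SKIP_RESTORE_FROM_SNAPSHOT",
                     allow_non_restored_state=True, snapshot_name=""):
    """Hypothetical helper returning the list the CLI reads from parameters.json."""
    if restore_type not in RESTORE_TYPES:
        raise ValueError("unsupported ApplicationRestoreType: " + restore_type)
    # Assumption: a custom-snapshot restore is useless without a snapshot name.
    if restore_type == "RESTORE_FROM_CUSTOM_SNAPSHOT" and not snapshot_name:
        raise ValueError("RESTORE_FROM_CUSTOM_SNAPSHOT needs a SnapshotName")
    values = {
        "CodeContentBucketArn": bucket_arn,
        "CodeContentFileKey": jar_key,
        "ApplicationRestoreType": restore_type,
        # The template declares this parameter as a String: 'true' or 'false'.
        "AllowNonRestoredState": "true" if allow_non_restored_state else "false",
    }
    # SnapshotName defaults to '' in the template, so only send it when set.
    if snapshot_name:
        values["SnapshotName"] = snapshot_name
    return [{"ParameterKey": k, "ParameterValue": v} for k, v in values.items()]


if __name__ == "__main__":
    # Placeholders, as in the parameters.json above.
    print(json.dumps(build_parameters("YOUR_BUCKET_ARN", "YOUR_JAR"), indent=2))
```

Redirecting the script's output to parameters.json produces a file equivalent to the one above.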
Now create the stack, replacing YOUR_REGION with a Region of your choice (for example, us-east-1):
aws cloudformation create-stack --region YOUR_REGION --template-body "file://stack.yaml" --parameters "file://parameters.json" --stack-name "TestManagedFlinkStack" --capabilities CAPABILITY_NAMED_IAM
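You can now open the AWS CloudFormation console at https://console.aws.amazon.com/cloudformation to follow the stack's progress. After the stack is created, your Flink application should be in the Starting status. It can take a few minutes for it to reach Running.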
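Because `start_application` returns without waiting for the application to reach RUNNING, you may want to poll for it from a script. The sketch below assumes `describe` is a callable shaped like the boto3 `kinesisanalyticsv2` client's `describe_application`; `wait_for_status` and its timeout and interval defaults are hypothetical, not part of any AWS API.

```python
import time


def wait_for_status(describe, application_name, target="RUNNING",
                    timeout_s=900, interval_s=15, sleep=time.sleep):
    """Hypothetical poller: return once the application reaches `target`."""
    waited = 0
    while True:
        response = describe(ApplicationName=application_name)
        status = response["ApplicationDetail"]["ApplicationStatus"]
        if status == target:
            return status
        if waited >= timeout_s:
            raise TimeoutError(f"{application_name} is still {status} after {waited}s")
        sleep(interval_s)
        waited += interval_s


if __name__ == "__main__":
    # Local dry run with a fake describe call that goes STARTING -> RUNNING.
    statuses = iter(["STARTING", "STARTING", "RUNNING"])

    def fake_describe(ApplicationName):
        return {"ApplicationDetail": {"ApplicationStatus": next(statuses)}}

    print(wait_for_status(fake_describe, "CFNTestFlinkApplication", sleep=lambda s: None))
```

Against a real stack you would pass `boto3.client("kinesisanalyticsv2", region_name="us-east-1").describe_application` as `describe`. The sketch does not treat a fall-back to READY (a failed start) specially; it simply times out.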
For more information, see the following: