Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
アプリケーションの作成と実行 (CLI)
このセクションでは、 を使用して Managed Service for Apache Flink アプリケーション AWS Command Line Interface を作成して実行します。kinesisanalyticsv2 AWS CLI コマンドを使用して、Managed Service for Apache Flink アプリケーションを作成して操作します。
許可ポリシーを作成する
注記
アプリケーションのアクセス権限ポリシーとロールを作成する必要があります。これらのIAMリソースを作成しない場合、アプリケーションはデータとログストリームにアクセスできません。
まず、2 つのステートメントを含むアクセス許可ポリシーを作成します。1 つは、ソースストリームの読み取りアクションに対するアクセス許可を付与し、もう 1 つはシンクストリームの書き込みアクションに対するアクセス許可を付与します。次に、ポリシーを IAMロール (次のセクションで作成する) にアタッチします。そのため、 Managed Service for Apache Flinkがこのロールを引き受けると、ソースストリームからの読み取りとシンクストリームへの書き込みを行うために必要なアクセス許可がサービスに付与されます。
次のコードを使用して AKReadSourceStreamWriteSinkStream
アクセス許可ポリシーを作成します。username
を Amazon S3 バケットの作成に使用したユーザー名に置き換え、アプリケーションコードを保存します。Amazon リソースネーム (ARNs) のアカウント ID を自分のアカウント ID (012345678901)
に置き換えます。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-
username
/getting-started-scala-1.0.jar" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" } ] }
アクセス許可ポリシーを作成する step-by-step 手順については、 ユーザーガイドの「チュートリアル: 最初のカスタマー管理ポリシーの作成とアタッチIAM」を参照してください。
IAM ポリシーを作成する
このセクションでは、Managed Service for Apache Flink アプリケーションがソースストリームを読み取ってシンクストリームに書き込むために引き受けることができる IAMロールを作成します。
Apache Flink 用 Managed Service は、許可なしにはストリームにアクセスできません。これらのアクセス許可は、 IAMロールを介して付与します。各IAMロールには 2 つのポリシーがアタッチされています。信頼ポリシーは、ロールを引き受けるための許可を Managed Service for Apache Flink 付与し、許可ポリシーは、ロールを引き受けた後に Managed Service for Apache Flink が実行できる事柄を決定します。
前のセクションで作成したアクセス許可ポリシーをこのロールにアタッチします。
IAM ロールを作成するには
でIAMコンソールを開きますhttps://console.aws.amazon.com/iam/
。 ナビゲーションペインで [ロール] を選択し、続いて [ロールを作成] を選択します。
[信頼されるエンティティの種類を選択] で、[AWS のサービス] を選択します。
[このロールを使用するサービスを選択] で、[Kinesis Analytics] を選択します。
[ユースケースの選択]で、[Managed Service for Apache Flink]を選択します。
[Next: Permissions] (次へ: アクセス許可) を選択します。
[アクセス権限ポリシーをアタッチする] ページで、[Next: Review] (次: 確認) を選択します。ロールを作成した後に、アクセス許可ポリシーをアタッチします。
[Create role (ロールの作成)] ページで、ロールの名前に
MF-stream-rw-role
を入力します。[ロールの作成] を選択します。これで、 という名前の新しいIAMロールが作成されました
MF-stream-rw-role
。次に、ロールの信頼ポリシーとアクセス許可ポリシーを更新します。アクセス許可ポリシーをロールにアタッチします。
注記
この演習では、Managed Service for Apache Flink が、Kinesis データストリーム (ソース) からのデータの読み取りと、別の Kinesis データストリームへの出力の書き込みの両方を実行するためにこのロールを引き受けます。前のステップである「許可ポリシーの作成」で作成したロールを添付します。
[概要] ページで、[アクセス許可] タブを選択します。
[Attach Policies (ポリシーのアタッチ)] を選択します。
検索ボックスに
AKReadSourceStreamWriteSinkStream
(前のセクションで作成したポリシー) と入力します。AKReadSourceStreamWriteSinkStream
ポリシーを選択し、[ポリシーを添付]を選択します。
これで、アプリケーションがリソースにアクセスするために使用するサービスの実行ロールが作成されました。新しいロールARNの を書き留めます。
ロールを作成する step-by-step 手順については、 ユーザーガイドのIAM「ロールの作成 (コンソール)IAM」を参照してください。
アプリケーションの作成
次のJSONコードを という名前のファイルに保存しますcreate_request.json
。サンプルロールを、以前に作成したロールARNの ARNに置き換えます。バケットARNサフィックス (ユーザー名) を、前のセクションで選択したサフィックスに置き換えます。サービス実行ロールのサンプルのアカウント ID (012345678901) を、自分のアカウント ID に置き換えます。
{ "ApplicationName": "getting_started", "ApplicationDescription": "Scala getting started application", "RuntimeEnvironment": "FLINK-1_19", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-
username
", "FileKey": "getting-started-scala-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }, "CloudWatchLoggingOptions": [ { "LogStreamARN": "arn:aws:logs:us-west-2:012345678901
:log-group:MyApplication:log-stream:kinesis-analytics-log-stream" } ] }
次のリクエストCreateApplicationで を実行してアプリケーションを作成します。
aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
これでアプリケーションが作成されました。次のステップでは、アプリケーションを起動します。
アプリケーションを起動する
このセクションでは、 StartApplicationアクションを使用してアプリケーションを起動します。
アプリケーションを起動するには
次のJSONコードを という名前のファイルに保存します
start_request.json
。{ "ApplicationName": "getting_started", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
前述のリクエストを指定して
StartApplication
アクションを実行し、アプリケーションを起動します。aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
アプリケーションが実行されます。Amazon CloudWatch コンソールで Managed Service for Apache Flink メトリクスをチェックして、アプリケーションが動作していることを確認できます。
アプリケーションの停止
このセクションでは、 StopApplicationアクションを使用してアプリケーションを停止します。
アプリケーションを停止するには
次のJSONコードを という名前のファイルに保存します
stop_request.json
。{ "ApplicationName": "s3_sink" }
前述のリクエストを指定して
StopApplication
アクションを実行し、アプリケーションを起動します。aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
アプリケーションが停止します。
CloudWatch ログ記録オプションを追加する
を使用して AWS CLI 、Amazon CloudWatch ログストリームをアプリケーションに追加できます。アプリケーションで CloudWatch ログを使用する方法については、「アプリケーションログ記録のセットアップ」を参照してください。
環境プロパティを更新する
このセクションでは、 UpdateApplicationアクションを使用して、アプリケーションコードを再コンパイルせずにアプリケーションの環境プロパティを変更します。この例では、ソースストリームおよびレプリケート先ストリームのリージョンを変更します。
アプリケーションの環境プロパティを更新します
次のJSONコードを という名前のファイルに保存します
update_properties_request.json
。{ "ApplicationName": "getting_started", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }
前述のリクエストで
UpdateApplication
アクションを実行し、環境プロパティを更新します。aws kinesisanalyticsv2 update-application --cli-input-json file://update_properties_request.json
アプリケーションコードの更新
アプリケーションコードを新しいバージョンのコードパッケージで更新する必要がある場合は、 UpdateApplicationCLIアクションを使用します。
注記
同じファイル名のアプリケーションコードの新しいバージョンをロードするには、新しいオブジェクトバージョンを指定する必要があります。Amazon S3 オブジェクトバージョンを使用する方法の詳細については、「バージョニングの有効化または無効化」を参照してください。
を使用するには AWS CLI、Amazon S3 バケットから以前のコードパッケージを削除し、新しいバージョンをアップロードして、同じ Amazon S3 バケットとオブジェクト名、および新しいオブジェクトバージョンUpdateApplication
を指定して を呼び出します。アプリケーションは新しいコードパッケージで再起動します。
以下の UpdateApplication
アクションのサンプル・リクエストは、アプリケーション・コードを再読み込 み、アプリケーションを再起動します。CurrentApplicationVersionId
を現在のアプリケーションバージョンに更新します。ListApplications
または DescribeApplication
アクションを使用して、現在のアプリケーションバージョンを確認できます。バケット名のサフィックス (「<username>」) を、 依存リソースを作成する セクションで選択したサフィックスで更新します。
{{ "ApplicationName": "getting_started", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::ka-app-code-<username>", "FileKeyUpdate": "getting-started-scala-1.0.jar", "ObjectVersionUpdate": "SAMPLEUehYngP87ex1nzYIGYgfhypvDU" } } } } }