Kinesis Data Streams を使用した Studio ノートブックの作成 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Kinesis Data Streams を使用した Studio ノートブックの作成

このチュートリアルでは、Kinesis Data Stream をソースとして使用する Studio ノートブックを作成する方法について説明します。

前提条件を満たす

Studio ノートブックを作成する前に、Kinesis データストリーム (ExampleInputStream) を作成します。アプリケーションはこのストリームをアプリケーションソースとして使用します。

このストリームは Amazon Kinesis コンソールまたは次の AWS CLI コマンドを使用して作成できます。コンソールでの操作方法については、「Amazon Kinesis Data Streams デベロッパーガイド」 の 「データストリームの作成および更新」 を参照してください。ストリーム ExampleInputStream に名前を付け、[オープンシャード数] を 1 に設定します。

を使用してストリーム (ExampleInputStream) を作成するには AWS CLI、次の Amazon Kinesis create-stream AWS CLI コマンドを使用します。

$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \ --profile adminuser

AWS Glue テーブルを作成する

Studio ノートブックは、Kinesis Data Streams データソースに関するメタデータに「AWS Glue」データベースを使用します。

注記

データベースは最初に手動で作成することも、ノートブックの作成時に Apache Flink 用 Managed Service に自動的に作成させることもできます。同様に、このセクションで説明されているようにテーブルを手動で作成することも、Apache Zeppelin 内のノートブックで Managed Service for Apache Flink のテーブル作成コネクタコードを使用してDDL、ステートメントを介してテーブルを作成することもできます。その後、チェックイン AWS Glue して、テーブルが正しく作成されたことを確認できます。

テーブルを作成する
  1. にサインイン AWS Management Console し、 で AWS Glue コンソールを開きますhttps://console.aws.amazon.com/glue/

  2. AWS Glue データベースをまだお持ちでない場合は、左側のナビゲーションバーからデータベースを選択します。[データベースの追加] を選択します。[データベースの追加] ウィンドウで、[データベース名] に default を入力します。[Create] (作成) を選択します。

  3. 左のナビゲーションバーで、[テーブル] を選択します。「テーブル」ページで、「テーブルを追加」、「テーブルを手動で追加」を選択します。

  4. テーブルのプロパティの設定」ページで、「テーブル名」に stock を入力します。以前に作成したデータベースを選択していることを確認してください。[Next (次へ)] を選択します。

  5. [データストアの追加] ページで、[Kinesis] を選択します。[Stream name](ストリーム名)に ExampleInputStream を入力します。Kinesis ソース URLで、「」を選択しますhttps://kinesis---us-east-1.amazonaws.com.rproxy.goskope.comKinesis ソース URLをコピーして貼り付ける場合は、先頭または末尾のスペースをすべて削除してください。[Next (次へ)] を選択します。

  6. 分類ページで、 を選択しますJSON。[Next (次へ)] を選択します。

  7. スキーマを定義するで、[Add column] を編集して列を追加します。以下のプロパティを持つ列を追加します。

    列名 データ型
    ticker string
    price double

    [Next (次へ)] を選択します。

  8. 次のページで設定を確認し、「終了」を選択します。

  9. テーブルの一覧で、新しく作成したテーブルを選択します。

  10. テーブル編集」を選択し、キー managed-flink.proctime と値 proctime を含むプロパティを追加します。

  11. [Apply] を選択します。

Kinesis Data Streams を使用した Studio ノートブックの作成

アプリケーションで使用するリソースを作成したので、次は Studio ノートブックを作成します。

アプリケーションを作成するには、 AWS Management Console または を使用できます AWS CLI。

を使用して Studio ノートブックを作成する AWS Management Console

  1. https://console.aws.amazon.com/managed-flink/ホームで Managed Service for Apache Flink コンソールを開きます。region=us-east-1#/applications/dashboard

  2. Apache Flink アプリケーション用 Managed Service」ページで、「Studio」タブを選択します。[Studio ノートブックの作成] を選択します。

    注記

    入力 Amazon MSKクラスターMSKまたは Kinesis データストリームを選択し、リアルタイム でデータを処理する を選択することで、Amazon または Kinesis Data Streams コンソールから Studio ノートブックを作成することもできます。

  3. [Studio ノートブックの作成] ページで、次の情報を入力します。

    • ノートブックの名前に「MyNotebook」を入力します。

    • AWS Glue データベース」の「デフォルト」を選択します。

    [Studio ノートブックの作成] を選択します。

  4. MyNotebook ページで、 の実行 を選択します。「ステータス」に「実行中」が表示されるまで待ちます。ノートブックの実行中は料金が発生します。

を使用して Studio ノートブックを作成する AWS CLI

を使用して Studio ノートブックを作成するには AWS CLI、次の手順を実行します。

  1. アカウント ID を確認します。アプリケーションを作成する際にこの値が必要になります。

  2. ロール arn:aws:iam::AccountID:role/ZeppelinRole を作成し、コンソールで自動作成されたロールに以下の権限を追加します。

    "kinesis:GetShardIterator",

    "kinesis:GetRecords",

    "kinesis:ListShards"

  3. create.json というファイルを次の内容で作成します。プレースホルダー値を、ユーザー自身の情報に置き換えます。

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  4. アプリケーションを作成するには、次のコマンドを実行します。

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  5. コマンドが完了すると、新しい Studio ノートブックの詳細を示す出力結果が表示されます。次は出力の例です。

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  6. アプリケーションを起動するには、次のコマンドを実行します。サンプル値をアカウント ID に置き換えます。

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

Kinesis データストリームへのデータ送信

Kinesis データストリームにテストデータを送信するには、次の手順に従います。

  1. Kinesis Data Generator を開きます。

  2. で Cognito ユーザーを作成するを選択します CloudFormation

  3. AWS CloudFormation コンソールが開き、Kinesis Data Generator テンプレートが表示されます。[Next (次へ)] を選択します。

  4. [コンポーネントの詳細の指定] ページで、Cognito ユーザーのユーザー名とパスワードを入力します。[Next (次へ)] を選択します。

  5. [スタックオプションの設定] ページで、[次へ] を選択します。

  6. 「Kinesis-Data-Generator-Cognito-User の確認」ページで、IAMリソースを作成する AWS CloudFormation 可能性のある「I acknowledge」チェックボックスを選択します。[Create Stack] (スタックの作成) を選択します。

  7. AWS CloudFormation スタックの作成が完了するまで待ちます。スタックが完了したら、 AWS CloudFormation コンソールで Kinesis-Data-Generator-Cognito-User スタックを開き、出力タブを選択します。KinesisDataGeneratorUrl 出力値URLの一覧にある を開きます。

  8. [Amazon Kinesis Data Generator] ページで、ステップ 4 で作成した認証情報を使用してログインします。

  9. 次のページで、次の値を入力します。

    リージョン us-east-1
    ストリーム/Firehose ストリーム ExampleInputStream
    レコード/秒 1

    [記録テンプレート] に、次の内容を貼り付けます。

    { "ticker": "{{random.arrayElement( ["AMZN","MSFT","GOOG"] )}}", "price": {{random.number( { "min":10, "max":150 } )}} }
  10. [データ送信] を選択します。

  11. ジェネレータは、Kinesis データストリームにデータを送信します。

    次のセクションを完了する間、ジェネレータを作動させたままにしておきます。

Studio ノートブックをテストします。

このセクションでは、Studio ノートブックを使用して Kinesis データストリームのデータをクエリします。

  1. https://console.aws.amazon.com/managed-flink/ホームで Managed Service for Apache Flink コンソールを開きます。region=us-east-1#/applications/dashboard

  2. [Apache Flink アプリケーション用 Managed Service] ページで、[Studio ノートブック] タブを選択します。を選択しますMyNotebook

  3. MyNotebook ページで、「Apache Zeppelin で開く」を選択します。

    新しいタブで Apache Zeppelin インターフェイスが開きます。

  4. [Zeppelinへようこそ!]ページで [Zeppelin Note] を選択します。

  5. Zeppelin Note」ページで、新しいノートに次のクエリを入力します。

    %flink.ssql(type=update) select * from stock

    実行アイコンを選択します。

    しばらくすると、ノートには Kinesis データストリームのデータが表示されます。

アプリケーションの Apache Flink ダッシュボードを開いて運用面を表示するには、 FLINK JOBを選択します。Flink Dashboard の詳細については、「Managed Service for Apache Flink デベロッパーガイド」の「Apache Flink ダッシュボード」を参照してください。

Flink Streaming SQLクエリのその他の例については、Apache Flink ドキュメントの「クエリ」を参照してください。