Amazon で Studio ノートブックを作成する MSK - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon で Studio ノートブックを作成する MSK

このチュートリアルでは、Amazon MSKクラスターをソースとして使用する Studio ノートブックを作成する方法について説明します。

Amazon MSKクラスターをセットアップする

このチュートリアルでは、プレーンテキストアクセスを許可する Amazon MSKクラスターが必要です。Amazon MSKクラスターをまだセットアップしていない場合は、Amazon の使用開始MSKチュートリアルに従って、Amazon VPC、Amazon MSKクラスター、トピック、および Amazon EC2クライアントインスタンスを作成します。

チュートリアルを実行するときは、以下の手順を実行します。

にNATゲートウェイを追加する VPC

Amazon の使用開始のチュートリアルに従って Amazon MSKMSKクラスターを作成した場合、または既存の Amazon にプライベートサブネット用のNATゲートウェイがまだVPCない場合は、Amazon にNATゲートウェイを追加する必要がありますVPC。アーキテクチャを次の図に示します。

AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.

Amazon のNATゲートウェイを作成するにはVPC、次の手順を実行します。

  1. で Amazon VPCコンソールを開きますhttps://console.aws.amazon.com/vpc/

  2. 左側のナビゲーションバーからNATゲートウェイを選択します。

  3. NAT ゲートウェイ ページで、NATゲートウェイの作成 を選択します。

  4. NAT ゲートウェイの作成ページで、次の値を指定します。

    名前 - オプション ZeppelinGateway
    サブネット AWS KafkaTutorialSubnet1
    Elastic IP 割り当て ID 使用可能な Elastic IP を選択します。IPs 使用可能な Elastic がない場合は、Elastic IP の割り当て を選択し、コンソールが作成するエレーシック IP を選択します。

    NAT ゲートウェイの作成 を選択します

  5. 左のナビゲーションバーで、[ルートテーブル ] を選択します。

  6. [ルートテーブルの作成] を選択します。

  7. [ルートテーブルの作成] ページで、以下の情報を指定します。

    • 名前タグ: ZeppelinRouteTable

    • VPC: を選択します VPC (例: AWS KafkaTutorialVPC)。

    [Create] (作成) を選択します。

  8. ルートテーブルのリストで、 を選択しますZeppelinRouteTable。[ルート] タブを選択し、[ルート編集] を選択します。

  9. [ルートの編集] ページで、[ルートの追加] を選択します。

  10. [送信先] に「0.0.0.0/0」と入力します。ターゲット でNATゲートウェイ 、 を選択しますZeppelinGateway。[ルートの保存] を選択します。[閉じる] を選択します。

  11. ルートテーブルページで、 ZeppelinRouteTable を選択して、サブネットの関連付けタブを選択します。「サブネット関連付けの編集」を選択します。

  12. 「サブネットの関連付けの編集」ページで、AWS KafkaTutorialSubnet2AWS KafkaTutorialSubnet3 を選択します。[Save] を選択します。

AWS Glue 接続とテーブルを作成する

Studio ノートブックは、Amazon MSKデータソースに関するメタデータに AWS Glue データベースを使用します。このセクションでは、Amazon MSKクラスターへのアクセス方法を説明する AWS Glue 接続と、Studio ノートブックなどのクライアントにデータソース内のデータを表示する方法を説明する AWS Glue テーブルを作成します。

接続を作成する
  1. にサインイン AWS Management Console し、 で AWS Glue コンソールを開きますhttps://console.aws.amazon.com/glue/

  2. AWS Glue データベースをまだお持ちでない場合は、左側のナビゲーションバーからデータベースを選択します。[データベースの追加] を選択します。[データベースの追加] ウィンドウで、[データベース名] に default を入力します。[Create] (作成) を選択します。

  3. 左のナビゲーションバーから、[接続]を選択します。[接続の追加] を選択します。

  4. 接続を追加」ウィンドウで、次の値を入力します。

    • [接続名] に、ZeppelinConnection と入力します。

    • [接続タイプ] で、[Kafka] を選択します。

    • Kafka ブートストラップサーバー URLsには、クラスターのブートストラップブローカー文字列を指定します。ブートストラップブローカーは、MSKコンソールから取得するか、次のCLIコマンドを入力して取得できます。

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • SSL 「接続が必要」チェックボックスのチェックを解除します。

    [Next (次へ)] を選択します。

  5. VPC ページで、次の値を指定します。

    • にはVPC、 の名前を選択します VPC (例: ) AWS KafkaTutorialVPC

    • サブネット でAWS KafkaTutorialSubnet2 を選択します。

    • セキュリティグループ」では、使用可能なすべてのグループを選択します。

    [Next (次へ)] を選択します。

  6. 接続プロパティ」/「接続アクセス」ページで 「完了」を選択します。

テーブルを作成する
注記

次の手順で説明するようにテーブルを手動で作成することも、Apache Zeppelin 内のノートブックで Managed Service for Apache Flink のテーブルコネクタ作成コードを使用してDDL、ステートメントを介してテーブルを作成することもできます。その後、チェックイン AWS Glue して、テーブルが正しく作成されたことを確認できます。

  1. 左のナビゲーションバーで、[テーブル] を選択します。「テーブル」ページで、「テーブルを追加」、「テーブルを手動で追加」を選択します。

  2. テーブルのプロパティの設定」ページで、「テーブル名」に stock を入力します。以前に作成したデータベースを選択していることを確認してください。[Next (次へ)] を選択します。

  3. データストアの追加」ページで「Kafka」を選択します。トピック名 には、トピック名 (例: AWS KafkaTutorialTopic) を入力します。接続 で、 を選択しますZeppelinConnection

  4. 分類ページで、 を選択しますJSON。[Next (次へ)] を選択します。

  5. スキーマを定義するで、[Add column] を編集して列を追加します。以下のプロパティを持つ列を追加します。

    列名 データ型
    ticker string
    price double

    [Next (次へ)] を選択します。

  6. 次のページで設定を確認し、「終了」を選択します。

  7. テーブルの一覧で、新しく作成したテーブルを選択します。

  8. テーブルの編集 を選択し、次のプロパティを追加します。

    • キー: managed-flink.proctime、値: proctime

    • キー: flink.properties.group.id、値: test-consumer-group

    • キー: flink.properties.auto.offset.reset、値: latest

    • キー: classification、値: json

    これらのキーと値のペアがないと、Flink ノートブックはエラーになります。

  9. [Apply] を選択します。

Amazon で Studio ノートブックを作成する MSK

アプリケーションで使用するリソースを作成したので、次は Studio ノートブックを作成します。

アプリケーションは、 AWS Management Console または を使用して作成できます AWS CLI。
注記

既存のクラスターを選択し、リアルタイム でデータを処理する を選択することで、Amazon MSKコンソールから Studio ノートブックを作成することもできます。

を使用して Studio ノートブックを作成する AWS Management Console

  1. https://console.aws.amazon.com/managed-flink/ホームで Managed Service for Apache Flink コンソールを開きます。region=us-east-1#/applications/dashboard

  2. Apache Flink アプリケーション用 Managed Service」ページで、「Studio」タブを選択します。[Studio ノートブックの作成] を選択します。

    注記

    Amazon MSKまたは Kinesis Data Streams コンソールから Studio ノートブックを作成するには、入力 Amazon MSKクラスターまたは Kinesis データストリームを選択し、リアルタイム でデータを処理するを選択します

  3. [Studio ノートブックの作成] ページで、次の情報を入力します。

    • Studio ノートブック名」に MyNotebook を入力します。

    • AWS Glue データベース」の「デフォルト」を選択します。

    [Studio ノートブックの作成] を選択します。

  4. MyNotebook ページで、設定タブを選択します。「Networking」セクションで、「編集」を選択します。

  5. 「ネットワークの編集 MyNotebook」ページで、VPCAmazon MSKクラスター に基づいて設定を選択します。Amazon MSKクラスター の Amazon MSK クラスターを選択します。[Save changes] (変更の保存) をクリックします。

  6. MyNotebook ページで、「 を実行」を選択します。「ステータス」に「実行中」が表示されるまで待ちます。

を使用して Studio ノートブックを作成する AWS CLI

を使用して Studio ノートブックを作成するには AWS CLI、次の手順を実行します。

  1. 次の情報があることを確認します。アプリケーションを作成するにはこれらの値が必要です。

    • アカウント ID。

    • Amazon MSKクラスターVPCを含む Amazon のサブネットIDsとセキュリティグループ ID。

  2. create.json というファイルを次の内容で作成します。プレースホルダー値を、ユーザー自身の情報に置き換えます。

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. アプリケーションを作成するには、次のコマンドを実行します。

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. コマンドが完了すると、次のような出力が表示され、新しい Studio ノートブックの詳細が表示されます。

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. アプリケーションを起動するには、次のコマンドを実行します。サンプル値をアカウント ID に置き換えます。

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

Amazon MSKクラスターにデータを送信する

このセクションでは、Amazon EC2クライアントで Python スクリプトを実行して、Amazon MSKデータソースにデータを送信します。

  1. Amazon EC2クライアントに接続します。

  2. 以下のコマンドを実行して Python バージョン 3、Pip、および Kafka for Python パッケージをインストールし、アクションを確認します。

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. 次のコマンドを入力して、クライアントマシン AWS CLI で を設定します。

    aws configure

    アカウントの認証情報と us-east-1region に入力します。

  4. stock.py というファイルを次の内容で作成します。サンプル値を Amazon MSKクラスターのブートストラップブローカー文字列に置き換え、トピックが でない場合はトピック名を更新しますAWS KafkaTutorialTopic

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. 次のコマンドを使用してスクリプトを実行します。

    $ python3 stock.py
  6. 以下のセクションを実行している間は、スクリプトを実行したままにしておきます。

Studio ノートブックをテストします。

このセクションでは、Studio ノートブックを使用して Amazon MSKクラスターからデータをクエリします。

  1. https://console.aws.amazon.com/managed-flink/ホームで Managed Service for Apache Flink コンソールを開きます。region=us-east-1#/applications/dashboard

  2. [Apache Flink アプリケーション用 Managed Service] ページで、[Studio ノートブック] タブを選択します。を選択しますMyNotebook

  3. MyNotebook ページで、「Apache Zeppelin で開く」を選択します。

    新しいタブで Apache Zeppelin インターフェイスが開きます。

  4. Zeppelinへようこそ!」でページで「Zeppelinの新ノート」を選択します。

  5. Zeppelin Note」ページで、新しいノートに次のクエリを入力します。

    %flink.ssql(type=update) select * from stock

    実行アイコンを選択します。

    アプリケーションは Amazon MSKクラスターからのデータを表示します。

アプリケーションの Apache Flink ダッシュボードを開いて運用面を表示するには、 FLINK JOBを選択します。Flink Dashboard の詳細については、「Managed Service for Apache Flink デベロッパーガイド」の「Apache Flink ダッシュボード」を参照してください。

Flink Streaming SQLクエリのその他の例については、Apache Flink ドキュメントの「クエリ」を参照してください。