

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon MSK による Studio ノートブックの作成
<a name="example-notebook-msk"></a>

このチュートリアルでは、Amazon MSK クラスターをソースとして使用する Studio ノートブックを作成する方法について説明します。

**Topics**
+ [Amazon MSK クラスターのセットアップ](#example-notebook-msk-setup)
+ [VPC に NAT ゲートウェイを追加する](#example-notebook-msk-nat)
+ [AWS Glue 接続とテーブルを作成する](#example-notebook-msk-glue)
+ [Amazon MSK による Studio ノートブックの作成](#example-notebook-msk-create)
+ [Amazon MSK クラスターにデータを送信します。](#example-notebook-msk-send)
+ [Studio ノートブックをテストします。](#example-notebook-msk-test)

## Amazon MSK クラスターのセットアップ
<a name="example-notebook-msk-setup"></a>

このチュートリアルでは、プレーンテキストでアクセスできる Amazon MSK クラスターが必要です。Amazon MSK クラスターをまだセットアップしていない場合は、「[Amazon MSK の使用入門](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)」チュートリアルに従って、Amazon VPC、Amazon MSK クラスター、トピック、および Amazon EC2 クライアントインスタンスを作成してください。

チュートリアルを実行するときは、以下の手順を実行します。
+ 「[ステップ 3: Amazon MSK クラスターを作成する](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html)」のステップ 4 で、 `ClientBroker` 値を `TLS` から **PLAINTEXT** に変更します。

## VPC に NAT ゲートウェイを追加する
<a name="example-notebook-msk-nat"></a>

「[Amazon MSK の使用入門](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)」チュートリアルに従って Amazon MSK クラスターを作成した場合、または既存の Amazon VPC にプライベートサブネット用の NAT ゲートウェイがまだない場合は、Amazon VPC に NAT ゲートウェイを追加する必要があります。アーキテクチャを次の図に示します。

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/images/vpc_05.png)


Amazon VPC の NAT ゲートウェイを作成するには、次の手順を実行します。

1. Amazon VPC コンソール ([https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/)) を開きます。

1. 左のナビゲーションバーから、[**NAT ゲートウェイ**] を選択します。

1. 「**NAT ゲートウェイ**」ページで「**NAT ゲートウェイの作成**」を選択します。

1. [**NAT ゲートウェイの作成**] ページで、以下の値を入力します。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/example-notebook-msk.html)

   **[Create NAT Gateway]** (NAT ゲートウェイの作成) を選択します。

1. 左のナビゲーションバーで、**[ルートテーブル ]** を選択します。

1. [**ルートテーブルの作成**] を選択します。

1. [**ルートテーブルの作成**] ページで、以下の情報を指定します。
   + **名前タグ:** **ZeppelinRouteTable**
   + 「**VPC**」: 自分の VPC (例:「**AWS KafkaTutorialVPC**」)を選択します。

   [**Create**] (作成) を選択します。

1. ルートテーブルのリストから「**ZeppelinRouteTable**」を選択します。[**ルート**] タブを選択し、[**ルート編集**] を選択します。

1. **[ルートの編集]** ページで、**[ルートの追加]** を選択します。

1. ******[送信先]** に「**0.0.0.0/0**」と入力します。「**Target**」には「**NAT ゲートウェイ**」、「**ZeppelinGateway**」。[**ルートの保存**] を選択します。[**閉じる**] を選択します。

1. 「ルートテーブル」ページで「**ZeppelinRouteTable**」を選択した状態で、「**サブネット関連付け**」タブを選択します。「**サブネット関連付けの編集**」を選択します。

1. 「**サブネット関連付けの編集**」ページで、「**AWS KafkaTutorialSubnet2**」と「**AWS KafkaTutorialSubnet3**」を選択します。[**Save**] を選択します。

## AWS Glue 接続とテーブルを作成する
<a name="example-notebook-msk-glue"></a>

Studio ノートブックは、Amazon MSK データソースに関するメタデータ用の「[AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)」データベースを使用します。このセクションでは、Amazon MSK クラスターにアクセスする方法を説明する AWS Glue 接続と、Studio ノートブックなどのクライアントにデータソース内のデータを表示する方法を説明する AWS Glue テーブルを作成します。

**接続を作成する**

1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) で AWS Glue コンソールを開きます。

1.  AWS Glue データベースがまだない場合は、左側のナビゲーションバーから**データベース**を選択します。**[データベースの追加]** を選択します。[**データベースの追加**] ウィンドウで、[**データベース名**] に **default** を入力します。[**Create**] (作成) を選択します。

1. 左のナビゲーションバーから、[**接続**]を選択します。[**接続の追加**] を選択します。

1. 「**接続を追加**」ウィンドウで、次の値を入力します。
   + **[接続名]** に、**ZeppelinConnection** と入力します。
   + [**接続タイプ**] で、[**Kafka**] を選択します。
   + 「**Kafka ブートストラップサーバー URL**」には、クラスターのブートストラップブローカーの文字列を指定します。ブートストラップブローカーは、MSK コンソールから、または次の CLI コマンドを入力して取得できます。

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + 「**SSL 接続が必要**」チェックボックスをオフにします。

   [**Next (次へ)**] を選択します。

1. **[VPC]** ページで、次の値を入力します。
   + **VPC** の場合は、VPC の名前 (** AWS KafkaTutorialVPC** など) を選択します。
   + 「**サブネット**」には、「**AWS KafkaTutorialSubnet2**」を選択します。
   + 「**セキュリティグループ**」では、使用可能なすべてのグループを選択します。

   [**Next (次へ)**] を選択します。

1. 「**接続プロパティ**」/「**接続アクセス**」ページで 「**完了**」を選択します。

**テーブルを作成する**
**注記**  
次の手順で説明するように手動でテーブルを作成することも、Apache Zeppelin 内のノートブックにある Apache Flink 用 Managed Service のテーブル作成コネクタコードを使用して DDL ステートメントでテーブルを作成することもできます。その後、チェックイン AWS Glue して、テーブルが正しく作成されたことを確認できます。

1. 左のナビゲーションバーで、[**テーブル**] を選択します。「**テーブル**」ページで、「**テーブルを追加**」、「**テーブルを手動で追加**」を選択します。

1. 「**テーブルのプロパティの設定**」ページで、「**テーブル名**」に **stock** を入力します。以前に作成したデータベースを選択していることを確認してください。[**Next (次へ)**] を選択します。

1. 「**データストアの追加**」ページで「**Kafka**」を選択します。トピック名には、「**トピック名**」 (「**AWS KafkaTutorialTopic**」など) を入力します。「**接続**」には「**ZeppelinConnection**」を選択します。

1. 「**分類**」ページで「**JSON**」を選択します。[**Next (次へ)**] を選択します。

1. **スキーマを定義する**で、[Add column] を編集して列を追加します。以下のプロパティを持つ列を追加します。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/example-notebook-msk.html)

   [**Next (次へ)**] を選択します。

1. 次のページで設定を確認し、「**終了**」を選択します。

1. テーブルの一覧で、新しく作成したテーブルを選択します。

1. **テーブルの編集** を選択し、次のプロパティを追加します。
   + キー: `managed-flink.proctime`、値: `proctime`
   + キー: `flink.properties.group.id`、値: `test-consumer-group`
   + キー: `flink.properties.auto.offset.reset`、値: `latest`
   + キー: `classification`、値: `json`

   これらのキーと値のペアがないと、Flink ノートブックはエラーになります。

1. [**Apply**] を選択します。

## Amazon MSK による Studio ノートブックの作成
<a name="example-notebook-msk-create"></a>

アプリケーションで使用するリソースを作成したので、次は Studio ノートブックを作成します。

**Topics**
+ [を使用して Studio ノートブックを作成する AWS マネジメントコンソール](#example-notebook-create-msk-console)
+ [を使用して Studio ノートブックを作成する AWS CLI](#example-notebook-msk-create-api)

**注記**  
Amazon MSK コンソールから既存のクラスターを選択し、「**データをリアルタイムで処理**」を選択することで Studio ノートブックを作成することもできます。

### を使用して Studio ノートブックを作成する AWS マネジメントコンソール
<a name="example-notebook-create-msk-console"></a>

1. 「[https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)」にある Apache Flink コンソール用 Managed Service を開きます。

1. 「**Apache Flink アプリケーション用 Managed Service**」ページで、「**Studio**」タブを選択します。「**Studio ノートブックの作成**」を選択します。
**注記**  
Amazon MSK または Kinesis Data Streams コンソールから Studio ノートブックを作成するには、入力の Amazon MSK クラスターまたは Kinesis データストリームを選択し、「**データをリアルタイムで処理**」を選択します。

1. [**Studio ノートブックの作成**] ページで、次の情報を入力します。
   + 「**Studio ノートブック名**」に **MyNotebook** を入力します。
   + 「**AWS Glue データベース**」の「**デフォルト**」を選択します。

   「**Studio ノートブックの作成**」を選択します。

1. 「**MyNotebook**」ページで、「**構成**」タブを選択します。「**Networking**」セクションで、「**編集**」を選択します。

1. 「**MyNotebook のネットワークの編集**」ページで、「**Amazon MSK クラスターに基づく VPC 設定**」を選択します。「**Amazon MSK クラスター**」には Amazon MSK クラスターを選択します。[**Save changes**] (変更の保存) をクリックします。

1. 「**MyNotebook**」ページで、「**実行**」を選択します。「**ステータス**」に「**実行中**」が表示されるまで待ちます。

### を使用して Studio ノートブックを作成する AWS CLI
<a name="example-notebook-msk-create-api"></a>

を使用して Studio ノートブックを作成するには AWS CLI、次の手順を実行します。

1. 次の情報があることを確認します。アプリケーションを作成するにはこれらの値が必要です。
   +  アカウント ID。
   + Amazon MSK クラスターを含む Amazon VPC 用のサブネット ID やセキュリティグループ ID。

1. `create.json` というファイルを次の内容で作成します。プレースホルダー値を、ユーザー自身の情報に置き換えます。

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. アプリケーションを作成するには、次のコマンドを実行します。

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. コマンドが完了すると、次のような出力が表示され、新しい Studio ノートブックの詳細が表示されます。

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. アプリケーションを起動するには、次のコマンドを実行します。サンプル値をアカウント ID に置き換えます。

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Amazon MSK クラスターにデータを送信します。
<a name="example-notebook-msk-send"></a>

このセクションでは、Amazon EC2 クライアントで Python スクリプトを実行して Amazon MSK データソースにデータを送信します。

1. Amazon EC2 クライアントに接続します。

1. 以下のコマンドを実行して Python バージョン 3、Pip、および Kafka for Python パッケージをインストールし、アクションを確認します。

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. 次のコマンドを入力して、クライアントマシン AWS CLI で を設定します。

   ```
   aws configure
   ```

   アカウントの認証情報と **us-east-1** を `region` に入力します。

1. `stock.py` というファイルを次の内容で作成します。サンプル値を Amazon MSK クラスターのブートストラップブローカー文字列に置き換え、トピックが「**AWS KafkaTutorialTopic**」でない場合はトピック名を更新します。

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. 次のコマンドを使用してスクリプトを実行します。

   ```
   $ python3 stock.py
   ```

1. 以下のセクションを実行している間は、スクリプトを実行したままにしておきます。

## Studio ノートブックをテストします。
<a name="example-notebook-msk-test"></a>

このセクションでは、Studio ノートブックを使用して Amazon MSK クラスターのデータをクエリします。

1. 「[https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)」にある Apache Flink 用 Managed Serviceコンソールを開きます。

1. [**Apache Flink アプリケーション用 Managed Service**] ページで、[**Studio ノートブック**] タブを選択します。「**MyNotebook**」を選択します。

1. 「**MyNotebook**」ページで、[**Apache Zeppelin で開く**] を選択します。

   新しいタブで Apache Zeppelin インターフェイスが開きます。

1. 「**Zeppelinへようこそ！**」でページで「**Zeppelinの新ノート**」を選択します。

1. 「**Zeppelin Note**」ページで、新しいノートに次のクエリを入力します。

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   実行アイコンを選択します。

   アプリケーションは Amazon MSK クラスターのデータを表示します。

アプリケーションの Apache Flink ダッシュボードを開いて運用状況を表示するには、「**FLINK JOB**」を選択します。Flink Dashboard の詳細については、[「Managed Service for Apache Flink デベロッパーガイド」](https://docs.aws.amazon.com/)の「[Apache Flink ダッシュボード](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)」を参照してください。

Flink ストリーミング SQL クエリの他の例については、「[Apache Flink ドキュメント](https://nightlies.apache.org/flink/flink-docs-release-1.15/)」の「[クエリ](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)」を参照してください。