Neptune Streams の使用 - Amazon Neptune

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Neptune Streams の使用

Neptune Streams 機能を使用すると、グラフデータに加えられたすべての変更を記録する、変更ログエントリの完全なシーケンスを生成できます。この機能の概要については、「Neptune Streams を使用してグラフ変更をリアルタイムでキャプチャする」を参照してください。

Neptune Streams の有効化

neptune_streams DB クラスターパラメータを設定することで、いつでも Neptune Streams を有効または無効にできます。パラメータを 1 に設定すると Streams が有効になり、0 に設定すると Streams が無効になります。

注記

neptune_streams DB クラスターパラメータを変更した後、変更を有効にするには、クラスター内のすべての DB インスタンスを再起動する必要があります。

neptune_streams_expiry_days DB クラスターパラメータを設定して、ストリームレコードが削除されるまでにサーバー上に残る日数 (1 日から 90 日まで) を制御できます。デフォルトは 7 です。

Neptune Streams は、当初、DB クラスター neptune_lab_mode パラメータを使用してラボモードで有効または無効にする実験機能として導入されました (「Neptune ラボモード」を参照)。現在、ラボモードを使用した Streams の有効化は非推奨で、将来無効化される予定です。

Neptune Streams の無効化

Neptune Streams は、実行中であればいつでも無効にできます。

Streams をオフにするには、neptune_streams パラメータの値が 0 に設定されるように DB クラスターパラメータグループを更新します。

重要

Streams がオフになるとすぐに、変更ログデータにアクセスできなくなります。Streams をオフにするに、関心のある内容を必ずお読みください。

Neptune Streams REST API の呼び出し

Neptune Streams にアクセスするには、次のいずれかのローカルエンドポイントに HTTP GET リクエストを送信する REST API を使用します。

  • SPARQL グラフ DB の場合: https://Neptune-DNS:8182/sparql/stream

  • Gremlin または openCypher グラフ DB の場合: https://Neptune-DNS:8182/propertygraph/stream または https://Neptune-DNS:8182/pg/stream

注記

エンジンリリース 1.1.0.0 現在、Gremlin ストリームエンドポイント (https://Neptune-DNS:8182/gremlin/stream) は、関連する出力形式 (GREMLIN_JSON) とともに非推奨です。下位互換性のために引き続きサポートされていますが、将来のリリースで削除される可能性があります。

HTTP GET オペレーションのみが許可されます。

Neptune は、レスポンスの gzip 圧縮をサポートします。ただし、HTTP リクエストに、受け入れられた圧縮形式として gzip を指定する Accept-Encoding ヘッダーが含まれていることが条件です (つまり、"Accept-Encoding: gzip")。

パラメータ
  • limit - long、オプション。範囲:1 ~ 100,000。デフォルト: 10

    返すレコードの最大数を指定します。また、レスポンスのサイズ制限は 10 MB であり、これは変更できず、limit パラメータで指定されたレコード数よりも優先されます。10 MB の制限に達した場合、レスポンスにはしきい値超過レコードが含まれます。

  • iteratorType - 文字列、オプション。

    このパラメータには以下の値のいずれかがあります。

    • AT_SEQUENCE_NUMBER (デフォルト) - commitNum および opNum パラメータで一緒に指定されたイベントシーケンス番号から読み取りを開始することを示します。

    • AFTER_SEQUENCE_NUMBER - commitNum および opNum パラメータで一緒に指定されたイベントシーケンス番号の直後に読み取りが開始されることを示します。

    • TRIM_HORIZON - 読み取りは、システム内の最後のトリミングされていないレコードから開始することを示します。これは、変更ログストリームで最も古い (まだ削除されていない) レコードであることを示しています。このモードは、特定の開始イベントシーケンス番号がないアプリケーションの起動時に便利です。

    • LATEST - 読み取りは、システム内の最新のレコードから開始することを示します。これは、変更ログストリームで最近の (まだ削除されていない) レコードであることを示しています。これは、災害対策時やダウンタイムゼロのアップグレード時など、古いレコードを処理しないように、ストリームの現在の上位からレコードを読み取る必要がある場合に便利です。このモードでは、返されるレコードは最大 1 つだけであることに注意してください。

  • commitNum - long、iteratorType が AT_SEQUENCE_NUMBER または AFTER_SEQUENCE_NUMBER のときは必須。

    変更ログストリームから読み取る開始レコードのコミット番号。

    iteratorTypeTRIM_HORIZON または LATEST の場合、このパラメータは無視されます。

  • opNum - long、オプション (デフォルトは 1)。

    変更ログストリームデータからの読み取りを開始するための、指定されたコミット内のオペレーションシーケンス番号。

通常、SPARQL グラフデータを変更するオペレーションでは、オペレーションごとに 1 つの変更レコードしか生成されません。ただし、Gremlin グラフデータを変更するオペレーションでは、次の例のように、オペレーションごとに複数の変更レコードを生成できます。

  • INSERT - Gremlin 頂点は複数のラベルを持つことができ、Gremlin 要素は複数のプロパティを持つことができます。要素が挿入されると、ラベルとプロパティごとに個別の変更レコードが生成されます。

  • UPDATE - Gremlin 要素プロパティが変更されると、2 つの変更レコードが生成されます。1 つ目は前の値の削除で、2 つ目は新しい値の挿入です。

  • DELETE - 削除される要素プロパティごとに個別の変更レコードが生成されます。たとえば、プロパティを持つ Gremlin エッジが削除されると、プロパティごとに 1 つの変更レコードが生成されます。その後、エッジラベルの削除用に 1 つの変更レコードが生成されます。

    Gremlin 頂点が削除されると、すべての受信エッジプロパティと送信エッジプロパティが最初に削除され、次にエッジラベル、頂点プロパティ、最後に頂点ラベルが削除されます。これらの削除はそれぞれ、変更レコードを生成します。

Neptune Streams API レスポンスの形式

Neptune Streams REST API リクエストに対するレスポンスには、以下のフィールドがあります。

  • lastEventId - ストリームレスポンスの最後の変更のシーケンス識別子。イベント ID は 2 つのフィールドで構成されます。commitNum はグラフを変更したトランザクションを識別し、opNum はそのトランザクション内の特定のオペレーションを識別します。以下の例ではこれを示しています。

    "eventId": { "commitNum": 12, "opNum": 1 }
  • lastTrxTimestamp - トランザクションのコミットがリクエストされた時間 (Unix エポックからのミリ秒単位)。

  • format - 返される変更レコードのシリアル化形式。指定できる値は、Gremlin または openCypher 変更レコードの場合は PG_JSON、SPARQL 変更レコードの場合は NQUADS です。

  • records - レスポンスに含まれるシリアル化された変更ログストリームレコードの配列。records 配列内の各レコードには、次のフィールドが含まれます。

    • commitTimestamp - トランザクションのコミットがリクエストされた時間 (Unix エポックからのミリ秒単位)。

    • eventId - ストリームレスポンスの最後の変更のシーケンス識別子。

    • data – シリアル化された Gremlin、SPARQL、または OpenCypher 変更レコード。各レコードのシリアル化形式については、次のセクション Neptune Streams のシリアル化形式 で詳しく説明します。

    • op — 変更を作成した操作。

    • isLastOp - この操作がトランザクションの最後の操作である場合にのみ表示されます。存在する場合は、true に設定されます。トランザクション全体が確実に消費されるようにする場合に便利です。

  • totalRecords - レスポンスのレコードの総数。

例えば、次のレスポンスは、複数の操作を含むトランザクションの Gremlin 変更データを返します。

{ "lastEventId": { "commitNum": 12, "opNum": 1 }, "lastTrxTimestamp": 1560011610678, "format": "PG_JSON", "records": [ { "commitTimestamp": 1560011610678, "eventId": { "commitNum": 1, "opNum": 1 }, "data": { "id": "d2b59bf8-0d0f-218b-f68b-2aa7b0b1904a", "type": "vl", "key": "label", "value": { "value": "vertex", "dataType": "String" } }, "op": "ADD" } ], "totalRecords": 1 }

次のレスポンスは、トランザクションの最後の操作 (トランザクション番号 97 の EventId(97, 1) によって識別される操作) の SPARQL 変更データを返します。

{ "lastEventId": { "commitNum": 97, "opNum": 1 }, "lastTrxTimestamp": 1561489355102, "format": "NQUADS", "records": [ { "commitTimestamp": 1561489355102, "eventId": { "commitNum": 97, "opNum": 1 }, "data": { "stmt": "<https://test.com/s> <https://test.com/p> <https://test.com/o> .\n" }, "op": "ADD", "isLastOp": true } ], "totalRecords": 1 }

Neptune Streams API の例外

次の表では、Neptune Streams の例外について説明します。

エラーコード HTTP コード 再試行してもいいですか。 メッセージ

InvalidParameterException

400

いいえ

無効な または out-of-range 値が入力パラメータとして指定されました。

ExpiredStreamException

400

いいえ

リクエストされたすべてのレコードが許容される最大有効期間を超え、有効期限が切れています。

ThrottlingException

500

はい

リクエストの速度が、最大スループットを超えています。

StreamRecordsNotFoundException

404

いいえ

リクエストされたリソースが見つかりませんでした。ストリームが正しく指定されていない可能性があります。

MemoryLimitExceededException

500

はい

メモリ不足のため、リクエストの処理は成功しませんでしたが、サーバーがビジー状態でなくなったら再試行できます。