

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# AWS Lambda Amazon Neptune の 関数の例
<a name="lambda-functions-examples"></a>

Java、JavaScript、Python で記述された次の AWS Lambda 関数の例は、`fold().coalesce().unfold()`イディオムを使用してランダムに生成された ID で単一の頂点をアップサートする方法を示しています。

各関数のコードの多くは定型コードであり、接続を管理し、エラーが発生した場合に接続とクエリを再試行します。実際のアプリケーションロジックと Gremlin クエリは、`doQuery()` および `query()` メソッドのそれぞれを使用します。これらの例を独自の Lambda 関数の基礎として使用すると、`doQuery()` および `query()` の変更に集中できます。

関数は、クエリがうまくいかなければ 5 回再試行し、再試行の間に 1 秒待機するように構成されています。

関数では、次の Lambda 環境変数に値が存在する必要があります。
+ **`NEPTUNE_ENDPOINT`** — Neptune DB クラスターエンドポイント。Python の場合、これは `neptuneEndpoint` のはずです。
+ **`NEPTUNE_PORT`** — Neptune ポート。Python の場合、これは `neptunePort` のはずです。
+ **`USE_IAM `**   –   (`true` または `false`) データベースで AWS Identity and Access Management (IAM) データベース認証が有効になっている場合は、`USE_IAM`環境変数を に設定します`true`。これにより、Lambda 関数は Neptune への接続リクエストを SIGV4 署名します。このような IAM DB 認証リクエストについては、Lambda 関数の実行ロールに、関数が Neptune DB クラスターに接続できるようにする適切な IAM ポリシーがアタッチされていることを確認してください ([IAM ポリシーのタイプ](security-iam-access-manage.md#iam-auth-policy) を参照)。

## Amazon Neptune の Java Lambda 関数の例
<a name="lambda-functions-examples-java"></a>

Java AWS Lambda 関数については、以下の点に留意してください。
+ Java ドライバーは独自の接続プールを保持しますが、これは必要ありませんので、`Cluster` のオブジェクトを `minConnectionPoolSize(1)` および `maxConnectionPoolSize(1)` で構成します。
+ `Cluster` オブジェクトは、1 つ以上のシリアライザ (デフォルトで Gyro、`binary` のような追加の出力フォーマット用に構成している場合は別のシリアライザ) を作成するため、構築に時間がかかることがあります。インスタンス化には時間がかかる場合があります。
+ 接続プールは、最初の要求で初期化されます。この時点で、ドライバーは `Netty` スタックを設定し、バイトバッファを割り当て、IAM DB 認証を使用している場合は、署名キーを作成します。これらすべてがコールドスタートのレイテンシーに追加される可能性があります。
+ Java ドライバーの接続プールは、サーバーホストの可用性を監視し、接続に失敗すると自動的に再接続を試みます。バックグラウンドタスクが開始され、接続の再確立が試行されます。`reconnectInterval( )` を使用して、再接続の試行間隔を設定します。ドライバーが再接続しようとしている間、Lambda 関数はクエリの再試行のみができます。

  再試行間隔が再接続試行の間隔より短い場合、ホストが使用できないと見なされるため、失敗した接続に対する再試行は再度失敗します。これは、`ConcurrentModificationException` の再試行には該当しません。
+ Java 11ではなくJava 8を使用してください。Java 11 では、`Netty` 最適化はデフォルトで有効になっていません。
+ この例では再試行に [retry4J](https://github.com/elennick/retry4j) を使用します。
+ Java Lambda 関数で `Sigv4` 署名ドライバーを使用するには、依存関係の要件について、[Gremlin Java による IAM を使用した Amazon Neptune データベースへの接続](iam-auth-connecting-gremlin-java.md) を参照してください。

**警告**  
Retry4J からの `CallExecutor` はスレッドセーフではないことがあります。各スレッドで独自の `CallExecutor` インスタンスを使用することを検討してください。

**注記**  
 次の例は、requestInterceptor() の使用を含むように更新されました。これは TinkerPop 3.6.6 で追加されました。TinkerPop バージョン 3.6.6 以前は、コード例では handshakeInterceptor() を使用していましたが、このリリースでは廃止されました。

```
package com.amazonaws.examples.social;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.evanlennick.retry4j.CallExecutor;
import com.evanlennick.retry4j.CallExecutorBuilder;
import com.evanlennick.retry4j.Status;
import com.evanlennick.retry4j.config.RetryConfig;
import com.evanlennick.retry4j.config.RetryConfigBuilder;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.neptune.auth.NeptuneNettyHttpSigV4Signer;
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.T;

import java.io.*;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.function.Function;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold;

public class MyHandler implements RequestStreamHandler {

  private final GraphTraversalSource g;
  private final CallExecutor<Object> executor;
  private final Random idGenerator = new Random();

  public MyHandler() {

    this.g = AnonymousTraversalSource
            .traversal()
            .withRemote(DriverRemoteConnection.using(createCluster()));


    this.executor = new CallExecutorBuilder<Object>()
            .config(createRetryConfig())
            .build();

  }

  @Override
  public void handleRequest(InputStream input,
                            OutputStream output,
                            Context context) throws IOException {

    doQuery(input, output);
  }

  private void doQuery(InputStream input, OutputStream output) throws IOException {
    try {

      Map<String, Object> args = new HashMap<>();
      args.put("id", idGenerator.nextInt());

      String result = query(args);

      try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) {
          writer.write(result);
      }

    } finally {
        input.close();
        output.close();
    }
  }

  private String query(Map<String, Object> args) {
    int id = (int) args.get("id");

    @SuppressWarnings("unchecked")
    Callable<Object> query = () -> g.V(id)
      .fold()
      .coalesce(
        unfold(),
        addV("Person").property(T.id, id))
      .id().next();

    Status<Object> status = executor.execute(query);

    return status.getResult().toString();
  }

  private Cluster createCluster() {
    Cluster.Builder builder = Cluster.build()
                                     .addContactPoint(System.getenv("NEPTUNE_ENDPOINT"))
                                     .port(Integer.parseInt(System.getenv("NEPTUNE_PORT")))
                                     .enableSsl(true)
                                     .minConnectionPoolSize(1)
                                     .maxConnectionPoolSize(1)
                                     .serializer(Serializers.GRAPHBINARY_V1D0)
                                     .reconnectInterval(2000);

  if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) {
    // The following example uses requestInterceptor(), which was introduced
    // in TinkerPop 3.6.6. If you are using a TinkerPop version earlier than
    // 3.6.6 (but 3.5.5 or higher), use handshakeInterceptor() instead.
    builder.requestInterceptor( r ->
      {
        NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer(region, DefaultCredentialsProvider.create());
        sigV4Signer.signRequest(r);
        return r;
      }
    )
    
    return builder.create();
  }

  private RetryConfig createRetryConfig() {
    return new RetryConfigBuilder().retryOnCustomExceptionLogic(retryLogic())
                                   .withDelayBetweenTries(1000, ChronoUnit.MILLIS)
                                   .withMaxNumberOfTries(5)
                                   .withFixedBackoff()
                                   .build();
  }

  private Function<Exception, Boolean> retryLogic() {
    return e -> {
      StringWriter stringWriter = new StringWriter();
      e.printStackTrace(new PrintWriter(stringWriter));
      String message = stringWriter.toString();

      // Check for connection issues
      if ( message.contains("Timed out while waiting for an available host") ||
           message.contains("Timed-out waiting for connection on Host") ||
           message.contains("Connection to server is no longer active") ||
           message.contains("Connection reset by peer") ||
           message.contains("SSLEngine closed already") ||
           message.contains("Pool is shutdown") ||
           message.contains("ExtendedClosedChannelException") ||
           message.contains("Broken pipe")) {
        return true;
      }

      // Concurrent writes can sometimes trigger a ConcurrentModificationException.
      // In these circumstances you may want to backoff and retry.
      if (message.contains("ConcurrentModificationException")) {
          return true;
      }

      // If the primary fails over to a new instance, existing connections to the old primary will
      // throw a ReadOnlyViolationException. You may want to back and retry.
      if (message.contains("ReadOnlyViolationException")) {
          return true;
      }

      return false;
    };
  }

  private String getOptionalEnv(String name, String defaultValue) {
    String value = System.getenv(name);
    if (value != null && value.length() > 0) {
      return value;
    } else {
      return defaultValue;
    }
  }
}
```

関数に再接続ロジックを含める場合は、[Java 再接続のサンプル](access-graph-gremlin-java-reconnect-example.md) を参照してください。

## Amazon Neptune の JavaScript Lambda 関数の例
<a name="lambda-functions-examples-javascript"></a>

**この例についての注意**
+ JavaScript ドライバーは接続プールを維持しません。常に 1 つの接続を開きます。
+ このサンプル関数は、IAM 認証が有効なデータベースへのリクエストに署名するため、[gremlin-aws-sigv4](https://github.com/shutterstock/gremlin-aws-sigv4) の Sigv4 署名ユーティリティを使用します。
+ これは、オープンソース[非同期ユーティリティモジュール](https://github.com/caolan/async)からの [retry( )](https://caolan.github.io/async/v3/docs.html#retry) 関数を使用してバックオフと再試行を処理します。
+ Gremlin ターミナルステップは JavaScript `promise` を返します ([TinkerPop ドキュメント](https://tinkerpop.apache.org/docs/current/reference/#gremlin-javascript-connecting)を参照してください)。`next()` の場合、これは `{value, done}` タプルです。
+ 接続エラーはハンドラ内で発生し、ここで概説した推奨事項に沿ったバックオフと再試行ロジックを使用して処理されます。ただし、例外が 1 つあります。ドライバーが例外として扱わないため、このバックオフと再試行ロジックでは対応できないある種の接続の問題があります。

  問題は、ドライバーが要求を送信した後、ドライバーが応答を受信する前に接続が閉じられた場合、クエリは完了しているように見え、NULL 値を返すことです。Lambda 関数クライアントに関する限り、関数は正常に完了しているように見えますが、レスポンスは空です。

  この問題の影響は、アプリケーションが空のレスポンスをどのように扱うかによって異なります。一部のアプリケーションでは、読み取りリクエストからの空のレスポンスをエラーとして扱うことがありますが、他のアプリケーションでは誤って空の結果として扱われる場合があります。

  この接続の問題に遭遇した書き込み要求も、空の応答を返します。空の応答で成功した呼び出しは、成功または失敗を示しますか？ 書き込み関数を呼び出すクライアントが、応答の本文を調べるのではなく、データベースへの書き込みがコミットされたことを意味する関数の正常な呼び出しを処理すると、システムがデータを失ったように見える場合があります。

  この問題は、基になるソケットによって発生したイベントをドライバが処理する方法に起因します。基盤となるネットワークソケットが `ECONNRESET` エラーで閉じると、ドライバーが使用する WebSocket が閉じられ、`'ws close'` イベントの発生となります。ただし、ドライバーには例外としてそのイベントを処理するものが含まれていません。その結果、クエリは単に消えます。

  この問題を回避するために、この例では Lambda 関数を使用して、リモート接続の作成時にドライバーに例外をスローする `'ws close'` イベントハンドラーを追加しています。ただし、この例外は Gremlin クエリの要求応答パスに沿って発生しないため、Lambda 関数自体のバックオフと再試行ロジックをトリガーするために使用することはできません。代わりに、`'ws close'` イベントハンドラーによってスローされる例外は、Lambda 呼び出しが失敗する原因となる、未処理の例外となってしまいます。これにより、関数を呼び出すクライアントはエラーを処理し、必要に応じて Lambda 呼び出しを再試行できます。

  クライアントを断続的な接続の問題から保護するために、Lambda 関数自体にバックオフと再試行ロジックを実装することをお勧めします。ただし、上記の問題の回避策では、この特定の接続の問題に起因する障害を処理するために、クライアントが再試行ロジックを実装する必要があります。

### JavaScript コード
<a name="lambda-functions-examples-javascript-code"></a>

```
const gremlin = require('gremlin');
const async = require('async');
const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils');

const traversal = gremlin.process.AnonymousTraversalSource.traversal;
const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection;
const t = gremlin.process.t;
const __ = gremlin.process.statics;

let conn = null;
let g = null;

async function query(context) {

  const id = context.id;

  return g.V(id)
    .fold()
    .coalesce(
      __.unfold(), 
      __.addV('User').property(t.id, id)
    )
    .id().next();
}

async function doQuery() {
  const id = Math.floor(Math.random() * 10000).toString();

  let result = await query({id: id}); 
  return result['value'];
}

exports.handler = async (event, context) => {

  const getConnectionDetails = () => {
    if (process.env['USE_IAM'] == 'true'){
       return getUrlAndHeaders(
         process.env['NEPTUNE_ENDPOINT'],
         process.env['NEPTUNE_PORT'],
         {},
         '/gremlin',
         'wss'); 
    } else {
      const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin';
      return { url: database_url, headers: {}};
    }    
  };


  const createRemoteConnection = () => {
    const { url, headers } = getConnectionDetails();

    const c = new DriverRemoteConnection(
      url, 
      {          
        headers: headers 
      });  

     c._client._connection.on('close', (code, message) => {
         console.info(`close - ${code} ${message}`);
         if (code == 1006){
           console.error('Connection closed prematurely');
           throw new Error('Connection closed prematurely');
         }
       });  

     return c;     
  };

  const createGraphTraversalSource = (conn) => {
    return traversal().withRemote(conn);
  };

  if (conn == null){
    console.info("Initializing connection")
    conn = createRemoteConnection();
    g = createGraphTraversalSource(conn);
  }

  return async.retry(
    { 
      times: 5,
      interval: 1000,
      errorFilter: function (err) { 

        // Add filters here to determine whether error can be retried
        console.warn('Determining whether retriable error: ' + err.message);

        // Check for connection issues
        if (err.message.startsWith('WebSocket is not open')){
          console.warn('Reopening connection');
          conn.close();
          conn = createRemoteConnection();
          g = createGraphTraversalSource(conn);
          return true;
        }

        // Check for ConcurrentModificationException
        if (err.message.includes('ConcurrentModificationException')){
          console.warn('Retrying query because of ConcurrentModificationException');
          return true;
        }

        // Check for ReadOnlyViolationException
        if (err.message.includes('ReadOnlyViolationException')){
          console.warn('Retrying query because of ReadOnlyViolationException');
          return true;
        }

        return false; 
      }

    }, 
    doQuery);
};
```

## Amazon Neptune の Python Lambda 関数の例
<a name="lambda-functions-examples-python"></a>

次の Python AWS Lambda 関数の例について注意すべき点をいくつかご紹介します。
+ これは、[バックオフモジュール](https://pypi.org/project/backoff/)を使用しています。
+ `pool_size=1` を設定して不要な接続プールを作成しないようにします。

```
import os, sys, backoff, math
from random import randint
from gremlin_python import statics
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.driver.protocol import GremlinServerError
from gremlin_python.driver import serializer
from gremlin_python.process.anonymous_traversal import traversal
from gremlin_python.process.graph_traversal import __
from gremlin_python.process.strategies import *
from gremlin_python.process.traversal import T
from aiohttp.client_exceptions import ClientConnectorError
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import ReadOnlyCredentials
from types import SimpleNamespace

import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)


reconnectable_err_msgs = [
    'ReadOnlyViolationException',
    'Server disconnected',
    'Connection refused',
    'Connection was already closed',
    'Connection was closed by server',
    'Failed to connect to server: HTTP Error code 403 - Forbidden'
]

retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs

network_errors = [OSError, ClientConnectorError]

retriable_errors = [GremlinServerError, RuntimeError, Exception] + network_errors

def prepare_iamdb_request(database_url):

    service = 'neptune-db'
    method = 'GET'

    access_key = os.environ['AWS_ACCESS_KEY_ID']
    secret_key = os.environ['AWS_SECRET_ACCESS_KEY']
    region = os.environ['AWS_REGION']
    session_token = os.environ['AWS_SESSION_TOKEN']


    creds = SimpleNamespace(
        access_key=access_key, secret_key=secret_key, token=session_token, region=region,
    )

    request = AWSRequest(method=method, url=database_url, data=None)
    SigV4Auth(creds, service, region).add_auth(request)

    return database_url, request.headers.items()

def is_retriable_error(e):

    is_retriable = False
    err_msg = str(e)

    if isinstance(e, tuple(network_errors)):
        is_retriable = True
    else:
        is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs)

    logger.error('error: [{}] {}'.format(type(e), err_msg))
    logger.info('is_retriable: {}'.format(is_retriable))

    return is_retriable

def is_non_retriable_error(e):
    return not is_retriable_error(e)

def reset_connection_if_connection_issue(params):

    is_reconnectable = False

    e = sys.exc_info()[1]
    err_msg = str(e)

    if isinstance(e, tuple(network_errors)):
        is_reconnectable = True
    else:
        is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs)

    logger.info('is_reconnectable: {}'.format(is_reconnectable))

    if is_reconnectable:
        global conn
        global g
        conn.close()
        conn = create_remote_connection()
        g = create_graph_traversal_source(conn)

@backoff.on_exception(backoff.constant,
                      tuple(retriable_errors),
                      max_tries=5,
                      jitter=None,
                      giveup=is_non_retriable_error,
                      on_backoff=reset_connection_if_connection_issue,
                      interval=1)
def query(**kwargs):

    id = kwargs['id']

    return (g.V().hasLabel('column').has('column_name', 'amhstr_ag_type').in_('hascolumn').dedup().valueMap().limit(10).toList())

def doQuery(event):
    return query(id=str(randint(0, 10000)))

def lambda_handler(event, context):
    result = doQuery(event)
    logger.info('result – {}'.format(result))
    return result

def create_graph_traversal_source(conn):
    return traversal().withRemote(conn)

def create_remote_connection():
    logger.info('Creating remote connection')

    (database_url, headers) = connection_info()

    # Convert headers to a dictionary if it's not already
    headers_dict = dict(headers) if isinstance(headers, list) else headers

    print(headers)
    return DriverRemoteConnection(
        database_url,
        'g',
        pool_size=1,
        headers=headers_dict)


def connection_info():

    database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort'])

    if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true':
        return prepare_iamdb_request(database_url)
    else:
        return (database_url, {})

conn = create_remote_connection()
g = create_graph_traversal_source(conn)
```

以下はサンプルの結果であり、負荷が高い期間と軽い負荷が交互に繰り返されることを示しています。

![\[Python Lambda 関数の例からのサンプル結果を示す図表。\]](http://docs.aws.amazon.com/ja_jp/neptune/latest/userguide/images/python-lambda-results.png)
