翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
AWS Lambda Amazon Neptune 関数の例
以下の Java、JavaScript、および Python で記述された AWS Lambda 関数の例は、fold().coalesce().unfold()
イディオムを使用してランダムに生成された ID で単一の頂点をアップサートしていることを示しています。
各関数のコードの多くは定型コードであり、接続を管理し、エラーが発生した場合に接続とクエリを再試行します。実際のアプリケーションロジックと Gremlin クエリは、doQuery()
および query()
メソッドのそれぞれを使用します。これらの例を独自の Lambda 関数の基礎として使用すると、doQuery()
および query()
の変更に集中できます。
関数は、クエリがうまくいかなければ 5 回再試行し、再試行の間に 1 秒待機するように構成されています。
関数では、次の Lambda 環境変数に値が存在する必要があります。
NEPTUNE_ENDPOINT
— Neptune DB クラスターエンドポイント。Python の場合、これはneptuneEndpoint
のはずです。NEPTUNE_PORT
— Neptune ポート。Python の場合、これはneptunePort
のはずです。-
USE_IAM
— (true
またはfalse
) データベースに有効な AWS Identity and Access Management (IAM) データベース認証がある場合、USE_IAM
環境変数をtrue
に設定します。これにより、Lambda 関数は Neptune への接続リクエストを SIGV4 署名します。このような IAM DB 認証リクエストについては、Lambda 関数の実行ロールに、関数が Neptune DB クラスターに接続できるようにする適切な IAM ポリシーがアタッチされていることを確認してください (IAM ポリシーのタイプ を参照)。
Amazon Neptune の Java Lambda 関数の例
Java AWS Lambda 関数について留意すべき点をいくつかご紹介します。
Java ドライバーは独自の接続プールを保持しますが、これは必要ありませんので、
Cluster
のオブジェクトをminConnectionPoolSize(1)
およびmaxConnectionPoolSize(1)
で構成します。Cluster
オブジェクトは、1 つ以上のシリアライザ (デフォルトで Gyro、binary
のような追加の出力フォーマット用に構成している場合は別のシリアライザ) を作成するため、構築に時間がかかることがあります。インスタンス化には時間がかかる場合があります。接続プールは、最初の要求で初期化されます。この時点で、ドライバーは
Netty
スタックを設定し、バイトバッファを割り当て、IAM DB 認証を使用している場合は、署名キーを作成します。これらすべてがコールドスタートのレイテンシーに追加される可能性があります。-
Java ドライバーの接続プールは、サーバーホストの可用性を監視し、接続に失敗すると自動的に再接続を試みます。バックグラウンドタスクが開始され、接続の再確立が試行されます。
reconnectInterval( )
を使用して、再接続の試行間隔を設定します。ドライバーが再接続しようとしている間、Lambda 関数はクエリの再試行のみができます。再試行間隔が再接続試行の間隔より短い場合、ホストが使用できないと見なされるため、失敗した接続に対する再試行は再度失敗します。これは、
ConcurrentModificationException
の再試行には該当しません。 Java 11ではなくJava 8を使用してください。Java 11 では、
Netty
最適化はデフォルトで有効になっていません。この例では再試行に retry4J
を使用します。 Java Lambda 関数で
Sigv4
署名ドライバーを使用するには、依存関係の要件について、Gremlin Java IAMを使用した Amazon Neptune データベースへの接続 を参照してください。
警告
Retry4J からの CallExecutor
はスレッドセーフではないことがあります。各スレッドで独自の CallExecutor
インスタンスを使用することを検討してください。
package com.amazonaws.examples.social; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.evanlennick.retry4j.CallExecutor; import com.evanlennick.retry4j.CallExecutorBuilder; import com.evanlennick.retry4j.Status; import com.evanlennick.retry4j.config.RetryConfig; import com.evanlennick.retry4j.config.RetryConfigBuilder; import org.apache.tinkerpop.gremlin.driver.Cluster; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.neptune.auth.NeptuneNettyHttpSigV4Signer; import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.structure.T; import java.io.*; import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.function.Function; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold; public class MyHandler implements RequestStreamHandler { private final GraphTraversalSource g; private final CallExecutor<Object> executor; private final Random idGenerator = new Random(); public MyHandler() { this.g = AnonymousTraversalSource .traversal() .withRemote(DriverRemoteConnection.using(createCluster())); this.executor = new CallExecutorBuilder<Object>() .config(createRetryConfig()) .build(); } @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { doQuery(input, output); } private void doQuery(InputStream input, OutputStream output) throws IOException { try { Map<String, Object> args = new HashMap<>(); args.put("id", idGenerator.nextInt()); String result = query(args); try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) { writer.write(result); } } finally { input.close(); output.close(); } } private String query(Map<String, Object> args) { int id = (int) args.get("id"); @SuppressWarnings("unchecked") Callable<Object> query = () -> g.V(id) .fold() .coalesce( unfold(), addV("Person").property(T.id, id)) .id().next(); Status<Object> status = executor.execute(query); return status.getResult().toString(); } private Cluster createCluster() { Cluster.Builder builder = Cluster.build() .addContactPoint(System.getenv("NEPTUNE_ENDPOINT")) .port(Integer.parseInt(System.getenv("NEPTUNE_PORT"))) .enableSsl(true) .minConnectionPoolSize(1) .maxConnectionPoolSize(1) .serializer(Serializers.GRAPHBINARY_V1D0) .reconnectInterval(2000); if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) { // For versions of TinkerPop 3.4.11 or higher: builder.handshakeInterceptor( r -> { NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer(region, new DefaultAWSCredentialsProviderChain()); sigV4Signer.signRequest(r); return r; } ) // Versions of TinkerPop prior to 3.4.11 should use the following approach. // Be sure to adjust the imports to include: // import org.apache.tinkerpop.gremlin.driver.SigV4WebSocketChannelizer; // builder = builder.channelizer(SigV4WebSocketChannelizer.class); return builder.create(); } private RetryConfig createRetryConfig() { return new RetryConfigBuilder().retryOnCustomExceptionLogic(retryLogic()) .withDelayBetweenTries(1000, ChronoUnit.MILLIS) .withMaxNumberOfTries(5) .withFixedBackoff() .build(); } private Function<Exception, Boolean> retryLogic() { return e -> { StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String message = stringWriter.toString(); // Check for connection issues if ( message.contains("Timed out while waiting for an available host") || message.contains("Timed-out waiting for connection on Host") || message.contains("Connection to server is no longer active") || message.contains("Connection reset by peer") || message.contains("SSLEngine closed already") || message.contains("Pool is shutdown") || message.contains("ExtendedClosedChannelException") || message.contains("Broken pipe")) { return true; } // Concurrent writes can sometimes trigger a ConcurrentModificationException. // In these circumstances you may want to backoff and retry. if (message.contains("ConcurrentModificationException")) { return true; } // If the primary fails over to a new instance, existing connections to the old primary will // throw a ReadOnlyViolationException. You may want to back and retry. if (message.contains("ReadOnlyViolationException")) { return true; } return false; }; } private String getOptionalEnv(String name, String defaultValue) { String value = System.getenv(name); if (value != null && value.length() > 0) { return value; } else { return defaultValue; } } }
関数に再接続ロジックを含める場合は、Java 再接続のサンプル を参照してください。
Amazon Neptune の JavaScript Lambda 関数の例
この例についての注意
JavaScript ドライバーは接続プールを維持しません。常に 1 つの接続を開きます。
このサンプル関数は、IAM 認証が有効なデータベースへのリクエストに署名するため、gremlin-aws-sigv4
の Sigv4 署名ユーティリティを使用します。 これは、オープンソース非同期ユーティリティモジュール
からの retry( ) 関数を使用してバックオフと再試行を処理します。 Gremlin ターミナルステップは JavaScript
promise
を返します (TinkerPop ドキュメントを参照してください)。 next()
の場合、これは{value, done}
タプルです。-
接続エラーはハンドラ内で発生し、ここで概説した推奨事項に沿ったバックオフと再試行ロジックを使用して処理されます。ただし、例外が 1 つあります。ドライバーが例外として扱わないため、このバックオフと再試行ロジックでは対応できないある種の接続の問題があります。
問題は、ドライバーが要求を送信した後、ドライバーが応答を受信する前に接続が閉じられた場合、クエリは完了しているように見え、NULL 値を返すことです。Lambda 関数クライアントに関する限り、関数は正常に完了しているように見えますが、レスポンスは空です。
この問題の影響は、アプリケーションが空のレスポンスをどのように扱うかによって異なります。一部のアプリケーションでは、読み取りリクエストからの空のレスポンスをエラーとして扱うことがありますが、他のアプリケーションでは誤って空の結果として扱われる場合があります。
この接続の問題に遭遇した書き込み要求も、空の応答を返します。空の応答で成功した呼び出しは、成功または失敗を示しますか? 書き込み関数を呼び出すクライアントが、応答の本文を調べるのではなく、データベースへの書き込みがコミットされたことを意味する関数の正常な呼び出しを処理すると、システムがデータを失ったように見える場合があります。
この問題は、基になるソケットによって発生したイベントをドライバが処理する方法に起因します。基盤となるネットワークソケットが
ECONNRESET
エラーで閉じると、ドライバーが使用する WebSocket が閉じられ、'ws close'
イベントの発生となります。ただし、ドライバーには例外としてそのイベントを処理するものが含まれていません。その結果、クエリは単に消えます。この問題を回避するために、この例では Lambda 関数を使用して、リモート接続の作成時にドライバーに例外をスローする
'ws close'
イベントハンドラーを追加しています。ただし、この例外は Gremlin クエリの要求応答パスに沿って発生しないため、Lambda 関数自体のバックオフと再試行ロジックをトリガーするために使用することはできません。代わりに、'ws close'
イベントハンドラーによってスローされる例外は、Lambda 呼び出しが失敗する原因となる、未処理の例外となってしまいます。これにより、関数を呼び出すクライアントはエラーを処理し、必要に応じて Lambda 呼び出しを再試行できます。クライアントを断続的な接続の問題から保護するために、Lambda 関数自体にバックオフと再試行ロジックを実装することをお勧めします。ただし、上記の問題の回避策では、この特定の接続の問題に起因する障害を処理するために、クライアントが再試行ロジックを実装する必要があります。
JavaScript コード
const gremlin = require('gremlin'); const async = require('async'); const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils'); const traversal = gremlin.process.AnonymousTraversalSource.traversal; const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection; const t = gremlin.process.t; const __ = gremlin.process.statics; let conn = null; let g = null; async function query(context) { const id = context.id; return g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(t.id, id) ) .id().next(); } async function doQuery() { const id = Math.floor(Math.random() * 10000).toString(); let result = await query({id: id}); return result['value']; } exports.handler = async (event, context) => { const getConnectionDetails = () => { if (process.env['USE_IAM'] == 'true'){ return getUrlAndHeaders( process.env['NEPTUNE_ENDPOINT'], process.env['NEPTUNE_PORT'], {}, '/gremlin', 'wss'); } else { const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin'; return { url: database_url, headers: {}}; } }; const createRemoteConnection = () => { const { url, headers } = getConnectionDetails(); const c = new DriverRemoteConnection( url, { mimeType: 'application/vnd.gremlin-v2.0+json', headers: headers }); c._client._connection.on('close', (code, message) => { console.info(`close - ${code} ${message}`); if (code == 1006){ console.error('Connection closed prematurely'); throw new Error('Connection closed prematurely'); } }); return c; }; const createGraphTraversalSource = (conn) => { return traversal().withRemote(conn); }; if (conn == null){ console.info("Initializing connection") conn = createRemoteConnection(); g = createGraphTraversalSource(conn); } return async.retry( { times: 5, interval: 1000, errorFilter: function (err) { // Add filters here to determine whether error can be retried console.warn('Determining whether retriable error: ' + err.message); // Check for connection issues if (err.message.startsWith('WebSocket is not open')){ console.warn('Reopening connection'); conn.close(); conn = createRemoteConnection(); g = createGraphTraversalSource(conn); return true; } // Check for ConcurrentModificationException if (err.message.includes('ConcurrentModificationException')){ console.warn('Retrying query because of ConcurrentModificationException'); return true; } // Check for ReadOnlyViolationException if (err.message.includes('ReadOnlyViolationException')){ console.warn('Retrying query because of ReadOnlyViolationException'); return true; } return false; } }, doQuery); };
Amazon Neptune の Python Lambda 関数の例
次の Python AWS Lambda 関数の例について注意すべき点をいくつかご紹介します。
これは、バックオフモジュール
を使用しています。 pool_size=1
を設定して不要な接続プールを作成しないようにします。message_serializer=serializer.GraphSONSerializersV2d0()
を設定します。
import os, sys, backoff, math from random import randint from gremlin_python import statics from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection from gremlin_python.driver.protocol import GremlinServerError from gremlin_python.driver import serializer from gremlin_python.process.anonymous_traversal import traversal from gremlin_python.process.graph_traversal import __ from gremlin_python.process.strategies import * from gremlin_python.process.traversal import T from aiohttp.client_exceptions import ClientConnectorError from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest from botocore.credentials import ReadOnlyCredentials from types import SimpleNamespace import logging logger = logging.getLogger() logger.setLevel(logging.INFO) reconnectable_err_msgs = [ 'ReadOnlyViolationException', 'Server disconnected', 'Connection refused', 'Connection was already closed', 'Connection was closed by server', 'Failed to connect to server: HTTP Error code 403 - Forbidden' ] retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs network_errors = [OSError, ClientConnectorError] retriable_errors = [GremlinServerError, RuntimeError, Exception] + network_errors def prepare_iamdb_request(database_url): service = 'neptune-db' method = 'GET' access_key = os.environ['AWS_ACCESS_KEY_ID'] secret_key = os.environ['AWS_SECRET_ACCESS_KEY'] region = os.environ['AWS_REGION'] session_token = os.environ['AWS_SESSION_TOKEN'] creds = SimpleNamespace( access_key=access_key, secret_key=secret_key, token=session_token, region=region, ) request = AWSRequest(method=method, url=database_url, data=None) SigV4Auth(creds, service, region).add_auth(request) return (database_url, request.headers.items()) def is_retriable_error(e): is_retriable = False err_msg = str(e) if isinstance(e, tuple(network_errors)): is_retriable = True else: is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs) logger.error('error: [{}] {}'.format(type(e), err_msg)) logger.info('is_retriable: {}'.format(is_retriable)) return is_retriable def is_non_retriable_error(e): return not is_retriable_error(e) def reset_connection_if_connection_issue(params): is_reconnectable = False e = sys.exc_info()[1] err_msg = str(e) if isinstance(e, tuple(network_errors)): is_reconnectable = True else: is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs) logger.info('is_reconnectable: {}'.format(is_reconnectable)) if is_reconnectable: global conn global g conn.close() conn = create_remote_connection() g = create_graph_traversal_source(conn) @backoff.on_exception(backoff.constant, tuple(retriable_errors), max_tries=5, jitter=None, giveup=is_non_retriable_error, on_backoff=reset_connection_if_connection_issue, interval=1) def query(**kwargs): id = kwargs['id'] return (g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(T.id, id) ) .id().next()) def doQuery(event): return query(id=str(randint(0, 10000))) def lambda_handler(event, context): result = doQuery(event) logger.info('result – {}'.format(result)) return result def create_graph_traversal_source(conn): return traversal().withRemote(conn) def create_remote_connection(): logger.info('Creating remote connection') (database_url, headers) = connection_info() return DriverRemoteConnection( database_url, 'g', pool_size=1, message_serializer=serializer.GraphSONSerializersV2d0(), headers=headers) def connection_info(): database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort']) if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true': return prepare_iamdb_request(database_url) else: return (database_url, {}) conn = create_remote_connection() g = create_graph_traversal_source(conn)
以下はサンプルの結果であり、負荷が高い期間と軽い負荷が交互に繰り返されることを示しています。