Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Beispiele für AWS Lambda-Funktionen für Amazon Neptune
Die folgenden Beispiele für AWS Lambda-Funktionen, geschrieben in Java, JavaScript und Python, veranschaulichen das Upserting eines einzelnen Scheitelpunkts mit einer zufällig generierten ID unter Verwendung des Ausdrucks fold().coalesce().unfold()
.
Bei einem Großteil des Codes in den einzelnen Funktionen handelt es sich um Boilerplate-Code, der für die Verwaltung von Verbindungen und die Wiederholung von Verbindungen und Abfragen im Falle eines Fehlers zuständig ist. Die eigentliche Anwendungslogik und die Gremlin-Abfrage werden in den Methoden doQuery()
bzw. query()
implementiert. Wenn Sie diese Beispiele als Grundlage für Ihre eigenen Lambda-Funktionen verwenden, können Sie sich auf die Änderung von doQuery()
und query()
konzentrieren.
Die Funktionen sind so konfiguriert, dass fehlgeschlagene Abfragen 5-mal wiederholt werden und zwischen den Wiederholungen jeweils 1 Sekunde gewartet wird.
Die Funktionen erfordern Werte in den folgenden Lambda-Umgebungsvariablen:
NEPTUNE_ENDPOINT
– Ihr Neptune-DB-Cluster-Endpunkt. Für Python sollte diesneptuneEndpoint
sein.NEPTUNE_PORT
– Der Neptune-Port. Für Python sollte diesneptunePort
sein.-
USE_IAM
– (true
oderfalse
) Wenn für Ihre Datenbank die AWS Identity and Access Management (IAM)-Datenbankauthentifizierung aktiviert ist, setzen Sie die UmgebungsvariableUSE_IAM
auftrue
. Dies veranlasst die Lambda-Funktion, Verbindungsanforderungen an Neptune mit Sigv4 zu signieren. Stellen Sie für solche IAM-DB-Authentifizierungsanforderungen sicher, dass der Ausführungsrolle der Lambda-Funktion eine entsprechende IAM-Richtlinie angefügt ist, die es der Funktion ermöglicht, eine Verbindung zu Ihrem Neptune-DB-Cluster herzustellen (siehe Arten von Richtlinien IAM).
Beispiel für eine Java-Lambda-Funktion für Amazon Neptune
Hier einige Dinge, die Sie in Bezug auf Java-AWS Lambda-Funktionen beachten sollten:
Der Java-Treiber unterhält seinen eigenen Verbindungspool, den Sie nicht benötigen. Konfigurieren Sie Ihr
Cluster
-Objekt daher mitminConnectionPoolSize(1)
undmaxConnectionPoolSize(1)
.Die Erstellung des
Cluster
-Objekts kann langsam sein, da es einen oder mehrere Serialisierer erstellt (standardmäßig Gyro, plus einen weiteren, wenn Sie es für zusätzliche Ausgabeformate wie z. B.binary
konfiguriert haben). Deren Instanziierung kann eine Weile dauern.Der Verbindungspool wird mit der ersten Anforderung initialisiert. Zu diesem Zeitpunkt richtet der Treiber den
Netty
-Stack ein, weist Byte-Puffer zu und erstellt einen Signaturschlüssel, wenn Sie die IAM-DB-Authentifizierung verwenden. All dies kann die Kaltstart-Latenz erhöhen.-
Der Verbindungspool des Java-Treibers überwacht die Verfügbarkeit der Server-Hosts und versucht automatisch eine erneute Verbindung, wenn eine Verbindung fehlschlägt. Er startet eine Hintergrundaufgabe, um zu versuchen, die Verbindung wiederherzustellen. Verwenden Sie
reconnectInterval( )
, um das Intervall zwischen erneuten Verbindungsversuchen zu konfigurieren. Während der Treiber eine erneute Verbindung versucht, kann Ihre Lambda-Funktion die Abfrage einfach wiederholen.Wenn das Intervall zwischen den Wiederholungen kleiner als das Intervall zwischen den erneuten Verbindungsversuchen ist, schlagen Wiederholungsversuche bei einer fehlgeschlagenen Verbindung erneut fehl, da der Host als nicht verfügbar angesehen wird. Dies gilt nicht für Wiederholungen für eine
ConcurrentModificationException
. Verwenden Sie Java 8 anstelle von Java 11.
Netty
-Optimierungen sind in Java 11 standardmäßig nicht aktiviert.In diesem Beispiel wird Retry4J
für Wiederholungen verwendet. Informationen zur Verwendung des
Sigv4
-Signaturtreibers in Ihrer Java-Lambda-Funktion finden Sie in den Abhängigkeitsanforderungen in Herstellen einer Verbindung zu Amazon Neptune Neptune-Datenbanken IAM mithilfe von Gremlin Java.
Warnung
Der CallExecutor
von Retry4j ist möglicherweise nicht threadsicher. Überlegen Sie sich, jeden Thread eine eigene CallExecutor
-Instance verwenden zu lassen.
package com.amazonaws.examples.social; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.evanlennick.retry4j.CallExecutor; import com.evanlennick.retry4j.CallExecutorBuilder; import com.evanlennick.retry4j.Status; import com.evanlennick.retry4j.config.RetryConfig; import com.evanlennick.retry4j.config.RetryConfigBuilder; import org.apache.tinkerpop.gremlin.driver.Cluster; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.neptune.auth.NeptuneNettyHttpSigV4Signer; import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.structure.T; import java.io.*; import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.function.Function; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold; public class MyHandler implements RequestStreamHandler { private final GraphTraversalSource g; private final CallExecutor<Object> executor; private final Random idGenerator = new Random(); public MyHandler() { this.g = AnonymousTraversalSource .traversal() .withRemote(DriverRemoteConnection.using(createCluster())); this.executor = new CallExecutorBuilder<Object>() .config(createRetryConfig()) .build(); } @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { doQuery(input, output); } private void doQuery(InputStream input, OutputStream output) throws IOException { try { Map<String, Object> args = new HashMap<>(); args.put("id", idGenerator.nextInt()); String result = query(args); try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) { writer.write(result); } } finally { input.close(); output.close(); } } private String query(Map<String, Object> args) { int id = (int) args.get("id"); @SuppressWarnings("unchecked") Callable<Object> query = () -> g.V(id) .fold() .coalesce( unfold(), addV("Person").property(T.id, id)) .id().next(); Status<Object> status = executor.execute(query); return status.getResult().toString(); } private Cluster createCluster() { Cluster.Builder builder = Cluster.build() .addContactPoint(System.getenv("NEPTUNE_ENDPOINT")) .port(Integer.parseInt(System.getenv("NEPTUNE_PORT"))) .enableSsl(true) .minConnectionPoolSize(1) .maxConnectionPoolSize(1) .serializer(Serializers.GRAPHBINARY_V1D0) .reconnectInterval(2000); if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) { // For versions of TinkerPop 3.4.11 or higher: builder.handshakeInterceptor( r -> { NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer(region, new DefaultAWSCredentialsProviderChain()); sigV4Signer.signRequest(r); return r; } ) // Versions of TinkerPop prior to 3.4.11 should use the following approach. // Be sure to adjust the imports to include: // import org.apache.tinkerpop.gremlin.driver.SigV4WebSocketChannelizer; // builder = builder.channelizer(SigV4WebSocketChannelizer.class); return builder.create(); } private RetryConfig createRetryConfig() { return new RetryConfigBuilder().retryOnCustomExceptionLogic(retryLogic()) .withDelayBetweenTries(1000, ChronoUnit.MILLIS) .withMaxNumberOfTries(5) .withFixedBackoff() .build(); } private Function<Exception, Boolean> retryLogic() { return e -> { StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String message = stringWriter.toString(); // Check for connection issues if ( message.contains("Timed out while waiting for an available host") || message.contains("Timed-out waiting for connection on Host") || message.contains("Connection to server is no longer active") || message.contains("Connection reset by peer") || message.contains("SSLEngine closed already") || message.contains("Pool is shutdown") || message.contains("ExtendedClosedChannelException") || message.contains("Broken pipe")) { return true; } // Concurrent writes can sometimes trigger a ConcurrentModificationException. // In these circumstances you may want to backoff and retry. if (message.contains("ConcurrentModificationException")) { return true; } // If the primary fails over to a new instance, existing connections to the old primary will // throw a ReadOnlyViolationException. You may want to back and retry. if (message.contains("ReadOnlyViolationException")) { return true; } return false; }; } private String getOptionalEnv(String name, String defaultValue) { String value = System.getenv(name); if (value != null && value.length() > 0) { return value; } else { return defaultValue; } } }
Wenn Sie die Wiederverbindungslogik in Ihre Funktion aufnehmen möchten, finden Sie weitere Informationen unter Beispiel für eine erneute Verbindung mit Java.
Beispiel für eine JavaScript-Lambda-Funktion für Amazon Neptune
Anmerkungen zu diesem Beispiel
Der JavaScript-Treiber unterhält keinen Verbindungspool. Es öffnet immer eine einzige Verbindung.
Die Beispielfunktion verwendet die Sigv4-Signaturdienstprogramme von gremlin-aws-sigv4
zum Signieren von Anforderungen an eine Datenbank mit aktivierter IAM-Authentifizierung. Sie verwendet die Funktion retry ()
aus dem Open-Source-Dienstprogrammmodul Async zur Verarbeitung von Backoff- und Wiederholungsversuchen. Gremlin-Terminal-Schritte geben ein JavaScript
promise
zurück (siehe TinkerPop-Dokumentation). Für next()
ist dies ein Tupel{value, done}
-
Verbindungsfehler werden innerhalb des Handlers ausgelöst und mit einer gewissen Backoff- und Wiederholungslogik gemäß den hier dargelegten Empfehlungen behandelt. Dabei gibt es jedoch eine Ausnahme. Es gibt eine Art von Verbindungsproblem, die der Treiber nicht als Ausnahme behandelt und die daher mit dieser Backoff- und Wiederholungslogik nicht behoben werden kann.
Das Problem besteht darin, dass die Abfrage zwar abgeschlossen zu werden scheint, jedoch einen Nullwert zurückgibt, wenn eine Verbindung geschlossen wird, nachdem ein Treiber eine Anforderung gesendet hat, jedoch bevor der Treiber eine Antwort erhält. Was den Client der Lambda-Funktion betrifft, so scheint die Funktion erfolgreich abgeschlossen zu werden, allerdings mit einer leeren Antwort.
Wie sich dieses Problem auswirkt, hängt davon ab, wie Ihre Anwendung mit einer leeren Antwort umgeht. Einige Anwendungen behandeln eine leere Antwort auf eine Leseanforderung möglicherweise als Fehler, während andere sie fälschlicherweise als leeres Ergebnis behandeln.
Schreibanforderungen, bei denen dieses Verbindungsproblem auftritt, geben ebenfalls eine leere Antwort zurück. Signalisiert ein erfolgreicher Aufruf mit einer leeren Antwort eine erfolgreiche Ausführung oder einen Fehler? Wenn der Client, der eine Schreibfunktion aufruft, den erfolgreichen Aufruf der Funktion einfach so interpretiert, dass der Schreibvorgang in die Datenbank festgeschrieben wurde, anstatt den Hauptteil der Antwort zu überprüfen, kann es so aussehen, als ob das System Daten verliert.
Dieses Problem ist darauf zurückzuführen, wie der Treiber Ereignisse behandelt, die vom zugrunde liegenden Socket ausgegeben werden. Wenn der zugrunde liegende Netzwerk-Socket mit einem Fehler
ECONNRESET
geschlossen wird, wird der vom Treiber verwendete WebSocket geschlossen und gibt ein Ereignis'ws close'
aus. Der Treiber enthält jedoch nichts, um dieses Ereignis so zu behandeln, dass eine Ausnahme ausgelöst werden könnte. Infolgedessen verschwindet die Abfrage einfach.Um dieses Problem zu umgehen, fügt die Lambda-Beispielfunktion hier einen
'ws close'
-Event-Handler hinzu, der beim Herstellen einer Remote-Verbindung eine Ausnahme für den Treiber auslöst. Diese Ausnahme wird jedoch nicht entlang des Request-Response-Pfads der Gremlin-Abfrage ausgelöst und kann daher nicht verwendet werden, um eine Backoff- und Wiederholungslogik innerhalb der Lambda-Funktion selbst auszulösen. Die vom'ws close'
-Event-Handler ausgelöste Ausnahme fühlt vielmehr zu einer unbehandelten Ausnahme, die zur Folge hat, dass der Lambda-Aufruf fehlschlägt. So kann der Client, der die Funktion aufruft, den Fehler behandeln und den Lambda-Aufruf gegebenenfalls erneut versuchen.Wir empfehlen, in der Lambda-Funktion selbst eine Backoff- und Wiederholungslogik zu implementieren, um Ihre Clients vor zeitweiligen Verbindungsproblemen zu schützen. Um das oben genannte Problem zu umgehen, muss jedoch auch der Client eine Wiederholungslogik implementieren, um Fehler zu behandeln, die sich aus diesem speziellen Verbindungsproblem ergeben.
Javascript-Code
const gremlin = require('gremlin'); const async = require('async'); const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils'); const traversal = gremlin.process.AnonymousTraversalSource.traversal; const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection; const t = gremlin.process.t; const __ = gremlin.process.statics; let conn = null; let g = null; async function query(context) { const id = context.id; return g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(t.id, id) ) .id().next(); } async function doQuery() { const id = Math.floor(Math.random() * 10000).toString(); let result = await query({id: id}); return result['value']; } exports.handler = async (event, context) => { const getConnectionDetails = () => { if (process.env['USE_IAM'] == 'true'){ return getUrlAndHeaders( process.env['NEPTUNE_ENDPOINT'], process.env['NEPTUNE_PORT'], {}, '/gremlin', 'wss'); } else { const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin'; return { url: database_url, headers: {}}; } }; const createRemoteConnection = () => { const { url, headers } = getConnectionDetails(); const c = new DriverRemoteConnection( url, { mimeType: 'application/vnd.gremlin-v2.0+json', headers: headers }); c._client._connection.on('close', (code, message) => { console.info(`close - ${code} ${message}`); if (code == 1006){ console.error('Connection closed prematurely'); throw new Error('Connection closed prematurely'); } }); return c; }; const createGraphTraversalSource = (conn) => { return traversal().withRemote(conn); }; if (conn == null){ console.info("Initializing connection") conn = createRemoteConnection(); g = createGraphTraversalSource(conn); } return async.retry( { times: 5, interval: 1000, errorFilter: function (err) { // Add filters here to determine whether error can be retried console.warn('Determining whether retriable error: ' + err.message); // Check for connection issues if (err.message.startsWith('WebSocket is not open')){ console.warn('Reopening connection'); conn.close(); conn = createRemoteConnection(); g = createGraphTraversalSource(conn); return true; } // Check for ConcurrentModificationException if (err.message.includes('ConcurrentModificationException')){ console.warn('Retrying query because of ConcurrentModificationException'); return true; } // Check for ReadOnlyViolationException if (err.message.includes('ReadOnlyViolationException')){ console.warn('Retrying query because of ReadOnlyViolationException'); return true; } return false; } }, doQuery); };
Beispiel für eine Python-Lambda-Funktion für Amazon Neptune
Hier einige Dinge, die Sie in Bezug auf die folgende Python-AWS Lambda-Beispielfunktion beachten sollten:
Sie verwendet das Backoff-Modul
. Sie legt
pool_size=1
fest, damit kein unnötiger Verbindungspool erstellt wird.Sie legt
message_serializer=serializer.GraphSONSerializersV2d0()
fest.
import os, sys, backoff, math from random import randint from gremlin_python import statics from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection from gremlin_python.driver.protocol import GremlinServerError from gremlin_python.driver import serializer from gremlin_python.process.anonymous_traversal import traversal from gremlin_python.process.graph_traversal import __ from gremlin_python.process.strategies import * from gremlin_python.process.traversal import T from aiohttp.client_exceptions import ClientConnectorError from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest from botocore.credentials import ReadOnlyCredentials from types import SimpleNamespace import logging logger = logging.getLogger() logger.setLevel(logging.INFO) reconnectable_err_msgs = [ 'ReadOnlyViolationException', 'Server disconnected', 'Connection refused', 'Connection was already closed', 'Connection was closed by server', 'Failed to connect to server: HTTP Error code 403 - Forbidden' ] retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs network_errors = [OSError, ClientConnectorError] retriable_errors = [GremlinServerError, RuntimeError, Exception] + network_errors def prepare_iamdb_request(database_url): service = 'neptune-db' method = 'GET' access_key = os.environ['AWS_ACCESS_KEY_ID'] secret_key = os.environ['AWS_SECRET_ACCESS_KEY'] region = os.environ['AWS_REGION'] session_token = os.environ['AWS_SESSION_TOKEN'] creds = SimpleNamespace( access_key=access_key, secret_key=secret_key, token=session_token, region=region, ) request = AWSRequest(method=method, url=database_url, data=None) SigV4Auth(creds, service, region).add_auth(request) return (database_url, request.headers.items()) def is_retriable_error(e): is_retriable = False err_msg = str(e) if isinstance(e, tuple(network_errors)): is_retriable = True else: is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs) logger.error('error: [{}] {}'.format(type(e), err_msg)) logger.info('is_retriable: {}'.format(is_retriable)) return is_retriable def is_non_retriable_error(e): return not is_retriable_error(e) def reset_connection_if_connection_issue(params): is_reconnectable = False e = sys.exc_info()[1] err_msg = str(e) if isinstance(e, tuple(network_errors)): is_reconnectable = True else: is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs) logger.info('is_reconnectable: {}'.format(is_reconnectable)) if is_reconnectable: global conn global g conn.close() conn = create_remote_connection() g = create_graph_traversal_source(conn) @backoff.on_exception(backoff.constant, tuple(retriable_errors), max_tries=5, jitter=None, giveup=is_non_retriable_error, on_backoff=reset_connection_if_connection_issue, interval=1) def query(**kwargs): id = kwargs['id'] return (g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(T.id, id) ) .id().next()) def doQuery(event): return query(id=str(randint(0, 10000))) def lambda_handler(event, context): result = doQuery(event) logger.info('result – {}'.format(result)) return result def create_graph_traversal_source(conn): return traversal().withRemote(conn) def create_remote_connection(): logger.info('Creating remote connection') (database_url, headers) = connection_info() return DriverRemoteConnection( database_url, 'g', pool_size=1, message_serializer=serializer.GraphSONSerializersV2d0(), headers=headers) def connection_info(): database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort']) if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true': return prepare_iamdb_request(database_url) else: return (database_url, {}) conn = create_remote_connection() g = create_graph_traversal_source(conn)
Hier sind Beispielergebnisse, die Zeiten hoher und geringer Auslastung im Wechsel zeigen: