Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
AWS Lambda contoh fungsi untuk Amazon Neptune
ContohAWS Lambda fungsi berikut, ditulis dalam Java, JavaScript dan Python, menggambarkan upserting sebuah vertex tunggal dengan ID yang dihasilkan secara acak menggunakanfold().coalesce().unfold()
idiom tersebut.
Banyak kode di setiap fungsi adalah kode boilerplate, bertanggung jawab untuk mengelola koneksi dan mencoba kembali koneksi juga query jika terjadi kesalahan. Logika aplikasi nyata dan query Gremlin diimplementasikan dalam metode doQuery()
danquery()
berturut-turut. Jika Anda menggunakan contoh-contoh ini sebagai dasar untuk fungsi Lambda Anda sendiri, Anda dapat berkonsentrasi pada modifikasi doQuery()
dan query()
.
Fungsi dikonfigurasi untuk mencoba lagi kueri yang gagal 5 kali, menunggu 1 detik antara pengulangan itu.
Fungsi memerlukan nilai untuk hadir dalam variabel lingkungan Lambda berikut:
NEPTUNE_ENDPOINT
– Titik akhir klaster DB Neptune Anda. Untuk Python, ini seharusnyaneptuneEndpoint
.NEPTUNE_PORT
– Port Neptune. Untuk Python, ini seharusnyaneptunePort
.-
USE_IAM
– (true
ataufalse
) Jika basis data Anda memiliki AWS Identity and Access Management Autentikasi basis Data (IAM) yang diaktifkan, atur variabel lingkungan keUSE_IAM
ketrue
. Hal ini menyebabkan fungsi Lambda untuk permintaan koneksi SIGV4-sign ke Neptune. Untuk permintaan auth DB IAM seperti itu, pastikan bahwa peran eksekusi fungsi Lambda memiliki kebijakan IAM tepat yang terpasang, yang memungkinkan fungsi untuk terhubung ke klaster Neptune DB Anda (lihat Jenis IAM kebijakan).
Contoh Fungsi Lambda Java untuk Amazon Neptune
Berikut adalah beberapa hal yang perlu diingat tentang Fungsi AWS Lambda Java:
Driver Java mempertahankan kolam koneksinya sendiri, yang tidak Anda butuhkan, jadi konfigurasikan objek
Cluster
denganminConnectionPoolSize(1)
danmaxConnectionPoolSize(1)
.Objek
Cluster
bisa saja lambat untuk dibangun karena objek tersebut membuat satu atau lebih serializers (Gyro secara default, ditambah lainnya jika Anda telah mengkonfigurasinya untuk format output tambahan sepertibinary
). Ini perlu beberapa waktu untuk instantiate.Kolam koneksi diinisialisasi dengan permintaan pertama. Pada titik ini, driver mengatur tumpukan
Netty
, mengalokasikan byte buffer, dan membuat kunci penandatanganan jika Anda menggunakan IAM DB auth. Semua yang dapat menambah latensi cold-start.-
Kolam koneksi Driver Java memonitor ketersediaan host server dan secara otomatis mencoba menyambung kembali jika koneksi gagal. Itu dimulai tugas latar belakang untuk mencoba membangun kembali koneksi. Gunakan
reconnectInterval( )
untuk mengkonfigurasi interval antara upaya rekoneksi. Sementara driver sedang mencoba untuk menyambung kembali, fungsi Lambda Anda hanya dapat mencoba lagi query.Jika interval antara mencoba kembali lebih kecil daripada interval antara upaya menyambung kembali, mencoba kembali pada koneksi yang akan gagal lagi karena host dianggap tidak tersedia. Ini tidak berlaku untuk mencoba lagi
ConcurrentModificationException
. Gunakan Java 8 bukan Java 11. Optimasi
Netty
tidak diaktifkan secara default di Java 11.Contoh ini menggunakan Retry4j
untuk mencoba ulang. Untuk menggunakan penandatanganan driver
Sigv4
dalam fungsi Lambda Java Anda, lihat persyaratan dependensi di Menghubungkan ke database IAM Amazon Neptunus menggunakan dengan Gremlin Java.
Awas
TheCallExecutor
dari Retry4j mungkin tidak thread-safe. Pertimbangkan agar setiap thread menggunakanCallExecutor
instansinya sendiri.
package com.amazonaws.examples.social; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.evanlennick.retry4j.CallExecutor; import com.evanlennick.retry4j.CallExecutorBuilder; import com.evanlennick.retry4j.Status; import com.evanlennick.retry4j.config.RetryConfig; import com.evanlennick.retry4j.config.RetryConfigBuilder; import org.apache.tinkerpop.gremlin.driver.Cluster; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.neptune.auth.NeptuneNettyHttpSigV4Signer; import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.structure.T; import java.io.*; import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.function.Function; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold; public class MyHandler implements RequestStreamHandler { private final GraphTraversalSource g; private final CallExecutor<Object> executor; private final Random idGenerator = new Random(); public MyHandler() { this.g = AnonymousTraversalSource .traversal() .withRemote(DriverRemoteConnection.using(createCluster())); this.executor = new CallExecutorBuilder<Object>() .config(createRetryConfig()) .build(); } @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { doQuery(input, output); } private void doQuery(InputStream input, OutputStream output) throws IOException { try { Map<String, Object> args = new HashMap<>(); args.put("id", idGenerator.nextInt()); String result = query(args); try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) { writer.write(result); } } finally { input.close(); output.close(); } } private String query(Map<String, Object> args) { int id = (int) args.get("id"); @SuppressWarnings("unchecked") Callable<Object> query = () -> g.V(id) .fold() .coalesce( unfold(), addV("Person").property(T.id, id)) .id().next(); Status<Object> status = executor.execute(query); return status.getResult().toString(); } private Cluster createCluster() { Cluster.Builder builder = Cluster.build() .addContactPoint(System.getenv("NEPTUNE_ENDPOINT")) .port(Integer.parseInt(System.getenv("NEPTUNE_PORT"))) .enableSsl(true) .minConnectionPoolSize(1) .maxConnectionPoolSize(1) .serializer(Serializers.GRAPHBINARY_V1D0) .reconnectInterval(2000); if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) { // For versions of TinkerPop 3.4.11 or higher: builder.handshakeInterceptor( r -> { NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer(region, new DefaultAWSCredentialsProviderChain()); sigV4Signer.signRequest(r); return r; } ) // Versions of TinkerPop prior to 3.4.11 should use the following approach. // Be sure to adjust the imports to include: // import org.apache.tinkerpop.gremlin.driver.SigV4WebSocketChannelizer; // builder = builder.channelizer(SigV4WebSocketChannelizer.class); return builder.create(); } private RetryConfig createRetryConfig() { return new RetryConfigBuilder().retryOnCustomExceptionLogic(retryLogic()) .withDelayBetweenTries(1000, ChronoUnit.MILLIS) .withMaxNumberOfTries(5) .withFixedBackoff() .build(); } private Function<Exception, Boolean> retryLogic() { return e -> { StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String message = stringWriter.toString(); // Check for connection issues if ( message.contains("Timed out while waiting for an available host") || message.contains("Timed-out waiting for connection on Host") || message.contains("Connection to server is no longer active") || message.contains("Connection reset by peer") || message.contains("SSLEngine closed already") || message.contains("Pool is shutdown") || message.contains("ExtendedClosedChannelException") || message.contains("Broken pipe")) { return true; } // Concurrent writes can sometimes trigger a ConcurrentModificationException. // In these circumstances you may want to backoff and retry. if (message.contains("ConcurrentModificationException")) { return true; } // If the primary fails over to a new instance, existing connections to the old primary will // throw a ReadOnlyViolationException. You may want to back and retry. if (message.contains("ReadOnlyViolationException")) { return true; } return false; }; } private String getOptionalEnv(String name, String defaultValue) { String value = System.getenv(name); if (value != null && value.length() > 0) { return value; } else { return defaultValue; } } }
Jika Anda ingin menyertakan logika sambung kembali dalam fungsi Anda, lihat Sampel sambungan ulang Java.
JavaScript Contoh Fungsi Lambda Lambda lambda untuk Amazon Neptune
Catatan tentang contoh ini
JavaScript Pengemudi tidak memelihara kolam koneksi. Selalu membuka koneksi tunggal.
Fungsi contoh menggunakan utilitas penandatanganan Sigv4 dari gremlin-aws-sigv4
untuk menandatangani permintaan ke IAM authentication-enabled database. Itu menggunakan fungsi retry ()
dari modul utilitas async sumber terbuka untuk menangani backoff-and-retry upaya. langkah-langkah terminal Gremlin kembali JavaScript
promise
(lihat TinkerPop dokumentasi). Untuk next()
, ini adalah tupel{value, done}
.-
Kesalahan koneksi diajukan di dalam handler, dan ditangani dengan menggunakan beberapa backoff-and-retry logika sejalan dengan rekomendasi yang diuraikan di sini, dengan satu pengecualian. Ada satu jenis masalah koneksi bahwa driver tersebut tidak dianggap sebagai pengecualian, dan oleh karena itu tidak dapat ditampung oleh backoff-and-retry logika ini.
Masalahnya adalah jika koneksi ditutup setelah driver mengirimkan permintaan tapi sebelum driver menerima respon, kueri terlihat selesai namun kembali ke nilai null. Sejauh fungsi lambda klien diperhatikan, fungsi terlihat berhasil, tetapi dengan respon kosong.
Dampak dari masalah ini tergantung pada cara aplikasi Anda memperlakukan respon kosong. Beberapa aplikasi mungkin memperlakukan respon kosong dari permintaan baca sebagai kesalahan, tetapi yang lain mungkin keliru memperlakukannya sebagai hasil kosong.
Menulis permintaan yang mengalami masalah koneksi ini juga akan mengembalikan respon kosong. Apakah invokasi yang berhasil dengan respon kosong menandakan keberhasilan atau kegagalan? Jika klien yang mengaktifkan fungsi tulis hanya memperlakukan invokasi yang berhasil dari fungsi agar berarti menulis ke database telah dilakukan, daripada memeriksa tubuh respons, sistem mungkin tampak kehilangan data.
Masalah ini keluar dari cara driver memperlakukan peristiwa yang dipancarkan oleh socket yang mendasari. Ketika jaringan yang mendasari socket ditutup dengan
ECONNRESET
kesalahan, yang WebSocket digunakan driver ditutup dan memancarkan'ws close'
peristiwa. Tidak ada apapun dalam driver, namun, untuk menangani peristiwa itu dengan cara yang dapat digunakan untuk meningkatkan pengecualian. Akibatnya, kueri itu hilang begitu saja.Untuk mengatasi masalah ini, fungsi lambda contoh di sini menambahkan handler peristiwa
'ws close'
yang melempar pengecualian untuk driver saat membuat sambungan jarak jauh. Namun, pengecualian ini tidak diajukan di sepanjang jalur permintaan-respons query Gremlin, dan oleh karenanya tidak dapat digunakan untuk memicu backoff-and-retry logika apa pun dalam fungsi lambda itu sendiri. Sebaliknya, pengecualian yang dilemparkan oleh handler peristiwa'ws close'
membawa hasil dalam pengecualian tidak tertangani yang menyebabkan invokasi lambda gagal. Hal ini memungkinkan klien yang meng-invoke fungsi untuk menangani kesalahan dan mencoba lagi invokasi lambda jika sesuai.Kami merekomendasikan Anda menerapkan backoff-and-retry logika dalam fungsi lambda itu sendiri untuk melindungi klien Anda dari masalah koneksi intermiten. Namun, pemecahan masalah di atas memerlukan klien agar menerapkan logika coba lagi juga, untuk menangani kegagalan yang dihasilkan dari masalah sambungan tertentu ini.
Kode JavaScript
const gremlin = require('gremlin'); const async = require('async'); const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils'); const traversal = gremlin.process.AnonymousTraversalSource.traversal; const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection; const t = gremlin.process.t; const __ = gremlin.process.statics; let conn = null; let g = null; async function query(context) { const id = context.id; return g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(t.id, id) ) .id().next(); } async function doQuery() { const id = Math.floor(Math.random() * 10000).toString(); let result = await query({id: id}); return result['value']; } exports.handler = async (event, context) => { const getConnectionDetails = () => { if (process.env['USE_IAM'] == 'true'){ return getUrlAndHeaders( process.env['NEPTUNE_ENDPOINT'], process.env['NEPTUNE_PORT'], {}, '/gremlin', 'wss'); } else { const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin'; return { url: database_url, headers: {}}; } }; const createRemoteConnection = () => { const { url, headers } = getConnectionDetails(); const c = new DriverRemoteConnection( url, { mimeType: 'application/vnd.gremlin-v2.0+json', headers: headers }); c._client._connection.on('close', (code, message) => { console.info(`close - ${code} ${message}`); if (code == 1006){ console.error('Connection closed prematurely'); throw new Error('Connection closed prematurely'); } }); return c; }; const createGraphTraversalSource = (conn) => { return traversal().withRemote(conn); }; if (conn == null){ console.info("Initializing connection") conn = createRemoteConnection(); g = createGraphTraversalSource(conn); } return async.retry( { times: 5, interval: 1000, errorFilter: function (err) { // Add filters here to determine whether error can be retried console.warn('Determining whether retriable error: ' + err.message); // Check for connection issues if (err.message.startsWith('WebSocket is not open')){ console.warn('Reopening connection'); conn.close(); conn = createRemoteConnection(); g = createGraphTraversalSource(conn); return true; } // Check for ConcurrentModificationException if (err.message.includes('ConcurrentModificationException')){ console.warn('Retrying query because of ConcurrentModificationException'); return true; } // Check for ReadOnlyViolationException if (err.message.includes('ReadOnlyViolationException')){ console.warn('Retrying query because of ReadOnlyViolationException'); return true; } return false; } }, doQuery); };
Contoh Fungsi Lambda Python untuk Amazon Neptune
Berikut adalah beberapa hal yang perlu diperhatikan tentang fungsi contoh AWS Lambda Python berikut:
Ini menggunakan modul backoff
. Ini diatur
pool_size=1
untuk menjaga dari pembuatan kolam koneksi yang tidak perlu.Ini menetapkan
message_serializer=serializer.GraphSONSerializersV2d0()
.
import os, sys, backoff, math from random import randint from gremlin_python import statics from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection from gremlin_python.driver.protocol import GremlinServerError from gremlin_python.driver import serializer from gremlin_python.process.anonymous_traversal import traversal from gremlin_python.process.graph_traversal import __ from gremlin_python.process.strategies import * from gremlin_python.process.traversal import T from aiohttp.client_exceptions import ClientConnectorError from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest from botocore.credentials import ReadOnlyCredentials from types import SimpleNamespace import logging logger = logging.getLogger() logger.setLevel(logging.INFO) reconnectable_err_msgs = [ 'ReadOnlyViolationException', 'Server disconnected', 'Connection refused', 'Connection was already closed', 'Connection was closed by server', 'Failed to connect to server: HTTP Error code 403 - Forbidden' ] retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs network_errors = [OSError, ClientConnectorError] retriable_errors = [GremlinServerError, RuntimeError, Exception] + network_errors def prepare_iamdb_request(database_url): service = 'neptune-db' method = 'GET' access_key = os.environ['AWS_ACCESS_KEY_ID'] secret_key = os.environ['AWS_SECRET_ACCESS_KEY'] region = os.environ['AWS_REGION'] session_token = os.environ['AWS_SESSION_TOKEN'] creds = SimpleNamespace( access_key=access_key, secret_key=secret_key, token=session_token, region=region, ) request = AWSRequest(method=method, url=database_url, data=None) SigV4Auth(creds, service, region).add_auth(request) return (database_url, request.headers.items()) def is_retriable_error(e): is_retriable = False err_msg = str(e) if isinstance(e, tuple(network_errors)): is_retriable = True else: is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs) logger.error('error: [{}] {}'.format(type(e), err_msg)) logger.info('is_retriable: {}'.format(is_retriable)) return is_retriable def is_non_retriable_error(e): return not is_retriable_error(e) def reset_connection_if_connection_issue(params): is_reconnectable = False e = sys.exc_info()[1] err_msg = str(e) if isinstance(e, tuple(network_errors)): is_reconnectable = True else: is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs) logger.info('is_reconnectable: {}'.format(is_reconnectable)) if is_reconnectable: global conn global g conn.close() conn = create_remote_connection() g = create_graph_traversal_source(conn) @backoff.on_exception(backoff.constant, tuple(retriable_errors), max_tries=5, jitter=None, giveup=is_non_retriable_error, on_backoff=reset_connection_if_connection_issue, interval=1) def query(**kwargs): id = kwargs['id'] return (g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(T.id, id) ) .id().next()) def doQuery(event): return query(id=str(randint(0, 10000))) def lambda_handler(event, context): result = doQuery(event) logger.info('result – {}'.format(result)) return result def create_graph_traversal_source(conn): return traversal().withRemote(conn) def create_remote_connection(): logger.info('Creating remote connection') (database_url, headers) = connection_info() return DriverRemoteConnection( database_url, 'g', pool_size=1, message_serializer=serializer.GraphSONSerializersV2d0(), headers=headers) def connection_info(): database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort']) if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true': return prepare_iamdb_request(database_url) else: return (database_url, {}) conn = create_remote_connection() g = create_graph_traversal_source(conn)
Berikut adalah hasil sampel, menunjukkan periode bolak-balik beban berat dan ringan: