

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# AWS Lambda contoh fungsi untuk Amazon Neptunus
<a name="lambda-functions-examples"></a>

Contoh AWS Lambda fungsi berikut, ditulis dalam Java, JavaScript dan Python, menggambarkan upserting satu simpul dengan ID yang dihasilkan secara acak menggunakan idiom. `fold().coalesce().unfold()`

Banyak kode di setiap fungsi adalah kode boilerplate, bertanggung jawab untuk mengelola koneksi dan mencoba kembali koneksi juga query jika terjadi kesalahan. Logika aplikasi nyata dan query Gremlin diimplementasikan dalam metode `doQuery()` dan`query()` berturut-turut. Jika Anda menggunakan contoh-contoh ini sebagai dasar untuk fungsi Lambda Anda sendiri, Anda dapat berkonsentrasi pada modifikasi `doQuery()` dan `query()`.

Fungsi dikonfigurasi untuk mencoba lagi kueri yang gagal 5 kali, menunggu 1 detik antara pengulangan itu.

Fungsi memerlukan nilai untuk hadir dalam variabel lingkungan Lambda berikut:
+ **`NEPTUNE_ENDPOINT`**   –   Titik akhir klaster DB Neptune Anda. Untuk Python, ini seharusnya. `neptuneEndpoint`
+ **`NEPTUNE_PORT`**   –   Port Neptune. Untuk Python, ini seharusnya. `neptunePort`
+ **`USE_IAM `**— (`true`atau`false`) Jika database Anda memiliki otentikasi database AWS Identity and Access Management (IAM) diaktifkan, atur variabel `USE_IAM` lingkungan ke. `true` Hal ini menyebabkan fungsi Lambda untuk permintaan koneksi SIGV4-sign ke Neptune. Untuk permintaan auth DB IAM seperti itu, pastikan bahwa peran eksekusi fungsi Lambda memiliki kebijakan IAM tepat yang terpasang, yang memungkinkan fungsi untuk terhubung ke klaster Neptune DB Anda (lihat [Jenis kebijakan IAM](security-iam-access-manage.md#iam-auth-policy)).

## Contoh Fungsi Lambda Java untuk Amazon Neptune
<a name="lambda-functions-examples-java"></a>

Berikut adalah beberapa hal yang perlu diingat tentang AWS Lambda fungsi Java:
+ Driver Java mempertahankan kolam koneksinya sendiri, yang tidak Anda butuhkan, jadi konfigurasikan objek `Cluster` dengan `minConnectionPoolSize(1)` dan `maxConnectionPoolSize(1)`.
+ Objek `Cluster` bisa saja lambat untuk dibangun karena objek tersebut membuat satu atau lebih serializers (Gyro secara default, ditambah lainnya jika Anda telah mengkonfigurasinya untuk format output tambahan seperti `binary`). Ini perlu beberapa waktu untuk instantiate.
+ Kolam koneksi diinisialisasi dengan permintaan pertama. Pada titik ini, driver mengatur tumpukan `Netty`, mengalokasikan byte buffer, dan membuat kunci penandatanganan jika Anda menggunakan IAM DB auth. Semua yang dapat menambah latensi cold-start.
+ Kolam koneksi Driver Java memonitor ketersediaan host server dan secara otomatis mencoba menyambung kembali jika koneksi gagal. Itu dimulai tugas latar belakang untuk mencoba membangun kembali koneksi. Gunakan `reconnectInterval( )` untuk mengkonfigurasi interval antara upaya rekoneksi. Sementara driver sedang mencoba untuk menyambung kembali, fungsi Lambda Anda hanya dapat mencoba lagi query.

  Jika interval antara mencoba kembali lebih kecil daripada interval antara upaya menyambung kembali, mencoba kembali pada koneksi yang akan gagal lagi karena host dianggap tidak tersedia. Ini tidak berlaku untuk mencoba lagi `ConcurrentModificationException`.
+ Gunakan Java 8 bukan Java 11. Optimasi `Netty` tidak diaktifkan secara default di Java 11.
+ Contoh ini menggunakan [Retry4j](https://github.com/elennick/retry4j) untuk mencoba ulang.
+ Untuk menggunakan penandatanganan driver `Sigv4` dalam fungsi Lambda Java Anda, lihat persyaratan dependensi di [Menghubungkan ke database Amazon Neptunus menggunakan IAM dengan Gremlin Java](iam-auth-connecting-gremlin-java.md).

**Awas**  
`CallExecutor`Dari Retry4j mungkin tidak aman untuk utas. Pertimbangkan agar setiap utas menggunakan `CallExecutor` instance-nya sendiri.

**catatan**  
 Contoh berikut telah diperbarui untuk menyertakan penggunaan requestInterceptor (). Ini ditambahkan di TinkerPop 3.6.6. Sebelum TinkerPop versi 3.6.6, contoh kode menggunakan HandshakeInterceptor (), yang tidak digunakan lagi dengan rilis itu. 

```
package com.amazonaws.examples.social;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.evanlennick.retry4j.CallExecutor;
import com.evanlennick.retry4j.CallExecutorBuilder;
import com.evanlennick.retry4j.Status;
import com.evanlennick.retry4j.config.RetryConfig;
import com.evanlennick.retry4j.config.RetryConfigBuilder;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.neptune.auth.NeptuneNettyHttpSigV4Signer;
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.T;

import java.io.*;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.function.Function;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold;

public class MyHandler implements RequestStreamHandler {

  private final GraphTraversalSource g;
  private final CallExecutor<Object> executor;
  private final Random idGenerator = new Random();

  public MyHandler() {

    this.g = AnonymousTraversalSource
            .traversal()
            .withRemote(DriverRemoteConnection.using(createCluster()));


    this.executor = new CallExecutorBuilder<Object>()
            .config(createRetryConfig())
            .build();

  }

  @Override
  public void handleRequest(InputStream input,
                            OutputStream output,
                            Context context) throws IOException {

    doQuery(input, output);
  }

  private void doQuery(InputStream input, OutputStream output) throws IOException {
    try {

      Map<String, Object> args = new HashMap<>();
      args.put("id", idGenerator.nextInt());

      String result = query(args);

      try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) {
          writer.write(result);
      }

    } finally {
        input.close();
        output.close();
    }
  }

  private String query(Map<String, Object> args) {
    int id = (int) args.get("id");

    @SuppressWarnings("unchecked")
    Callable<Object> query = () -> g.V(id)
      .fold()
      .coalesce(
        unfold(),
        addV("Person").property(T.id, id))
      .id().next();

    Status<Object> status = executor.execute(query);

    return status.getResult().toString();
  }

  private Cluster createCluster() {
    Cluster.Builder builder = Cluster.build()
                                     .addContactPoint(System.getenv("NEPTUNE_ENDPOINT"))
                                     .port(Integer.parseInt(System.getenv("NEPTUNE_PORT")))
                                     .enableSsl(true)
                                     .minConnectionPoolSize(1)
                                     .maxConnectionPoolSize(1)
                                     .serializer(Serializers.GRAPHBINARY_V1D0)
                                     .reconnectInterval(2000);

  if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) {
    // The following example uses requestInterceptor(), which was introduced
    // in TinkerPop 3.6.6. If you are using a TinkerPop version earlier than
    // 3.6.6 (but 3.5.5 or higher), use handshakeInterceptor() instead.
    builder.requestInterceptor( r ->
      {
        NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer(region, DefaultCredentialsProvider.create());
        sigV4Signer.signRequest(r);
        return r;
      }
    )
    
    return builder.create();
  }

  private RetryConfig createRetryConfig() {
    return new RetryConfigBuilder().retryOnCustomExceptionLogic(retryLogic())
                                   .withDelayBetweenTries(1000, ChronoUnit.MILLIS)
                                   .withMaxNumberOfTries(5)
                                   .withFixedBackoff()
                                   .build();
  }

  private Function<Exception, Boolean> retryLogic() {
    return e -> {
      StringWriter stringWriter = new StringWriter();
      e.printStackTrace(new PrintWriter(stringWriter));
      String message = stringWriter.toString();

      // Check for connection issues
      if ( message.contains("Timed out while waiting for an available host") ||
           message.contains("Timed-out waiting for connection on Host") ||
           message.contains("Connection to server is no longer active") ||
           message.contains("Connection reset by peer") ||
           message.contains("SSLEngine closed already") ||
           message.contains("Pool is shutdown") ||
           message.contains("ExtendedClosedChannelException") ||
           message.contains("Broken pipe")) {
        return true;
      }

      // Concurrent writes can sometimes trigger a ConcurrentModificationException.
      // In these circumstances you may want to backoff and retry.
      if (message.contains("ConcurrentModificationException")) {
          return true;
      }

      // If the primary fails over to a new instance, existing connections to the old primary will
      // throw a ReadOnlyViolationException. You may want to back and retry.
      if (message.contains("ReadOnlyViolationException")) {
          return true;
      }

      return false;
    };
  }

  private String getOptionalEnv(String name, String defaultValue) {
    String value = System.getenv(name);
    if (value != null && value.length() > 0) {
      return value;
    } else {
      return defaultValue;
    }
  }
}
```

Jika Anda ingin menyertakan logika sambung kembali dalam fungsi Anda, lihat [Sampel sambungan ulang Java](access-graph-gremlin-java-reconnect-example.md).

## JavaScript Contoh fungsi Lambda untuk Amazon Neptunus
<a name="lambda-functions-examples-javascript"></a>

**Catatan tentang contoh ini**
+  JavaScript Pengemudi tidak memelihara kolam koneksi. Selalu membuka koneksi tunggal.
+ Fungsi contoh menggunakan utilitas penandatanganan Sigv4 dari [gremlin-aws-sigv4](https://github.com/shutterstock/gremlin-aws-sigv4) untuk menandatangani permintaan ke database berkemampuan otentikasi IAM.
+ Ini menggunakan fungsi [retry ()](https://caolan.github.io/async/v3/docs.html#retry) dari [modul utilitas async](https://github.com/caolan/async) open-source untuk menangani upaya. backoff-and-retry
+ Langkah terminal Gremlin mengembalikan a JavaScript `promise` (lihat [TinkerPop dokumentasi](https://tinkerpop.apache.org/docs/current/reference/#gremlin-javascript-connecting)). Untuk `next()`, ini adalah tupel `{value, done}`.
+ Kesalahan koneksi muncul di dalam handler, dan ditangani dengan menggunakan beberapa backoff-and-retry logika sesuai dengan rekomendasi yang diuraikan di sini, dengan satu pengecualian. Ada satu jenis masalah koneksi yang pengemudi tidak perlakukan sebagai pengecualian, dan karena itu tidak dapat diakomodasi oleh backoff-and-retry logika ini.

  Masalahnya adalah jika koneksi ditutup setelah driver mengirimkan permintaan tapi sebelum driver menerima respon, kueri terlihat selesai namun kembali ke nilai null. Sejauh fungsi lambda klien diperhatikan, fungsi terlihat berhasil, tetapi dengan respon kosong.

  Dampak dari masalah ini tergantung pada cara aplikasi Anda memperlakukan respon kosong. Beberapa aplikasi mungkin memperlakukan respon kosong dari permintaan baca sebagai kesalahan, tetapi yang lain mungkin keliru memperlakukannya sebagai hasil kosong.

  Menulis permintaan yang mengalami masalah koneksi ini juga akan mengembalikan respon kosong. Apakah invokasi yang berhasil dengan respon kosong menandakan keberhasilan atau kegagalan? Jika klien yang mengaktifkan fungsi tulis hanya memperlakukan invokasi yang berhasil dari fungsi agar berarti menulis ke database telah dilakukan, daripada memeriksa tubuh respons, sistem mungkin tampak kehilangan data.

  Masalah ini keluar dari cara driver memperlakukan peristiwa yang dipancarkan oleh socket yang mendasari. Ketika soket jaringan yang mendasarinya ditutup dengan `ECONNRESET` kesalahan, yang WebSocket digunakan oleh pengemudi ditutup dan memancarkan `'ws close'` peristiwa. Tidak ada apapun dalam driver, namun, untuk menangani peristiwa itu dengan cara yang dapat digunakan untuk meningkatkan pengecualian. Akibatnya, kueri itu hilang begitu saja.

  Untuk mengatasi masalah ini, fungsi lambda contoh di sini menambahkan handler peristiwa `'ws close'` yang melempar pengecualian untuk driver saat membuat sambungan jarak jauh. Namun, pengecualian ini tidak muncul di sepanjang jalur permintaan-respons kueri Gremlin, dan karena itu tidak dapat digunakan untuk memicu backoff-and-retry logika apa pun dalam fungsi lambda itu sendiri. Sebaliknya, pengecualian yang dilemparkan oleh handler peristiwa `'ws close'` membawa hasil dalam pengecualian tidak tertangani yang menyebabkan invokasi lambda gagal. Hal ini memungkinkan klien yang meng-invoke fungsi untuk menangani kesalahan dan mencoba lagi invokasi lambda jika sesuai.

  Kami menyarankan Anda menerapkan backoff-and-retry logika dalam fungsi lambda itu sendiri untuk melindungi klien Anda dari masalah koneksi intermiten. Namun, pemecahan masalah di atas memerlukan klien agar menerapkan logika coba lagi juga, untuk menangani kegagalan yang dihasilkan dari masalah sambungan tertentu ini.

### Kode JavaScript
<a name="lambda-functions-examples-javascript-code"></a>

```
const gremlin = require('gremlin');
const async = require('async');
const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils');

const traversal = gremlin.process.AnonymousTraversalSource.traversal;
const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection;
const t = gremlin.process.t;
const __ = gremlin.process.statics;

let conn = null;
let g = null;

async function query(context) {

  const id = context.id;

  return g.V(id)
    .fold()
    .coalesce(
      __.unfold(), 
      __.addV('User').property(t.id, id)
    )
    .id().next();
}

async function doQuery() {
  const id = Math.floor(Math.random() * 10000).toString();

  let result = await query({id: id}); 
  return result['value'];
}

exports.handler = async (event, context) => {

  const getConnectionDetails = () => {
    if (process.env['USE_IAM'] == 'true'){
       return getUrlAndHeaders(
         process.env['NEPTUNE_ENDPOINT'],
         process.env['NEPTUNE_PORT'],
         {},
         '/gremlin',
         'wss'); 
    } else {
      const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin';
      return { url: database_url, headers: {}};
    }    
  };


  const createRemoteConnection = () => {
    const { url, headers } = getConnectionDetails();

    const c = new DriverRemoteConnection(
      url, 
      {          
        headers: headers 
      });  

     c._client._connection.on('close', (code, message) => {
         console.info(`close - ${code} ${message}`);
         if (code == 1006){
           console.error('Connection closed prematurely');
           throw new Error('Connection closed prematurely');
         }
       });  

     return c;     
  };

  const createGraphTraversalSource = (conn) => {
    return traversal().withRemote(conn);
  };

  if (conn == null){
    console.info("Initializing connection")
    conn = createRemoteConnection();
    g = createGraphTraversalSource(conn);
  }

  return async.retry(
    { 
      times: 5,
      interval: 1000,
      errorFilter: function (err) { 

        // Add filters here to determine whether error can be retried
        console.warn('Determining whether retriable error: ' + err.message);

        // Check for connection issues
        if (err.message.startsWith('WebSocket is not open')){
          console.warn('Reopening connection');
          conn.close();
          conn = createRemoteConnection();
          g = createGraphTraversalSource(conn);
          return true;
        }

        // Check for ConcurrentModificationException
        if (err.message.includes('ConcurrentModificationException')){
          console.warn('Retrying query because of ConcurrentModificationException');
          return true;
        }

        // Check for ReadOnlyViolationException
        if (err.message.includes('ReadOnlyViolationException')){
          console.warn('Retrying query because of ReadOnlyViolationException');
          return true;
        }

        return false; 
      }

    }, 
    doQuery);
};
```

## Contoh Fungsi Lambda Python untuk Amazon Neptune
<a name="lambda-functions-examples-python"></a>

Berikut adalah beberapa hal yang perlu diperhatikan tentang fungsi contoh AWS Lambda Python berikut:
+ Ini menggunakan [modul backoff](https://pypi.org/project/backoff/).
+ Ini diatur `pool_size=1` untuk menjaga dari pembuatan kolam koneksi yang tidak perlu.

```
import os, sys, backoff, math
from random import randint
from gremlin_python import statics
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.driver.protocol import GremlinServerError
from gremlin_python.driver import serializer
from gremlin_python.process.anonymous_traversal import traversal
from gremlin_python.process.graph_traversal import __
from gremlin_python.process.strategies import *
from gremlin_python.process.traversal import T
from aiohttp.client_exceptions import ClientConnectorError
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import ReadOnlyCredentials
from types import SimpleNamespace

import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)


reconnectable_err_msgs = [
    'ReadOnlyViolationException',
    'Server disconnected',
    'Connection refused',
    'Connection was already closed',
    'Connection was closed by server',
    'Failed to connect to server: HTTP Error code 403 - Forbidden'
]

retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs

network_errors = [OSError, ClientConnectorError]

retriable_errors = [GremlinServerError, RuntimeError, Exception] + network_errors

def prepare_iamdb_request(database_url):

    service = 'neptune-db'
    method = 'GET'

    access_key = os.environ['AWS_ACCESS_KEY_ID']
    secret_key = os.environ['AWS_SECRET_ACCESS_KEY']
    region = os.environ['AWS_REGION']
    session_token = os.environ['AWS_SESSION_TOKEN']


    creds = SimpleNamespace(
        access_key=access_key, secret_key=secret_key, token=session_token, region=region,
    )

    request = AWSRequest(method=method, url=database_url, data=None)
    SigV4Auth(creds, service, region).add_auth(request)

    return database_url, request.headers.items()

def is_retriable_error(e):

    is_retriable = False
    err_msg = str(e)

    if isinstance(e, tuple(network_errors)):
        is_retriable = True
    else:
        is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs)

    logger.error('error: [{}] {}'.format(type(e), err_msg))
    logger.info('is_retriable: {}'.format(is_retriable))

    return is_retriable

def is_non_retriable_error(e):
    return not is_retriable_error(e)

def reset_connection_if_connection_issue(params):

    is_reconnectable = False

    e = sys.exc_info()[1]
    err_msg = str(e)

    if isinstance(e, tuple(network_errors)):
        is_reconnectable = True
    else:
        is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs)

    logger.info('is_reconnectable: {}'.format(is_reconnectable))

    if is_reconnectable:
        global conn
        global g
        conn.close()
        conn = create_remote_connection()
        g = create_graph_traversal_source(conn)

@backoff.on_exception(backoff.constant,
                      tuple(retriable_errors),
                      max_tries=5,
                      jitter=None,
                      giveup=is_non_retriable_error,
                      on_backoff=reset_connection_if_connection_issue,
                      interval=1)
def query(**kwargs):

    id = kwargs['id']

    return (g.V().hasLabel('column').has('column_name', 'amhstr_ag_type').in_('hascolumn').dedup().valueMap().limit(10).toList())

def doQuery(event):
    return query(id=str(randint(0, 10000)))

def lambda_handler(event, context):
    result = doQuery(event)
    logger.info('result – {}'.format(result))
    return result

def create_graph_traversal_source(conn):
    return traversal().withRemote(conn)

def create_remote_connection():
    logger.info('Creating remote connection')

    (database_url, headers) = connection_info()

    # Convert headers to a dictionary if it's not already
    headers_dict = dict(headers) if isinstance(headers, list) else headers

    print(headers)
    return DriverRemoteConnection(
        database_url,
        'g',
        pool_size=1,
        headers=headers_dict)


def connection_info():

    database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort'])

    if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true':
        return prepare_iamdb_request(database_url)
    else:
        return (database_url, {})

conn = create_remote_connection()
g = create_graph_traversal_source(conn)
```

Berikut adalah hasil sampel, menunjukkan periode bolak-balik beban berat dan ringan:

![\[Diagram menunjukkan hasil sampel dari contoh fungsi Lambda Python.\]](http://docs.aws.amazon.com/id_id/neptune/latest/userguide/images/python-lambda-results.png)
