

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# AWS Lambda 亚马逊 Neptune 的函数示例
<a name="lambda-functions-examples"></a>

以下用 Java JavaScript 和 Python 编写的示例 AWS Lambda 函数说明了使用惯用惯用惯用随机生成的 ID 对单个顶点进行翻转。`fold().coalesce().unfold()`

每个函数中的大部分代码是样板代码，负责管理连接，并在出现错误时重试连接和查询。实际的应用程序逻辑和 Gremlin 查询分别在 `doQuery()` 和 `query()` 方法中实现。如果您使用这些示例作为自己的 Lambda 函数的基础，则可以专注于修改 `doQuery()` 和 `query()`。

这些函数配置为重试失败的查询 5 次，两次重试之间等待 1 秒钟。

这些函数要求值必须存在于以下 Lambda 环境变量中：
+ **`NEPTUNE_ENDPOINT`** – 您的 Neptune 数据库集群端点。对于 Python 来说，这应该是 `neptuneEndpoint`。
+ **`NEPTUNE_PORT`** – Neptune 端口。对于 Python 来说，这应该是 `neptunePort`。
+ **`USE_IAM `**—（`true`或`false`）如果您的数据库启用了 AWS Identity and Access Management (IAM) 数据库身份验证，请将`USE_IAM`环境变量设置为`true`。这会导致 Lambda 函数对指向 Neptune 的连接请求进行 Sigv4 签名。对于此类 IAM 数据库身份验证请求，请确保 Lambda 函数的执行角色附加了相应的 IAM policy，允许该函数连接到您的 Neptune 数据库集群（请参阅[IAM policy 的类型](security-iam-access-manage.md#iam-auth-policy)）。

## Amazon Neptune 的 Java Lambda 函数示例
<a name="lambda-functions-examples-java"></a>

关于 Java AWS Lambda 函数，请记住以下几点：
+ Java 驱动程序会维护其自己的连接池，而您不需要该连接池，因此请使用 `minConnectionPoolSize(1)` 和 `maxConnectionPoolSize(1)` 配置您的 `Cluster` 对象。
+ `Cluster` 对象的构建速度可能很慢，因为它会创建一个或多个序列化器 [默认情况下为 Gyro，如果您已将其配置为其它输出格式（例如 `binary`），则会创建另一个序列化器]。这些可能需要一段时间才能实例化。
+ 连接池使用第一个请求进行初始化。此时，如果您使用的是 IAM 数据库身份验证，驱动程序会设置 `Netty` 堆栈、分配字节缓冲区并创建签名密钥。所有这些都可能增加冷启动延迟。
+ Java 驱动程序的连接池监控服务器主机的可用性，并在连接失败时自动尝试重新连接。它会启动后台任务以尝试重新建立连接。使用 `reconnectInterval( )` 配置尝试重新连接的间隔。当驱动程序尝试重新连接时，您的 Lambda 函数只需重试查询即可。

  如果重试之间的间隔小于重新连接尝试之间的间隔，则在连接失败时重试会再次失败，因为主机被认为不可用。这不适用于在引发 `ConcurrentModificationException` 时重试。
+ 使用 Java 8 而不是 Java 11。在 Java 11 中，默认情况下不启用 `Netty` 优化。
+ 此示例使用 [Retry4j](https://github.com/elennick/retry4j) 进行重试。
+ 要在 Java Lambda 函数中使用 `Sigv4` 签名驱动程序，请参阅[通过 Gremlin Java 使用 IAM 连接到 Amazon Neptune 数据库](iam-auth-connecting-gremlin-java.md)中的依赖项要求。

**警告**  
来自 Retry4j 的 `CallExecutor` 可能不是线程安全的。考虑让每个线程使用自己的 `CallExecutor` 实例。

**注意**  
 以下示例已更新，加入了 requestInterceptor() 的使用。这是在 TinkerPop 3.6.6 中添加的。在 3.6.6 TinkerPop 版本之前，代码示例使用了 HandshakeInterceptor ()，该版本已不推荐使用该版本。

```
package com.amazonaws.examples.social;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.evanlennick.retry4j.CallExecutor;
import com.evanlennick.retry4j.CallExecutorBuilder;
import com.evanlennick.retry4j.Status;
import com.evanlennick.retry4j.config.RetryConfig;
import com.evanlennick.retry4j.config.RetryConfigBuilder;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.neptune.auth.NeptuneNettyHttpSigV4Signer;
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.T;

import java.io.*;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.function.Function;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold;

public class MyHandler implements RequestStreamHandler {

  private final GraphTraversalSource g;
  private final CallExecutor<Object> executor;
  private final Random idGenerator = new Random();

  public MyHandler() {

    this.g = AnonymousTraversalSource
            .traversal()
            .withRemote(DriverRemoteConnection.using(createCluster()));


    this.executor = new CallExecutorBuilder<Object>()
            .config(createRetryConfig())
            .build();

  }

  @Override
  public void handleRequest(InputStream input,
                            OutputStream output,
                            Context context) throws IOException {

    doQuery(input, output);
  }

  private void doQuery(InputStream input, OutputStream output) throws IOException {
    try {

      Map<String, Object> args = new HashMap<>();
      args.put("id", idGenerator.nextInt());

      String result = query(args);

      try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) {
          writer.write(result);
      }

    } finally {
        input.close();
        output.close();
    }
  }

  private String query(Map<String, Object> args) {
    int id = (int) args.get("id");

    @SuppressWarnings("unchecked")
    Callable<Object> query = () -> g.V(id)
      .fold()
      .coalesce(
        unfold(),
        addV("Person").property(T.id, id))
      .id().next();

    Status<Object> status = executor.execute(query);

    return status.getResult().toString();
  }

  private Cluster createCluster() {
    Cluster.Builder builder = Cluster.build()
                                     .addContactPoint(System.getenv("NEPTUNE_ENDPOINT"))
                                     .port(Integer.parseInt(System.getenv("NEPTUNE_PORT")))
                                     .enableSsl(true)
                                     .minConnectionPoolSize(1)
                                     .maxConnectionPoolSize(1)
                                     .serializer(Serializers.GRAPHBINARY_V1D0)
                                     .reconnectInterval(2000);

  if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) {
    // The following example uses requestInterceptor(), which was introduced
    // in TinkerPop 3.6.6. If you are using a TinkerPop version earlier than
    // 3.6.6 (but 3.5.5 or higher), use handshakeInterceptor() instead.
    builder.requestInterceptor( r ->
      {
        NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer(region, DefaultCredentialsProvider.create());
        sigV4Signer.signRequest(r);
        return r;
      }
    )
    
    return builder.create();
  }

  private RetryConfig createRetryConfig() {
    return new RetryConfigBuilder().retryOnCustomExceptionLogic(retryLogic())
                                   .withDelayBetweenTries(1000, ChronoUnit.MILLIS)
                                   .withMaxNumberOfTries(5)
                                   .withFixedBackoff()
                                   .build();
  }

  private Function<Exception, Boolean> retryLogic() {
    return e -> {
      StringWriter stringWriter = new StringWriter();
      e.printStackTrace(new PrintWriter(stringWriter));
      String message = stringWriter.toString();

      // Check for connection issues
      if ( message.contains("Timed out while waiting for an available host") ||
           message.contains("Timed-out waiting for connection on Host") ||
           message.contains("Connection to server is no longer active") ||
           message.contains("Connection reset by peer") ||
           message.contains("SSLEngine closed already") ||
           message.contains("Pool is shutdown") ||
           message.contains("ExtendedClosedChannelException") ||
           message.contains("Broken pipe")) {
        return true;
      }

      // Concurrent writes can sometimes trigger a ConcurrentModificationException.
      // In these circumstances you may want to backoff and retry.
      if (message.contains("ConcurrentModificationException")) {
          return true;
      }

      // If the primary fails over to a new instance, existing connections to the old primary will
      // throw a ReadOnlyViolationException. You may want to back and retry.
      if (message.contains("ReadOnlyViolationException")) {
          return true;
      }

      return false;
    };
  }

  private String getOptionalEnv(String name, String defaultValue) {
    String value = System.getenv(name);
    if (value != null && value.length() > 0) {
      return value;
    } else {
      return defaultValue;
    }
  }
}
```

如果要在函数中加入重新连接逻辑，请参阅[Java 重新连接示例](access-graph-gremlin-java-reconnect-example.md)。

## JavaScript 亚马逊 Neptune 的 Lambda 函数示例
<a name="lambda-functions-examples-javascript"></a>

**有关此示例的注意事项**
+  JavaScript 驱动程序不维护连接池。它总是打开单个连接。
+ [该示例函数使用 4 中的 Sigv4 签名实用程序对启用 IAM 身份验证的数据库的请求进行签名。gremlin-aws-sigv](https://github.com/shutterstock/gremlin-aws-sigv4)
+ 它使用开源[异步实用程序模块](https://github.com/caolan/async)中的 retry [() 函数来处理 backoff-and-retry尝试](https://caolan.github.io/async/v3/docs.html#retry)。
+ Gremlin 终端步骤返回 a JavaScript `promise`（参见[TinkerPop 文档](https://tinkerpop.apache.org/docs/current/reference/#gremlin-javascript-connecting)）。对于 `next()`，这是一个 `{value, done}` 元组。
+ 连接错误是在处理程序内部引发的，并使用符合此处概述的建议的 backoff-and-retry逻辑进行处理，但有一个例外。有一种连接问题，驱动程序不会将其视为例外，因此这种 backoff-and-retry逻辑无法解决这个问题。

  问题在于，如果在驱动程序发送请求之后但在驱动程序收到响应之前关闭连接，则查询似乎已完成，但返回 null 值。就 lambda 函数客户端而言，该函数似乎成功完成，但响应为空。

  此问题的影响取决于您的应用程序如何处理空响应。有些应用程序可能会将读取请求中的空响应视为错误，但其它应用程序可能会错误地将其视为空结果。

  遇到此连接问题的写入请求也将返回空响应。响应为空的成功调用是表示成功还是失败？ 如果调用写入函数的客户端只是将成功调用该函数视为已提交对数据库的写入，而不检查响应的正文，则系统可能会丢失数据。

  此问题源于驱动程序如何处理由底层套接字发出的事件。当底层网络套接字因`ECONNRESET`错误而关闭时，驱动程序 WebSocket 使用的套接字将被关闭并发出一个`'ws close'`事件。然而，驱动程序中没有任何内容可通过一种用于引发异常的方式来处理该事件。结果，查询就消失了。

  为了解决此问题，此处的示例 lambda 函数添加了一个 `'ws close'` 事件处理程序，用于在创建远程连接时会向驱动程序引发异常。但是，这个异常不是沿着 Gremlin 查询的请求-响应路径引发的，因此不能用来触发 lambda 函数本身中的任何 backoff-and-retry逻辑。相反，`'ws close'` 事件处理程序引发的异常会导致未处理的异常，从而导致 lambda 调用失败。这允许调用该函数的客户端处理错误，并在适当时重试 lambda 调用。

  我们建议您在 lambda 函数本身中实现 backoff-and-retry逻辑，以保护您的客户端免受间歇性连接问题的影响。但是，上述问题的解决方法也要求客户端实现重试逻辑，以处理由此特定连接问题导致的失败。

### Javascript 代码
<a name="lambda-functions-examples-javascript-code"></a>

```
const gremlin = require('gremlin');
const async = require('async');
const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils');

const traversal = gremlin.process.AnonymousTraversalSource.traversal;
const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection;
const t = gremlin.process.t;
const __ = gremlin.process.statics;

let conn = null;
let g = null;

async function query(context) {

  const id = context.id;

  return g.V(id)
    .fold()
    .coalesce(
      __.unfold(), 
      __.addV('User').property(t.id, id)
    )
    .id().next();
}

async function doQuery() {
  const id = Math.floor(Math.random() * 10000).toString();

  let result = await query({id: id}); 
  return result['value'];
}

exports.handler = async (event, context) => {

  const getConnectionDetails = () => {
    if (process.env['USE_IAM'] == 'true'){
       return getUrlAndHeaders(
         process.env['NEPTUNE_ENDPOINT'],
         process.env['NEPTUNE_PORT'],
         {},
         '/gremlin',
         'wss'); 
    } else {
      const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin';
      return { url: database_url, headers: {}};
    }    
  };


  const createRemoteConnection = () => {
    const { url, headers } = getConnectionDetails();

    const c = new DriverRemoteConnection(
      url, 
      {          
        headers: headers 
      });  

     c._client._connection.on('close', (code, message) => {
         console.info(`close - ${code} ${message}`);
         if (code == 1006){
           console.error('Connection closed prematurely');
           throw new Error('Connection closed prematurely');
         }
       });  

     return c;     
  };

  const createGraphTraversalSource = (conn) => {
    return traversal().withRemote(conn);
  };

  if (conn == null){
    console.info("Initializing connection")
    conn = createRemoteConnection();
    g = createGraphTraversalSource(conn);
  }

  return async.retry(
    { 
      times: 5,
      interval: 1000,
      errorFilter: function (err) { 

        // Add filters here to determine whether error can be retried
        console.warn('Determining whether retriable error: ' + err.message);

        // Check for connection issues
        if (err.message.startsWith('WebSocket is not open')){
          console.warn('Reopening connection');
          conn.close();
          conn = createRemoteConnection();
          g = createGraphTraversalSource(conn);
          return true;
        }

        // Check for ConcurrentModificationException
        if (err.message.includes('ConcurrentModificationException')){
          console.warn('Retrying query because of ConcurrentModificationException');
          return true;
        }

        // Check for ReadOnlyViolationException
        if (err.message.includes('ReadOnlyViolationException')){
          console.warn('Retrying query because of ReadOnlyViolationException');
          return true;
        }

        return false; 
      }

    }, 
    doQuery);
};
```

## Amazon Neptune 的 Python Lambda 函数示例
<a name="lambda-functions-examples-python"></a>

以下是关于以下 Python AWS Lambda 示例函数的一些注意事项：
+ 它使用[回退模块](https://pypi.org/project/backoff/)。
+ 它设置 `pool_size=1` 以防止创建不必要的连接池。

```
import os, sys, backoff, math
from random import randint
from gremlin_python import statics
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.driver.protocol import GremlinServerError
from gremlin_python.driver import serializer
from gremlin_python.process.anonymous_traversal import traversal
from gremlin_python.process.graph_traversal import __
from gremlin_python.process.strategies import *
from gremlin_python.process.traversal import T
from aiohttp.client_exceptions import ClientConnectorError
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import ReadOnlyCredentials
from types import SimpleNamespace

import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)


reconnectable_err_msgs = [
    'ReadOnlyViolationException',
    'Server disconnected',
    'Connection refused',
    'Connection was already closed',
    'Connection was closed by server',
    'Failed to connect to server: HTTP Error code 403 - Forbidden'
]

retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs

network_errors = [OSError, ClientConnectorError]

retriable_errors = [GremlinServerError, RuntimeError, Exception] + network_errors

def prepare_iamdb_request(database_url):

    service = 'neptune-db'
    method = 'GET'

    access_key = os.environ['AWS_ACCESS_KEY_ID']
    secret_key = os.environ['AWS_SECRET_ACCESS_KEY']
    region = os.environ['AWS_REGION']
    session_token = os.environ['AWS_SESSION_TOKEN']


    creds = SimpleNamespace(
        access_key=access_key, secret_key=secret_key, token=session_token, region=region,
    )

    request = AWSRequest(method=method, url=database_url, data=None)
    SigV4Auth(creds, service, region).add_auth(request)

    return database_url, request.headers.items()

def is_retriable_error(e):

    is_retriable = False
    err_msg = str(e)

    if isinstance(e, tuple(network_errors)):
        is_retriable = True
    else:
        is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs)

    logger.error('error: [{}] {}'.format(type(e), err_msg))
    logger.info('is_retriable: {}'.format(is_retriable))

    return is_retriable

def is_non_retriable_error(e):
    return not is_retriable_error(e)

def reset_connection_if_connection_issue(params):

    is_reconnectable = False

    e = sys.exc_info()[1]
    err_msg = str(e)

    if isinstance(e, tuple(network_errors)):
        is_reconnectable = True
    else:
        is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs)

    logger.info('is_reconnectable: {}'.format(is_reconnectable))

    if is_reconnectable:
        global conn
        global g
        conn.close()
        conn = create_remote_connection()
        g = create_graph_traversal_source(conn)

@backoff.on_exception(backoff.constant,
                      tuple(retriable_errors),
                      max_tries=5,
                      jitter=None,
                      giveup=is_non_retriable_error,
                      on_backoff=reset_connection_if_connection_issue,
                      interval=1)
def query(**kwargs):

    id = kwargs['id']

    return (g.V().hasLabel('column').has('column_name', 'amhstr_ag_type').in_('hascolumn').dedup().valueMap().limit(10).toList())

def doQuery(event):
    return query(id=str(randint(0, 10000)))

def lambda_handler(event, context):
    result = doQuery(event)
    logger.info('result – {}'.format(result))
    return result

def create_graph_traversal_source(conn):
    return traversal().withRemote(conn)

def create_remote_connection():
    logger.info('Creating remote connection')

    (database_url, headers) = connection_info()

    # Convert headers to a dictionary if it's not already
    headers_dict = dict(headers) if isinstance(headers, list) else headers

    print(headers)
    return DriverRemoteConnection(
        database_url,
        'g',
        pool_size=1,
        headers=headers_dict)


def connection_info():

    database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort'])

    if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true':
        return prepare_iamdb_request(database_url)
    else:
        return (database_url, {})

conn = create_remote_connection()
g = create_graph_traversal_source(conn)
```

以下是示例结果，显示了重负载和轻负载的交替时期：

![\[该图显示了 Python Lambda 函数示例的示例结果。\]](http://docs.aws.amazon.com/zh_cn/neptune/latest/userguide/images/python-lambda-results.png)
