

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# AWS Lambda Amazon Neptune 的 函數範例
<a name="lambda-functions-examples"></a>

下列以 Java、JavaScript 和 Python `fold().coalesce().unfold()` 撰寫的範例 AWS Lambda 函數，說明使用 idiom 以隨機產生的 ID 支撐單一頂點。

每個函數中的大部分程式碼都是樣板程式碼，負責管理連線並在發生錯誤時重試連線和查詢。真正的應用程式邏輯和 Gemlin 查詢是在 `doQuery()` 和 `query()` 方法中分別實作。如果您使用這些範例做為自己的 Lambda 函數的基礎，便可專注於修改 `doQuery()` 和 `query()`。

這些函數會設定為重試失敗的查詢 5 次，在重試之間等候 1 秒鐘。

這些函數需要一些值存在於下列 Lambda 環境變數中：
+ **`NEPTUNE_ENDPOINT`** – 您的 Neptune 資料庫叢集端點。對於 Python，這應該是 `neptuneEndpoint`。
+ **`NEPTUNE_PORT`** – Neptune 連接埠。對於 Python，這應該是 `neptunePort`。
+ **`USE_IAM `**   – (`true` 或 `false`) 如果您的資料庫已啟用 AWS Identity and Access Management (IAM) 資料庫身分驗證，請將`USE_IAM`環境變數設定為 `true`。這會導致 Lambda 函數向 Neptune 提出 Sigv4-sign 連線請求。對於此類 IAM 資料庫身分驗證請求，請確保 Lambda 函數的執行角色已附加適當的 IAM 政策，其允許函數連線到 Neptune 資料庫叢集 (請參閱 [IAM 政策的類型](security-iam-access-manage.md#iam-auth-policy))。

## Amazon Neptune 的 Java Lambda 函數範例
<a name="lambda-functions-examples-java"></a>

以下是有關 Java AWS Lambda 函數的一些注意事項：
+ Java 驅動程式會維護自己的連線集區，您不需要這些連線集區，因此請使用 `minConnectionPoolSize(1)` 和 `maxConnectionPoolSize(1)` 設定您的 `Cluster` 物件。
+ `Cluster` 物件的建置速度可能很慢，因為它會建立一個或多個序列化程序 (預設為 Gyro，再加上另一個序列化程序，如果您已針對其他輸出格式設定它的話，例如 `binary`)。這些需要一些時間才能執行個體化。
+ 連接集區會連同第一個請求進行初始化。此時，驅動程式會設定 `Netty` 堆疊、配置位元組緩衝區，以及建立簽署金鑰 (如果您使用 IAM 資料庫身分驗證的話)。所有這些都會增加冷啟動延遲。
+ Java 驅動程式的連線集區會監控伺服器主機的可用性，並在連線失敗時自動嘗試重新連線。它會啟動背景任務以嘗試重新建立連線。使用 `reconnectInterval( )` 設定重新連線嘗試之間的間隔。當驅動程式嘗試重新連線時，您的 Lambda 函數只需重試查詢即可。

  如果重試之間的間隔小於重新連線嘗試之間的間隔，則重試失敗的連線會再次失敗，因為主機被視為無法使用。這不適用於 `ConcurrentModificationException` 的重試。
+ 使用 Java 8 而不是 Java 11。預設不會在 Java 11 中啟用 `Netty` 最佳化。
+ 此範例使用 [Retry4j](https://github.com/elennick/retry4j) 進行重試。
+ 若要在 Java Lambda 函數中使用 `Sigv4` 簽署驅動程式，請參閱 [使用 IAM 搭配 Gremlin Java 連線至 Amazon Neptune 資料庫](iam-auth-connecting-gremlin-java.md) 中的相依性需求。

**警告**  
來自 Retry4j 的 `CallExecutor` 可能不是執行緒安全的程式碼。考慮讓每個執行緒使用自己的 `CallExecutor` 執行個體。

**注意**  
 下列範例已更新為包含 requestInterceptor() 的使用。這已在 TinkerPop 3.6.6 中新增。在 TinkerPop 3.6.6 版之前，程式碼範例使用 handshakeInterceptor()，該版本已棄用。

```
package com.amazonaws.examples.social;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.evanlennick.retry4j.CallExecutor;
import com.evanlennick.retry4j.CallExecutorBuilder;
import com.evanlennick.retry4j.Status;
import com.evanlennick.retry4j.config.RetryConfig;
import com.evanlennick.retry4j.config.RetryConfigBuilder;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.neptune.auth.NeptuneNettyHttpSigV4Signer;
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.T;

import java.io.*;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.function.Function;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold;

public class MyHandler implements RequestStreamHandler {

  private final GraphTraversalSource g;
  private final CallExecutor<Object> executor;
  private final Random idGenerator = new Random();

  public MyHandler() {

    this.g = AnonymousTraversalSource
            .traversal()
            .withRemote(DriverRemoteConnection.using(createCluster()));


    this.executor = new CallExecutorBuilder<Object>()
            .config(createRetryConfig())
            .build();

  }

  @Override
  public void handleRequest(InputStream input,
                            OutputStream output,
                            Context context) throws IOException {

    doQuery(input, output);
  }

  private void doQuery(InputStream input, OutputStream output) throws IOException {
    try {

      Map<String, Object> args = new HashMap<>();
      args.put("id", idGenerator.nextInt());

      String result = query(args);

      try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) {
          writer.write(result);
      }

    } finally {
        input.close();
        output.close();
    }
  }

  private String query(Map<String, Object> args) {
    int id = (int) args.get("id");

    @SuppressWarnings("unchecked")
    Callable<Object> query = () -> g.V(id)
      .fold()
      .coalesce(
        unfold(),
        addV("Person").property(T.id, id))
      .id().next();

    Status<Object> status = executor.execute(query);

    return status.getResult().toString();
  }

  private Cluster createCluster() {
    Cluster.Builder builder = Cluster.build()
                                     .addContactPoint(System.getenv("NEPTUNE_ENDPOINT"))
                                     .port(Integer.parseInt(System.getenv("NEPTUNE_PORT")))
                                     .enableSsl(true)
                                     .minConnectionPoolSize(1)
                                     .maxConnectionPoolSize(1)
                                     .serializer(Serializers.GRAPHBINARY_V1D0)
                                     .reconnectInterval(2000);

  if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) {
    // The following example uses requestInterceptor(), which was introduced
    // in TinkerPop 3.6.6. If you are using a TinkerPop version earlier than
    // 3.6.6 (but 3.5.5 or higher), use handshakeInterceptor() instead.
    builder.requestInterceptor( r ->
      {
        NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer(region, DefaultCredentialsProvider.create());
        sigV4Signer.signRequest(r);
        return r;
      }
    )
    
    return builder.create();
  }

  private RetryConfig createRetryConfig() {
    return new RetryConfigBuilder().retryOnCustomExceptionLogic(retryLogic())
                                   .withDelayBetweenTries(1000, ChronoUnit.MILLIS)
                                   .withMaxNumberOfTries(5)
                                   .withFixedBackoff()
                                   .build();
  }

  private Function<Exception, Boolean> retryLogic() {
    return e -> {
      StringWriter stringWriter = new StringWriter();
      e.printStackTrace(new PrintWriter(stringWriter));
      String message = stringWriter.toString();

      // Check for connection issues
      if ( message.contains("Timed out while waiting for an available host") ||
           message.contains("Timed-out waiting for connection on Host") ||
           message.contains("Connection to server is no longer active") ||
           message.contains("Connection reset by peer") ||
           message.contains("SSLEngine closed already") ||
           message.contains("Pool is shutdown") ||
           message.contains("ExtendedClosedChannelException") ||
           message.contains("Broken pipe")) {
        return true;
      }

      // Concurrent writes can sometimes trigger a ConcurrentModificationException.
      // In these circumstances you may want to backoff and retry.
      if (message.contains("ConcurrentModificationException")) {
          return true;
      }

      // If the primary fails over to a new instance, existing connections to the old primary will
      // throw a ReadOnlyViolationException. You may want to back and retry.
      if (message.contains("ReadOnlyViolationException")) {
          return true;
      }

      return false;
    };
  }

  private String getOptionalEnv(String name, String defaultValue) {
    String value = System.getenv(name);
    if (value != null && value.length() > 0) {
      return value;
    } else {
      return defaultValue;
    }
  }
}
```

如果要在您的函數中包含重新連線邏輯，請參閱 [Java 重新連線範例](access-graph-gremlin-java-reconnect-example.md)。

## Amazon Neptune 的 JavaScript Lambda 函數範例
<a name="lambda-functions-examples-javascript"></a>

**關於此範例的注意事項**
+ JavaScript 驅動程式不會維護連線集區。它一律開啟單一連線。
+ 範例函數會使用來自 [gremlin-aws-sigv4](https://github.com/shutterstock/gremlin-aws-sigv4) 的 Sigv4 簽署公用程式，將請求簽署至啟用 IAM 身分驗證的資料庫。
+ 它會使用來自開放原始碼[非同步公用程式模組](https://github.com/caolan/async)中的 [retry( )](https://caolan.github.io/async/v3/docs.html#retry) 函數來處理「退避和重試」嘗試。
+ Gremlin 終端步驟會傳回一個 JavaScript `promise` (請參閱 [TinkerPop 文件](https://tinkerpop.apache.org/docs/current/reference/#gremlin-javascript-connecting))。對於 `next()`，這是 `{value, done}` 元組。
+ 連線錯誤是在處理常式內引發的，並根據這裡列出的建議使用一些退避和重試邏輯進行處理，但有一個例外狀況。有一種連線問題是驅動程式不會被視為例外狀況，因此無法透過這種退避和重試邏輯來解決。

  問題是，如果在驅動程式傳送請求之後，但在驅動程式收到回應之前關閉連線，則查詢似乎已完成，但傳回 null 值。就 lambda 函數用戶端而言，該函數似乎成功完成，但回應是空的。

  此問題的影響取決於您的應用程式如何處理空白回應。某些應用程式可能會將來自讀取請求的空白回應視為錯誤，但其他應用程式可能會將其視為空白結果。

  遇到此連線問題的寫入請求也會傳回空白回應。空白回應的成功調用表示成功還是失敗？ 如果調用 write 函數的用戶端只是將函數的成功調用視為表示已遞交對資料庫的寫入，而不是檢查回應的本文，則系統可能似乎遺失了資料。

  此問題起因於驅動程式如何處理基礎通訊端發出的事件。當基礎網路通訊端由於 `ECONNRESET` 錯誤而關閉時，驅動程式使用的 WebSocket 會關閉並發出 `'ws close'` 事件。不過，驅動程式中沒有任何東西可以透過用來引發例外狀況的方式處理該事件。因此，查詢就會消失。

  若要解決此問題，這裡的範例 Lambda 函數會新增 `'ws close'` 事件處理常式，在建立遠端連線時將例外狀況擲回驅動程式。不過，此例外狀況並非沿著 Gemlin 查詢的要求-回應路徑引發，因此無法用來觸發 lambda 函數本身內的任何退避和重試邏輯。`'ws close'` 事件處理常式所擲回的例外狀況會產生未處理的例外狀況，導致 lambda 調用失敗。這允許調用函數的用戶端處理錯誤，並在適當的情況下重試 lambda 調用。

  我們建議您在 lambda 函數本身中實作退避和重試邏輯，以保護您的用戶端免於間歇性連線問題。不過，上述問題的因應措施也需要用戶端實作重試邏輯，以處理起因於這個特定連線問題的失敗。

### Javascript 程式碼
<a name="lambda-functions-examples-javascript-code"></a>

```
const gremlin = require('gremlin');
const async = require('async');
const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils');

const traversal = gremlin.process.AnonymousTraversalSource.traversal;
const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection;
const t = gremlin.process.t;
const __ = gremlin.process.statics;

let conn = null;
let g = null;

async function query(context) {

  const id = context.id;

  return g.V(id)
    .fold()
    .coalesce(
      __.unfold(), 
      __.addV('User').property(t.id, id)
    )
    .id().next();
}

async function doQuery() {
  const id = Math.floor(Math.random() * 10000).toString();

  let result = await query({id: id}); 
  return result['value'];
}

exports.handler = async (event, context) => {

  const getConnectionDetails = () => {
    if (process.env['USE_IAM'] == 'true'){
       return getUrlAndHeaders(
         process.env['NEPTUNE_ENDPOINT'],
         process.env['NEPTUNE_PORT'],
         {},
         '/gremlin',
         'wss'); 
    } else {
      const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin';
      return { url: database_url, headers: {}};
    }    
  };


  const createRemoteConnection = () => {
    const { url, headers } = getConnectionDetails();

    const c = new DriverRemoteConnection(
      url, 
      {          
        headers: headers 
      });  

     c._client._connection.on('close', (code, message) => {
         console.info(`close - ${code} ${message}`);
         if (code == 1006){
           console.error('Connection closed prematurely');
           throw new Error('Connection closed prematurely');
         }
       });  

     return c;     
  };

  const createGraphTraversalSource = (conn) => {
    return traversal().withRemote(conn);
  };

  if (conn == null){
    console.info("Initializing connection")
    conn = createRemoteConnection();
    g = createGraphTraversalSource(conn);
  }

  return async.retry(
    { 
      times: 5,
      interval: 1000,
      errorFilter: function (err) { 

        // Add filters here to determine whether error can be retried
        console.warn('Determining whether retriable error: ' + err.message);

        // Check for connection issues
        if (err.message.startsWith('WebSocket is not open')){
          console.warn('Reopening connection');
          conn.close();
          conn = createRemoteConnection();
          g = createGraphTraversalSource(conn);
          return true;
        }

        // Check for ConcurrentModificationException
        if (err.message.includes('ConcurrentModificationException')){
          console.warn('Retrying query because of ConcurrentModificationException');
          return true;
        }

        // Check for ReadOnlyViolationException
        if (err.message.includes('ReadOnlyViolationException')){
          console.warn('Retrying query because of ReadOnlyViolationException');
          return true;
        }

        return false; 
      }

    }, 
    doQuery);
};
```

## Amazon Neptune 的 Python Lambda 函數範例
<a name="lambda-functions-examples-python"></a>

以下是有關下列 Python AWS Lambda 範例函數的一些注意事項：
+ 它會使用[退避模組](https://pypi.org/project/backoff/)。
+ 它會設定 `pool_size=1` 以防止建立不必要的連線集區。

```
import os, sys, backoff, math
from random import randint
from gremlin_python import statics
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.driver.protocol import GremlinServerError
from gremlin_python.driver import serializer
from gremlin_python.process.anonymous_traversal import traversal
from gremlin_python.process.graph_traversal import __
from gremlin_python.process.strategies import *
from gremlin_python.process.traversal import T
from aiohttp.client_exceptions import ClientConnectorError
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import ReadOnlyCredentials
from types import SimpleNamespace

import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)


reconnectable_err_msgs = [
    'ReadOnlyViolationException',
    'Server disconnected',
    'Connection refused',
    'Connection was already closed',
    'Connection was closed by server',
    'Failed to connect to server: HTTP Error code 403 - Forbidden'
]

retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs

network_errors = [OSError, ClientConnectorError]

retriable_errors = [GremlinServerError, RuntimeError, Exception] + network_errors

def prepare_iamdb_request(database_url):

    service = 'neptune-db'
    method = 'GET'

    access_key = os.environ['AWS_ACCESS_KEY_ID']
    secret_key = os.environ['AWS_SECRET_ACCESS_KEY']
    region = os.environ['AWS_REGION']
    session_token = os.environ['AWS_SESSION_TOKEN']


    creds = SimpleNamespace(
        access_key=access_key, secret_key=secret_key, token=session_token, region=region,
    )

    request = AWSRequest(method=method, url=database_url, data=None)
    SigV4Auth(creds, service, region).add_auth(request)

    return database_url, request.headers.items()

def is_retriable_error(e):

    is_retriable = False
    err_msg = str(e)

    if isinstance(e, tuple(network_errors)):
        is_retriable = True
    else:
        is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs)

    logger.error('error: [{}] {}'.format(type(e), err_msg))
    logger.info('is_retriable: {}'.format(is_retriable))

    return is_retriable

def is_non_retriable_error(e):
    return not is_retriable_error(e)

def reset_connection_if_connection_issue(params):

    is_reconnectable = False

    e = sys.exc_info()[1]
    err_msg = str(e)

    if isinstance(e, tuple(network_errors)):
        is_reconnectable = True
    else:
        is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs)

    logger.info('is_reconnectable: {}'.format(is_reconnectable))

    if is_reconnectable:
        global conn
        global g
        conn.close()
        conn = create_remote_connection()
        g = create_graph_traversal_source(conn)

@backoff.on_exception(backoff.constant,
                      tuple(retriable_errors),
                      max_tries=5,
                      jitter=None,
                      giveup=is_non_retriable_error,
                      on_backoff=reset_connection_if_connection_issue,
                      interval=1)
def query(**kwargs):

    id = kwargs['id']

    return (g.V().hasLabel('column').has('column_name', 'amhstr_ag_type').in_('hascolumn').dedup().valueMap().limit(10).toList())

def doQuery(event):
    return query(id=str(randint(0, 10000)))

def lambda_handler(event, context):
    result = doQuery(event)
    logger.info('result – {}'.format(result))
    return result

def create_graph_traversal_source(conn):
    return traversal().withRemote(conn)

def create_remote_connection():
    logger.info('Creating remote connection')

    (database_url, headers) = connection_info()

    # Convert headers to a dictionary if it's not already
    headers_dict = dict(headers) if isinstance(headers, list) else headers

    print(headers)
    return DriverRemoteConnection(
        database_url,
        'g',
        pool_size=1,
        headers=headers_dict)


def connection_info():

    database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort'])

    if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true':
        return prepare_iamdb_request(database_url)
    else:
        return (database_url, {})

conn = create_remote_connection()
g = create_graph_traversal_source(conn)
```

以下是範例結果，顯示重負載和輕負載的交替週期：

![\[顯示來自範例 Python Lambda 函數之範例結果的圖表。\]](http://docs.aws.amazon.com/zh_tw/neptune/latest/userguide/images/python-lambda-results.png)
