

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# AWS Lambda Amazon Neptune의 함수 예제
<a name="lambda-functions-examples"></a>

Java, JavaScript 및 Python으로 작성된 다음 예제 AWS Lambda 함수는 `fold().coalesce().unfold()` 관용구를 사용하여 무작위로 생성된 ID로 단일 버텍스를 업서트하는 방법을 보여줍니다.

각 함수의 코드 대부분은 연결을 관리하고 오류가 발생할 경우 연결 및 쿼리를 다시 시도하는 보일러플레이트 코드입니다. 실제 애플리케이션 로직과 Gremlin 쿼리는 각각 `doQuery()` 및 `query()` 메서드에서 구현됩니다. 이 예제를 자체 Lambda 함수의 기반으로 사용하면 `doQuery()` 및 `query()` 수정에 집중할 수 있습니다.

함수는 실패한 쿼리를 5번 재시도하고 재시도 사이에 1초 동안 대기하도록 구성됩니다.

함수를 사용하려면 다음 Lambda 환경 변수에 값이 있어야 합니다.
+ **`NEPTUNE_ENDPOINT`**   –   Neptune DB 클러스터 엔드포인트입니다. Python의 경우 `neptuneEndpoint`여야 합니다.
+ **`NEPTUNE_PORT`**   –   Neptune 포트입니다. Python의 경우 `neptunePort`여야 합니다.
+ **`USE_IAM `**   –   (`true` 또는 `false`) 데이터베이스에 AWS Identity and Access Management (IAM) 데이터베이스 인증이 활성화된 경우 `USE_IAM` 환경 변수를 로 설정합니다`true`. 그러면 Lambda 함수가 Neptune에 대한 연결 요청에 Sigv4 서명을 하게 됩니다. 이러한 IAM DB 인증 요청의 경우 Lambda 함수의 실행 역할에 함수를 Neptune DB 클러스터에 연결할 수 있도록 허용하는 적절한 IAM 정책이 연결되어 있는지 확인하세요([IAM 정책 유형](security-iam-access-manage.md#iam-auth-policy) 참조).

## Amazon Neptune용 Java Lambda 함수 예제
<a name="lambda-functions-examples-java"></a>

다음은 Java AWS Lambda 함수에 대해 염두에 두어야 할 몇 가지 사항입니다.
+ Java 드라이버는 사용자에게 필요 없는 자체 연결 풀을 유지 관리하므로, `minConnectionPoolSize(1)` 및 `maxConnectionPoolSize(1)`를 사용하여 `Cluster` 객체를 구성하세요.
+ 이 `Cluster` 객체는 하나 이상의 직렬 변환기를 생성하기 때문에, 구축 속도가 느릴 수 있습니다(기본적으로 Gyro이고, `binary`과 같은 추가 출력 형식에 대해 구성한 경우 또 하나). 이를 인스턴스화하는 데 시간이 걸릴 수 있습니다.
+ 연결 풀은 첫 번째 요청으로 초기화됩니다. 이때 드라이버는 `Netty` 스택을 설정하고, 바이트 버퍼를 할당하고, IAM DB 인증을 사용하는 경우 서명 키를 생성합니다. 이 모든 것이 콜드 스타트 지연 시간을 가중시킬 수 있습니다.
+ Java 드라이버의 연결 풀은 서버 호스트의 가용성을 모니터링하고 연결에 실패할 경우 자동으로 재연결을 시도합니다. 연결을 다시 설정하기 위한 백그라운드 작업이 시작됩니다. `reconnectInterval( )`을 사용하여 재연결 시도 간격을 구성합니다. 드라이버가 재연결을 시도하는 동안 Lambda 함수는 간단히 쿼리를 재시도할 수 있습니다.

  재시도 간격이 재연결 시도 간격보다 짧으면 호스트를 사용할 수 없는 것으로 간주하여 실패한 연결에 대한 재시도가 다시 실패합니다. `ConcurrentModificationException`에 대한 재시도에는 적용되지 않습니다.
+ Java 11 대신 Java 8을 사용하세요. `Netty` 최적화는 Java 11에서 기본적으로 활성화되어 있지 않습니다.
+ 이 예제에서는 재시도에 [Retry4j](https://github.com/elennick/retry4j)를 사용합니다.
+ Java Lambda 함수에서 `Sigv4` 서명 드라이버를 사용하려면 [Gremlin Java와 함께 IAM을 사용하여 Amazon Neptune 데이터베이스에 연결](iam-auth-connecting-gremlin-java.md)에서 종속성 요구 사항을 참조하세요.

**주의**  
Retry4j에서 `CallExecutor`는 스레드 세이프가 아닐 수 있습니다. 각 스레드가 자체 `CallExecutor` 인스턴스를 사용하는 것을 고려해 보세요.

**참고**  
 다음 예제는 requestInterceptor()를 사용하도록 업데이트되었습니다. 이는 TinkerPop 3.6.6에 추가되었습니다. TinkerPop 버전 3.6.6 이전에는 코드 예제에서 해당 릴리스에서 더 이상 사용되지 않는 handshakeInterceptor()를 사용했습니다.

```
package com.amazonaws.examples.social;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.evanlennick.retry4j.CallExecutor;
import com.evanlennick.retry4j.CallExecutorBuilder;
import com.evanlennick.retry4j.Status;
import com.evanlennick.retry4j.config.RetryConfig;
import com.evanlennick.retry4j.config.RetryConfigBuilder;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.neptune.auth.NeptuneNettyHttpSigV4Signer;
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.T;

import java.io.*;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.function.Function;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold;

public class MyHandler implements RequestStreamHandler {

  private final GraphTraversalSource g;
  private final CallExecutor<Object> executor;
  private final Random idGenerator = new Random();

  public MyHandler() {

    this.g = AnonymousTraversalSource
            .traversal()
            .withRemote(DriverRemoteConnection.using(createCluster()));


    this.executor = new CallExecutorBuilder<Object>()
            .config(createRetryConfig())
            .build();

  }

  @Override
  public void handleRequest(InputStream input,
                            OutputStream output,
                            Context context) throws IOException {

    doQuery(input, output);
  }

  private void doQuery(InputStream input, OutputStream output) throws IOException {
    try {

      Map<String, Object> args = new HashMap<>();
      args.put("id", idGenerator.nextInt());

      String result = query(args);

      try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) {
          writer.write(result);
      }

    } finally {
        input.close();
        output.close();
    }
  }

  private String query(Map<String, Object> args) {
    int id = (int) args.get("id");

    @SuppressWarnings("unchecked")
    Callable<Object> query = () -> g.V(id)
      .fold()
      .coalesce(
        unfold(),
        addV("Person").property(T.id, id))
      .id().next();

    Status<Object> status = executor.execute(query);

    return status.getResult().toString();
  }

  private Cluster createCluster() {
    Cluster.Builder builder = Cluster.build()
                                     .addContactPoint(System.getenv("NEPTUNE_ENDPOINT"))
                                     .port(Integer.parseInt(System.getenv("NEPTUNE_PORT")))
                                     .enableSsl(true)
                                     .minConnectionPoolSize(1)
                                     .maxConnectionPoolSize(1)
                                     .serializer(Serializers.GRAPHBINARY_V1D0)
                                     .reconnectInterval(2000);

  if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) {
    // The following example uses requestInterceptor(), which was introduced
    // in TinkerPop 3.6.6. If you are using a TinkerPop version earlier than
    // 3.6.6 (but 3.5.5 or higher), use handshakeInterceptor() instead.
    builder.requestInterceptor( r ->
      {
        NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer(region, DefaultCredentialsProvider.create());
        sigV4Signer.signRequest(r);
        return r;
      }
    )
    
    return builder.create();
  }

  private RetryConfig createRetryConfig() {
    return new RetryConfigBuilder().retryOnCustomExceptionLogic(retryLogic())
                                   .withDelayBetweenTries(1000, ChronoUnit.MILLIS)
                                   .withMaxNumberOfTries(5)
                                   .withFixedBackoff()
                                   .build();
  }

  private Function<Exception, Boolean> retryLogic() {
    return e -> {
      StringWriter stringWriter = new StringWriter();
      e.printStackTrace(new PrintWriter(stringWriter));
      String message = stringWriter.toString();

      // Check for connection issues
      if ( message.contains("Timed out while waiting for an available host") ||
           message.contains("Timed-out waiting for connection on Host") ||
           message.contains("Connection to server is no longer active") ||
           message.contains("Connection reset by peer") ||
           message.contains("SSLEngine closed already") ||
           message.contains("Pool is shutdown") ||
           message.contains("ExtendedClosedChannelException") ||
           message.contains("Broken pipe")) {
        return true;
      }

      // Concurrent writes can sometimes trigger a ConcurrentModificationException.
      // In these circumstances you may want to backoff and retry.
      if (message.contains("ConcurrentModificationException")) {
          return true;
      }

      // If the primary fails over to a new instance, existing connections to the old primary will
      // throw a ReadOnlyViolationException. You may want to back and retry.
      if (message.contains("ReadOnlyViolationException")) {
          return true;
      }

      return false;
    };
  }

  private String getOptionalEnv(String name, String defaultValue) {
    String value = System.getenv(name);
    if (value != null && value.length() > 0) {
      return value;
    } else {
      return defaultValue;
    }
  }
}
```

함수에 재연결 로직을 포함하려면 [Java 재연결 샘플](access-graph-gremlin-java-reconnect-example.md)을 참조하세요.

## Amazon Neptune용 JavaScript Lambda 함수 예제
<a name="lambda-functions-examples-javascript"></a>

**예제 참고 사항**
+ JavaScript 드라이버는 연결 풀을 유지하지 않습니다. 항상 단일 연결을 엽니다.
+ 예제 함수는 [gremlin-aws-sigv4](https://github.com/shutterstock/gremlin-aws-sigv4)의 Sigv4 서명 유틸리티를 사용하여 IAM 인증 지원 데이터베이스에 대한 요청에 서명합니다.
+ 오픈 소스 [비동기 유틸리티 모듈](https://github.com/caolan/async)의 [retry( )](https://caolan.github.io/async/v3/docs.html#retry) 함수를 사용하여 backoff-and-retry 시도를 처리합니다.
+ Gremlin 터미널 단계는 JavaScript `promise`를 반환합니다([TinkerPop 설명서](https://tinkerpop.apache.org/docs/current/reference/#gremlin-javascript-connecting) 참조). `next()`의 경우 `{value, done}` 튜플입니다.
+ 연결 오류는 핸들러 내에서 발생하며, 여기에 설명된 권장 사항에 따라 일부 backoff-and-retry 로직을 사용하여 처리합니다. 단, 한 가지 예외가 있습니다. 드라이버가 예외로 취급하지 않는 연결 문제가 하나 있는데, 따라서 이 backoff-and-retry 로직으로는 해결할 수 없습니다.

  문제는 드라이버가 요청을 보낸 후 드라이버가 응답을 받기 전에 연결이 끊어지면 쿼리가 완료된 것처럼 보이지만, null 값을 반환한다는 것입니다. Lambda 함수 클라이언트의 경우 함수가 성공적으로 완료된 것으로 보이지만, 응답이 비어 있습니다.

  이 문제의 영향은 애플리케이션이 빈 응답을 처리하는 방식에 따라 달라집니다. 읽기 요청의 빈 응답을 오류로 처리하는 애플리케이션도 있지만, 빈 결과로 잘못 처리하는 애플리케이션도 있습니다.

  이 연결 문제가 발생한 쓰기 요청도 빈 응답을 반환합니다. 간접 호출에 성공하고 응답이 비어 있으면 성공 또는 실패 신호로 여길 수 있을까요? 쓰기 함수를 간접적으로 호출하는 클라이언트가 응답 본문을 검사하지 않고 함수 간접 호출 성공을 단순히 데이터베이스에 대한 쓰기가 커밋되었음을 의미한다고 간주하는 경우, 시스템에 데이터가 손실된 것처럼 보일 수 있습니다.

  이 문제는 드라이버가 기본 소켓에서 내보낸 이벤트를 처리하는 방식 때문에 발생합니다. 기본 네트워크 소켓이 `ECONNRESET` 오류로 닫히면 드라이버에서 사용하는 WebSocket이 닫히고 `'ws close'` 이벤트가 발생합니다. 하지만 드라이버에는 예외를 유발하는 데 사용할 수 있는 방식으로 해당 이벤트를 처리할 수 있는 기능이 없습니다. 따라서 쿼리는 그냥 사라집니다.

  이 문제를 해결하기 위해 여기에 있는 예제 Lambda 함수는 원격 연결을 생성할 때 드라이버에 예외를 발생시키는 `'ws close'` 이벤트 핸들러를 추가합니다. 그러나 이 예외는 Gremlin 쿼리의 요청-응답 경로를 따라 발생하지 않으므로, Lambda 함수 자체 내에서 backoff-and-retry 로직을 트리거하는 데 사용할 수 없습니다. 대신 `'ws close'` 이벤트 핸들러에서 발생한 예외로 인해 처리되지 않은 예외가 발생하여 Lambda 간접 호출이 실패합니다. 이렇게 되면 함수를 간접적으로 호출하는 클라이언트가 오류를 처리하고 필요한 경우 Lambda 간접 호출을 재시도할 수 있습니다.

  클라이언트를 간헐적인 연결 문제로부터 보호하려면 Lambda 함수 자체에 backoff-and-retry 로직을 구현하는 것이 좋습니다. 하지만 위 문제를 해결하려면 클라이언트가 이 특정 연결 문제로 인한 실패를 처리하기 위한 재시도 로직도 구현해야 합니다.

### Javascript 코드
<a name="lambda-functions-examples-javascript-code"></a>

```
const gremlin = require('gremlin');
const async = require('async');
const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils');

const traversal = gremlin.process.AnonymousTraversalSource.traversal;
const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection;
const t = gremlin.process.t;
const __ = gremlin.process.statics;

let conn = null;
let g = null;

async function query(context) {

  const id = context.id;

  return g.V(id)
    .fold()
    .coalesce(
      __.unfold(), 
      __.addV('User').property(t.id, id)
    )
    .id().next();
}

async function doQuery() {
  const id = Math.floor(Math.random() * 10000).toString();

  let result = await query({id: id}); 
  return result['value'];
}

exports.handler = async (event, context) => {

  const getConnectionDetails = () => {
    if (process.env['USE_IAM'] == 'true'){
       return getUrlAndHeaders(
         process.env['NEPTUNE_ENDPOINT'],
         process.env['NEPTUNE_PORT'],
         {},
         '/gremlin',
         'wss'); 
    } else {
      const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin';
      return { url: database_url, headers: {}};
    }    
  };


  const createRemoteConnection = () => {
    const { url, headers } = getConnectionDetails();

    const c = new DriverRemoteConnection(
      url, 
      {          
        headers: headers 
      });  

     c._client._connection.on('close', (code, message) => {
         console.info(`close - ${code} ${message}`);
         if (code == 1006){
           console.error('Connection closed prematurely');
           throw new Error('Connection closed prematurely');
         }
       });  

     return c;     
  };

  const createGraphTraversalSource = (conn) => {
    return traversal().withRemote(conn);
  };

  if (conn == null){
    console.info("Initializing connection")
    conn = createRemoteConnection();
    g = createGraphTraversalSource(conn);
  }

  return async.retry(
    { 
      times: 5,
      interval: 1000,
      errorFilter: function (err) { 

        // Add filters here to determine whether error can be retried
        console.warn('Determining whether retriable error: ' + err.message);

        // Check for connection issues
        if (err.message.startsWith('WebSocket is not open')){
          console.warn('Reopening connection');
          conn.close();
          conn = createRemoteConnection();
          g = createGraphTraversalSource(conn);
          return true;
        }

        // Check for ConcurrentModificationException
        if (err.message.includes('ConcurrentModificationException')){
          console.warn('Retrying query because of ConcurrentModificationException');
          return true;
        }

        // Check for ReadOnlyViolationException
        if (err.message.includes('ReadOnlyViolationException')){
          console.warn('Retrying query because of ReadOnlyViolationException');
          return true;
        }

        return false; 
      }

    }, 
    doQuery);
};
```

## Amazon Neptune용 Python Lambda 함수 예제
<a name="lambda-functions-examples-python"></a>

다음 Python AWS Lambda 예제 함수에 대해 알아두어야 할 몇 가지 사항은 아래와 같습니다.
+ [백오프 모듈](https://pypi.org/project/backoff/)을 사용합니다.
+ 불필요한 연결 풀을 만들지 않도록 `pool_size=1`을 설정합니다.

```
import os, sys, backoff, math
from random import randint
from gremlin_python import statics
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.driver.protocol import GremlinServerError
from gremlin_python.driver import serializer
from gremlin_python.process.anonymous_traversal import traversal
from gremlin_python.process.graph_traversal import __
from gremlin_python.process.strategies import *
from gremlin_python.process.traversal import T
from aiohttp.client_exceptions import ClientConnectorError
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import ReadOnlyCredentials
from types import SimpleNamespace

import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)


reconnectable_err_msgs = [
    'ReadOnlyViolationException',
    'Server disconnected',
    'Connection refused',
    'Connection was already closed',
    'Connection was closed by server',
    'Failed to connect to server: HTTP Error code 403 - Forbidden'
]

retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs

network_errors = [OSError, ClientConnectorError]

retriable_errors = [GremlinServerError, RuntimeError, Exception] + network_errors

def prepare_iamdb_request(database_url):

    service = 'neptune-db'
    method = 'GET'

    access_key = os.environ['AWS_ACCESS_KEY_ID']
    secret_key = os.environ['AWS_SECRET_ACCESS_KEY']
    region = os.environ['AWS_REGION']
    session_token = os.environ['AWS_SESSION_TOKEN']


    creds = SimpleNamespace(
        access_key=access_key, secret_key=secret_key, token=session_token, region=region,
    )

    request = AWSRequest(method=method, url=database_url, data=None)
    SigV4Auth(creds, service, region).add_auth(request)

    return database_url, request.headers.items()

def is_retriable_error(e):

    is_retriable = False
    err_msg = str(e)

    if isinstance(e, tuple(network_errors)):
        is_retriable = True
    else:
        is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs)

    logger.error('error: [{}] {}'.format(type(e), err_msg))
    logger.info('is_retriable: {}'.format(is_retriable))

    return is_retriable

def is_non_retriable_error(e):
    return not is_retriable_error(e)

def reset_connection_if_connection_issue(params):

    is_reconnectable = False

    e = sys.exc_info()[1]
    err_msg = str(e)

    if isinstance(e, tuple(network_errors)):
        is_reconnectable = True
    else:
        is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs)

    logger.info('is_reconnectable: {}'.format(is_reconnectable))

    if is_reconnectable:
        global conn
        global g
        conn.close()
        conn = create_remote_connection()
        g = create_graph_traversal_source(conn)

@backoff.on_exception(backoff.constant,
                      tuple(retriable_errors),
                      max_tries=5,
                      jitter=None,
                      giveup=is_non_retriable_error,
                      on_backoff=reset_connection_if_connection_issue,
                      interval=1)
def query(**kwargs):

    id = kwargs['id']

    return (g.V().hasLabel('column').has('column_name', 'amhstr_ag_type').in_('hascolumn').dedup().valueMap().limit(10).toList())

def doQuery(event):
    return query(id=str(randint(0, 10000)))

def lambda_handler(event, context):
    result = doQuery(event)
    logger.info('result – {}'.format(result))
    return result

def create_graph_traversal_source(conn):
    return traversal().withRemote(conn)

def create_remote_connection():
    logger.info('Creating remote connection')

    (database_url, headers) = connection_info()

    # Convert headers to a dictionary if it's not already
    headers_dict = dict(headers) if isinstance(headers, list) else headers

    print(headers)
    return DriverRemoteConnection(
        database_url,
        'g',
        pool_size=1,
        headers=headers_dict)


def connection_info():

    database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort'])

    if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true':
        return prepare_iamdb_request(database_url)
    else:
        return (database_url, {})

conn = create_remote_connection()
g = create_graph_traversal_source(conn)
```

다음은 무거운 부하와 가벼운 부하가 번갈아 나타나는 기간을 보여주는 샘플 결과입니다.

![\[예제 Python Lambda 함수의 샘플 결과를 보여주는 다이어그램.\]](http://docs.aws.amazon.com/ko_kr/neptune/latest/userguide/images/python-lambda-results.png)
