기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
AWS Lambda Amazon Neptune의 함수 예제
Java, JavaScript 및 Python으로 작성된 다음 예제 AWS Lambda 함수는 fold().coalesce().unfold()
idiom을 사용하여 무작위로 생성된 ID로 단일 버텍스를 업서스팅하는 방법을 보여줍니다.
각 함수의 코드 대부분은 연결을 관리하고 오류가 발생할 경우 연결 및 쿼리를 다시 시도하는 보일러플레이트 코드입니다. 실제 애플리케이션 로직과 Gremlin 쿼리는 각각 doQuery()
및 query()
메서드에서 구현됩니다. 이 예제를 자체 Lambda 함수의 기반으로 사용하면 doQuery()
및 query()
수정에 집중할 수 있습니다.
함수는 실패한 쿼리를 5번 재시도하고 재시도 사이에 1초 동안 대기하도록 구성됩니다.
함수를 사용하려면 다음 Lambda 환경 변수에 값이 있어야 합니다.
NEPTUNE_ENDPOINT
– Neptune DB 클러스터 엔드포인트입니다. Python의 경우neptuneEndpoint
여야 합니다.NEPTUNE_PORT
– Neptune 포트입니다. Python의 경우neptunePort
여야 합니다.-
USE_IAM
– (true
또는false
) 데이터베이스에 AWS Identity and Access Management (IAM) 데이터베이스 인증이 활성화된 경우USE_IAM
환경 변수를 로 설정합니다true
. 그러면 Lambda 함수가 Neptune에 대한 연결 요청에 Sigv4 서명을 하게 됩니다. 이러한 IAM DB 인증 요청의 경우 Lambda 함수의 실행 역할에 함수를 Neptune DB 클러스터에 연결할 수 있도록 허용하는 적절한 IAM 정책이 연결되어 있는지 확인하세요(IAM 정책 유형 참조).
Amazon Neptune용 Java Lambda 함수 예제
다음은 Java AWS Lambda 함수에 유의해야 할 몇 가지 사항입니다.
Java 드라이버는 사용자에게 필요 없는 자체 연결 풀을 유지 관리하므로,
minConnectionPoolSize(1)
및maxConnectionPoolSize(1)
를 사용하여Cluster
객체를 구성하세요.이
Cluster
객체는 하나 이상의 직렬 변환기를 생성하기 때문에, 구축 속도가 느릴 수 있습니다(기본적으로 Gyro이고,binary
과 같은 추가 출력 형식에 대해 구성한 경우 또 하나). 이를 인스턴스화하는 데 시간이 걸릴 수 있습니다.연결 풀은 첫 번째 요청으로 초기화됩니다. 이때 드라이버는
Netty
스택을 설정하고, 바이트 버퍼를 할당하고, IAM DB 인증을 사용하는 경우 서명 키를 생성합니다. 이 모든 것이 콜드 스타트 지연 시간을 가중시킬 수 있습니다.-
Java 드라이버의 연결 풀은 서버 호스트의 가용성을 모니터링하고 연결에 실패할 경우 자동으로 재연결을 시도합니다. 연결을 다시 설정하기 위한 백그라운드 작업이 시작됩니다.
reconnectInterval( )
을 사용하여 재연결 시도 간격을 구성합니다. 드라이버가 재연결을 시도하는 동안 Lambda 함수는 간단히 쿼리를 재시도할 수 있습니다.재시도 간격이 재연결 시도 간격보다 짧으면 호스트를 사용할 수 없는 것으로 간주하여 실패한 연결에 대한 재시도가 다시 실패합니다.
ConcurrentModificationException
에 대한 재시도에는 적용되지 않습니다. Java 11 대신 Java 8을 사용하세요.
Netty
최적화는 Java 11에서 기본적으로 활성화되어 있지 않습니다.이 예제에서는 재시도에 Retry4j
를 사용합니다. Java Lambda 함수에서
Sigv4
서명 드라이버를 사용하려면 Gremlin Java와 함께 IAM을 사용하여 Amazon Neptune 데이터베이스에 연결에서 종속성 요구 사항을 참조하세요.
주의
Retry4j에서 CallExecutor
는 스레드 세이프가 아닐 수 있습니다. 각 스레드가 자체 CallExecutor
인스턴스를 사용하는 것을 고려해 보세요.
참고
다음 예제는 requestInterceptor() 사용을 포함하도록 업데이트되었습니다. 이는 TinkerPop 3.6.6에 추가되었습니다. TinkerPop 버전 3.6.6 이전에는 코드 예제에서 해당 릴리스에서 더 이상 사용되지 않는 handshakeInterceptor()를 사용했습니다.
package com.amazonaws.examples.social; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.evanlennick.retry4j.CallExecutor; import com.evanlennick.retry4j.CallExecutorBuilder; import com.evanlennick.retry4j.Status; import com.evanlennick.retry4j.config.RetryConfig; import com.evanlennick.retry4j.config.RetryConfigBuilder; import org.apache.tinkerpop.gremlin.driver.Cluster; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.neptune.auth.NeptuneNettyHttpSigV4Signer; import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.structure.T; import java.io.*; import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.function.Function; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold; public class MyHandler implements RequestStreamHandler { private final GraphTraversalSource g; private final CallExecutor<Object> executor; private final Random idGenerator = new Random(); public MyHandler() { this.g = AnonymousTraversalSource .traversal() .withRemote(DriverRemoteConnection.using(createCluster())); this.executor = new CallExecutorBuilder<Object>() .config(createRetryConfig()) .build(); } @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { doQuery(input, output); } private void doQuery(InputStream input, OutputStream output) throws IOException { try { Map<String, Object> args = new HashMap<>(); args.put("id", idGenerator.nextInt()); String result = query(args); try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) { writer.write(result); } } finally { input.close(); output.close(); } } private String query(Map<String, Object> args) { int id = (int) args.get("id"); @SuppressWarnings("unchecked") Callable<Object> query = () -> g.V(id) .fold() .coalesce( unfold(), addV("Person").property(T.id, id)) .id().next(); Status<Object> status = executor.execute(query); return status.getResult().toString(); } private Cluster createCluster() { Cluster.Builder builder = Cluster.build() .addContactPoint(System.getenv("NEPTUNE_ENDPOINT")) .port(Integer.parseInt(System.getenv("NEPTUNE_PORT"))) .enableSsl(true) .minConnectionPoolSize(1) .maxConnectionPoolSize(1) .serializer(Serializers.GRAPHBINARY_V1D0) .reconnectInterval(2000); if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) { // For versions of TinkerPop 3.4.11 or higher: builder.requestInterceptor( r -> { NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer(region, new DefaultAWSCredentialsProviderChain()); sigV4Signer.signRequest(r); return r; } ) // Versions of TinkerPop prior to 3.4.11 should use the following approach. // Be sure to adjust the imports to include: // import org.apache.tinkerpop.gremlin.driver.SigV4WebSocketChannelizer; // builder = builder.channelizer(SigV4WebSocketChannelizer.class); return builder.create(); } private RetryConfig createRetryConfig() { return new RetryConfigBuilder().retryOnCustomExceptionLogic(retryLogic()) .withDelayBetweenTries(1000, ChronoUnit.MILLIS) .withMaxNumberOfTries(5) .withFixedBackoff() .build(); } private Function<Exception, Boolean> retryLogic() { return e -> { StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String message = stringWriter.toString(); // Check for connection issues if ( message.contains("Timed out while waiting for an available host") || message.contains("Timed-out waiting for connection on Host") || message.contains("Connection to server is no longer active") || message.contains("Connection reset by peer") || message.contains("SSLEngine closed already") || message.contains("Pool is shutdown") || message.contains("ExtendedClosedChannelException") || message.contains("Broken pipe")) { return true; } // Concurrent writes can sometimes trigger a ConcurrentModificationException. // In these circumstances you may want to backoff and retry. if (message.contains("ConcurrentModificationException")) { return true; } // If the primary fails over to a new instance, existing connections to the old primary will // throw a ReadOnlyViolationException. You may want to back and retry. if (message.contains("ReadOnlyViolationException")) { return true; } return false; }; } private String getOptionalEnv(String name, String defaultValue) { String value = System.getenv(name); if (value != null && value.length() > 0) { return value; } else { return defaultValue; } } }
함수에 재연결 로직을 포함하려면 Java 재연결 샘플을 참조하세요.
Amazon Neptune용 JavaScript Lambda 함수 예제
예제 참고 사항
JavaScript 드라이버는 연결 풀을 유지하지 않습니다. 항상 단일 연결을 엽니다.
예제 함수는 gremlin-aws-sigv4
의 Sigv4 서명 유틸리티를 사용하여 IAM 인증 지원 데이터베이스에 대한 요청에 서명합니다. 오픈 소스 비동기 유틸리티 모듈
의 retry( ) 함수를 사용하여 backoff-and-retry 시도를 처리합니다. Gremlin 터미널 단계는 JavaScript
promise
를 반환합니다(TinkerPop 설명서참조). next()
의 경우{value, done}
튜플입니다.-
연결 오류는 핸들러 내에서 발생하며, 여기에 설명된 권장 사항에 따라 일부 backoff-and-retry 로직을 사용하여 처리합니다. 단, 한 가지 예외가 있습니다. 드라이버가 예외로 취급하지 않는 연결 문제가 하나 있는데, 따라서 이 backoff-and-retry 로직으로는 해결할 수 없습니다.
문제는 드라이버가 요청을 보낸 후 드라이버가 응답을 받기 전에 연결이 끊어지면 쿼리가 완료된 것처럼 보이지만, null 값을 반환한다는 것입니다. Lambda 함수 클라이언트의 경우 함수가 성공적으로 완료된 것으로 보이지만, 응답이 비어 있습니다.
이 문제의 영향은 애플리케이션이 빈 응답을 처리하는 방식에 따라 달라집니다. 읽기 요청의 빈 응답을 오류로 처리하는 애플리케이션도 있지만, 빈 결과로 잘못 처리하는 애플리케이션도 있습니다.
이 연결 문제가 발생한 쓰기 요청도 빈 응답을 반환합니다. 간접 호출에 성공하고 응답이 비어 있으면 성공 또는 실패 신호로 여길 수 있을까요? 쓰기 함수를 간접적으로 호출하는 클라이언트가 응답 본문을 검사하지 않고 함수 간접 호출 성공을 단순히 데이터베이스에 대한 쓰기가 커밋되었음을 의미한다고 간주하는 경우, 시스템에 데이터가 손실된 것처럼 보일 수 있습니다.
이 문제는 드라이버가 기본 소켓에서 내보낸 이벤트를 처리하는 방식 때문에 발생합니다. 기본 네트워크 소켓이
ECONNRESET
오류로 닫히면 드라이버에서 사용하는 WebSocket이 닫히고'ws close'
이벤트가 발생합니다. 하지만 드라이버에는 예외를 유발하는 데 사용할 수 있는 방식으로 해당 이벤트를 처리할 수 있는 기능이 없습니다. 따라서 쿼리는 그냥 사라집니다.이 문제를 해결하기 위해 여기에 있는 예제 Lambda 함수는 원격 연결을 생성할 때 드라이버에 예외를 발생시키는
'ws close'
이벤트 핸들러를 추가합니다. 그러나 이 예외는 Gremlin 쿼리의 요청-응답 경로를 따라 발생하지 않으므로, Lambda 함수 자체 내에서 backoff-and-retry 로직을 트리거하는 데 사용할 수 없습니다. 대신'ws close'
이벤트 핸들러에서 발생한 예외로 인해 처리되지 않은 예외가 발생하여 Lambda 간접 호출이 실패합니다. 이렇게 되면 함수를 간접적으로 호출하는 클라이언트가 오류를 처리하고 필요한 경우 Lambda 간접 호출을 재시도할 수 있습니다.클라이언트를 간헐적인 연결 문제로부터 보호하려면 Lambda 함수 자체에 backoff-and-retry 로직을 구현하는 것이 좋습니다. 하지만 위 문제를 해결하려면 클라이언트가 이 특정 연결 문제로 인한 실패를 처리하기 위한 재시도 로직도 구현해야 합니다.
Javascript 코드
const gremlin = require('gremlin'); const async = require('async'); const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils'); const traversal = gremlin.process.AnonymousTraversalSource.traversal; const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection; const t = gremlin.process.t; const __ = gremlin.process.statics; let conn = null; let g = null; async function query(context) { const id = context.id; return g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(t.id, id) ) .id().next(); } async function doQuery() { const id = Math.floor(Math.random() * 10000).toString(); let result = await query({id: id}); return result['value']; } exports.handler = async (event, context) => { const getConnectionDetails = () => { if (process.env['USE_IAM'] == 'true'){ return getUrlAndHeaders( process.env['NEPTUNE_ENDPOINT'], process.env['NEPTUNE_PORT'], {}, '/gremlin', 'wss'); } else { const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin'; return { url: database_url, headers: {}}; } }; const createRemoteConnection = () => { const { url, headers } = getConnectionDetails(); const c = new DriverRemoteConnection( url, { mimeType: 'application/vnd.gremlin-v2.0+json', headers: headers }); c._client._connection.on('close', (code, message) => { console.info(`close - ${code} ${message}`); if (code == 1006){ console.error('Connection closed prematurely'); throw new Error('Connection closed prematurely'); } }); return c; }; const createGraphTraversalSource = (conn) => { return traversal().withRemote(conn); }; if (conn == null){ console.info("Initializing connection") conn = createRemoteConnection(); g = createGraphTraversalSource(conn); } return async.retry( { times: 5, interval: 1000, errorFilter: function (err) { // Add filters here to determine whether error can be retried console.warn('Determining whether retriable error: ' + err.message); // Check for connection issues if (err.message.startsWith('WebSocket is not open')){ console.warn('Reopening connection'); conn.close(); conn = createRemoteConnection(); g = createGraphTraversalSource(conn); return true; } // Check for ConcurrentModificationException if (err.message.includes('ConcurrentModificationException')){ console.warn('Retrying query because of ConcurrentModificationException'); return true; } // Check for ReadOnlyViolationException if (err.message.includes('ReadOnlyViolationException')){ console.warn('Retrying query because of ReadOnlyViolationException'); return true; } return false; } }, doQuery); };
Amazon Neptune용 Python Lambda 함수 예제
다음 Python AWS Lambda 예제 함수에 대해 알아두어야 할 몇 가지 사항은 아래와 같습니다.
백오프 모듈
을 사용합니다. 불필요한 연결 풀을 만들지 않도록
pool_size=1
을 설정합니다.
import os, sys, backoff, math from random import randint from gremlin_python import statics from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection from gremlin_python.driver.protocol import GremlinServerError from gremlin_python.driver import serializer from gremlin_python.process.anonymous_traversal import traversal from gremlin_python.process.graph_traversal import __ from gremlin_python.process.strategies import * from gremlin_python.process.traversal import T from aiohttp.client_exceptions import ClientConnectorError from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest from botocore.credentials import ReadOnlyCredentials from types import SimpleNamespace import logging logger = logging.getLogger() logger.setLevel(logging.INFO) reconnectable_err_msgs = [ 'ReadOnlyViolationException', 'Server disconnected', 'Connection refused', 'Connection was already closed', 'Connection was closed by server', 'Failed to connect to server: HTTP Error code 403 - Forbidden' ] retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs network_errors = [OSError, ClientConnectorError] retriable_errors = [GremlinServerError, RuntimeError, Exception] + network_errors def prepare_iamdb_request(database_url): service = 'neptune-db' method = 'GET' access_key = os.environ['AWS_ACCESS_KEY_ID'] secret_key = os.environ['AWS_SECRET_ACCESS_KEY'] region = os.environ['AWS_REGION'] session_token = os.environ['AWS_SESSION_TOKEN'] creds = SimpleNamespace( access_key=access_key, secret_key=secret_key, token=session_token, region=region, ) request = AWSRequest(method=method, url=database_url, data=None) SigV4Auth(creds, service, region).add_auth(request) return database_url, request.headers.items() def is_retriable_error(e): is_retriable = False err_msg = str(e) if isinstance(e, tuple(network_errors)): is_retriable = True else: is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs) logger.error('error: [{}] {}'.format(type(e), err_msg)) logger.info('is_retriable: {}'.format(is_retriable)) return is_retriable def is_non_retriable_error(e): return not is_retriable_error(e) def reset_connection_if_connection_issue(params): is_reconnectable = False e = sys.exc_info()[1] err_msg = str(e) if isinstance(e, tuple(network_errors)): is_reconnectable = True else: is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs) logger.info('is_reconnectable: {}'.format(is_reconnectable)) if is_reconnectable: global conn global g conn.close() conn = create_remote_connection() g = create_graph_traversal_source(conn) @backoff.on_exception(backoff.constant, tuple(retriable_errors), max_tries=5, jitter=None, giveup=is_non_retriable_error, on_backoff=reset_connection_if_connection_issue, interval=1) def query(**kwargs): id = kwargs['id'] return (g.V().hasLabel('column').has('column_name', 'amhstr_ag_type').in_('hascolumn').dedup().valueMap().limit(10).toList()) def doQuery(event): return query(id=str(randint(0, 10000))) def lambda_handler(event, context): result = doQuery(event) logger.info('result – {}'.format(result)) return result def create_graph_traversal_source(conn): return traversal().withRemote(conn) def create_remote_connection(): logger.info('Creating remote connection') (database_url, headers) = connection_info() # Convert headers to a dictionary if it's not already headers_dict = dict(headers) if isinstance(headers, list) else headers print(headers) return DriverRemoteConnection( database_url, 'g', pool_size=1, headers=headers_dict) def connection_info(): database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort']) if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true': return prepare_iamdb_request(database_url) else: return (database_url, {}) conn = create_remote_connection() g = create_graph_traversal_source(conn)
다음은 무거운 부하와 가벼운 부하가 번갈아 나타나는 기간을 보여주는 샘플 결과입니다.
