Lambda を利用して UDF を作成およびデプロイする
カスタム UDF を作成するには、UserDefinedFunctionHandler
クラスを拡張して新しい Java クラスを作成します。SDK の UserDefinedFunctionHandler.java
このセクションの手順では、コマンドラインとデプロイから Apache Maven
Maven を使用して Athena のカスタム UDF を作成するには、次のステップを実行します。
SDK のクローン作成と開発環境の準備
始める前に、sudo
yum install git -y
を使用して git がシステムにインストールされていることを確認してください。
AWS クエリフェデレーション SDK をインストールするには
-
コマンドラインで次のコマンドを入力して、SDK リポジトリのクローンを作成します。このリポジトリには、SDK、例、データソースコネクタのスイートが含まれています。データソースコネクタの詳細については、「Amazon Athena 横串検索を使用する」を参照してください。
git clone https://github.com/awslabs/aws-athena-query-federation.git
この手順の前提条件をインストールする
Apache Maven、AWS CLI、および AWS Serverless Application Model ビルドツールが既にインストールされている開発マシンで作業している場合は、このステップをスキップできます。
-
クローン作成時に作成した
aws-athena-query-federation
ディレクトリのルートから、開発環境を準備する prepare_dev_env.shスクリプトを実行します。 -
インストールプロセスによって作成された新しい変数を調達するようにシェルを更新するか、ターミナルセッションを再起動します。
source ~/.profile
重要
このステップをスキップすると、後で AWS CLI または AWS SAM ビルドツールが Lambda 関数を公開できないというエラーが表示されます。
Maven プロジェクトの作成
次のコマンドを実行して、Maven プロジェクトを作成します。groupId
を組織の一意の ID に置き換え、my-athena-udf
をアプリケーション名に置き換えます。詳細については、Apache Maven ドキュメントの「How do I make my first Maven project?
mvn -B archetype:generate \ -DarchetypeGroupId=org.apache.maven.archetypes \ -DgroupId=
groupId
\ -DartifactId=my-athena-udfs
Maven プロジェクトへの依存関係とプラグインの追加
Maven プロジェクト pom.xml
ファイルに次の設定を追加します。例については、GitHub の pom.xml
<properties> <aws-athena-federation-sdk.version>2022.47.1</aws-athena-federation-sdk.version> </properties> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-athena-federation-sdk</artifactId> <version>${aws-athena-federation-sdk.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.1</version> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
UDF に Java コードを記述
UserDefinedFunctionHandler.java
次の例では、UDF の 2 つの Java メソッド、compress()
および decompress()
がクラス MyUserDefinedFunctions
内に作成されます。
*package *com.mycompany.athena.udfs; public class MyUserDefinedFunctions extends UserDefinedFunctionHandler { private static final String SOURCE_TYPE = "MyCompany"; public MyUserDefinedFunctions() { super(SOURCE_TYPE); } /** * Compresses a valid UTF-8 String using the zlib compression library. * Encodes bytes with Base64 encoding scheme. * * @param input the String to be compressed * @return the compressed String */ public String compress(String input) { byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8); // create compressor Deflater compressor = new Deflater(); compressor.setInput(inputBytes); compressor.finish(); // compress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); while (!compressor.finished()) { int bytes = compressor.deflate(buffer); byteArrayOutputStream.write(buffer, 0, bytes); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return encoded string byte[] compressedBytes = byteArrayOutputStream.toByteArray(); return Base64.getEncoder().encodeToString(compressedBytes); } /** * Decompresses a valid String that has been compressed using the zlib compression library. * Decodes bytes with Base64 decoding scheme. * * @param input the String to be decompressed * @return the decompressed String */ public String decompress(String input) { byte[] inputBytes = Base64.getDecoder().decode((input)); // create decompressor Inflater decompressor = new Inflater(); decompressor.setInput(inputBytes, 0, inputBytes.length); // decompress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); try { while (!decompressor.finished()) { int bytes = decompressor.inflate(buffer); if (bytes == 0 && decompressor.needsInput()) { throw new DataFormatException("Input is truncated"); } byteArrayOutputStream.write(buffer, 0, bytes); } } catch (DataFormatException e) { throw new RuntimeException("Failed to decompress string", e); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return decoded string byte[] decompressedBytes = byteArrayOutputStream.toByteArray(); return new String(decompressedBytes, StandardCharsets.UTF_8); } }
JAR ファイルのビルド
mvn clean install
を実行して、プロジェクトをビルドします。正常にビルドされると、JAR ファイルが target
という名前のプロジェクトの
フォルダに作成されます。ここで、artifactId
-version
.jarartifactId
は Maven プロジェクトで指定した名前(例: my-athena-udfs
)です。
JAR を AWS Lambda にデプロイする
コードを Lambda にデプロイする、次の 2 つの方法があります。
-
AWS Serverless Application Repository を使用してデプロイする(推奨)
-
JAR ファイルから Lambda 関数を作成する
オプション 1: AWS Serverless Application Repository にデプロイする
JAR ファイルを AWS Serverless Application Repository にデプロイするときは、アプリケーションのアーキテクチャを表す AWS SAM テンプレート YAML ファイルを作成します。次に、この YAML ファイルと、アプリケーションのアーティファクトがアップロードされて AWS Serverless Application Repository に対して利用可能になる Simple Storage Service (Amazon S3) バケットを指定します。以下の手順では、先ほどクローンした Athena Query Federation SDK の athena-query-federation/tools
ディレクトリにある publish.sh
詳細と要件については、「AWS Serverless Application Repository デベロッパーガイド」の「アプリケーションの公開」、「AWS Serverless Application Model デベロッパーガイド」の「AWS SAM テンプレートの概念」、および「AWS SAM CLI を使用してサーバーレスアプリケーションを公開する」を参照してください。
次の例は、YAML ファイルのパラメータを示しています。同様のパラメータを YAML ファイルに追加し、プロジェクトディレクトリに保存します。完全な例については、GitHub の athena-udf.yaml
Transform: 'AWS::Serverless-2016-10-31' Metadata: 'AWS::ServerlessRepo::Application': Name:
MyApplicationName
Description: 'The description I write for my application
' Author: 'Author Name
' Labels: - athena-federation SemanticVersion: 1.0.0 Parameters: LambdaFunctionName: Description: 'The name of the Lambda function that will contain your UDFs.
' Type: String LambdaTimeout: Description: 'Maximum Lambda invocation runtime in seconds. (min 1 - 900 max)' Default: 900 Type: Number LambdaMemory: Description: 'Lambda memory in MB (min 128 - 3008 max).' Default: 3008 Type: Number Resources: ConnectorConfig: Type: 'AWS::Serverless::Function' Properties: FunctionName: !Ref LambdaFunctionName Handler: "full.path.to.your.handler. For example, com.amazonaws.athena.connectors.udfs.MyUDFHandler
" CodeUri: "Relative path to your JAR file. For example, ./target/athena-udfs-1.0.jar
" Description: "My description of the UDFs that this Lambda function enables.
" Runtime: java8 Timeout: !Ref LambdaTimeout MemorySize: !Ref LambdaMemory
YAML ファイルを保存したプロジェクトディレクトリに publish.sh
スクリプトをコピーし、次のコマンドを実行します。
./publish.sh
MyS3Location
MyYamlFile
たとえば、バケットの場所が s3://amzn-s3-demo-bucket/mysarapps/athenaudf
で、YAML ファイルが my-athena-udfs.yaml
として保存された場合:
./publish.sh amzn-s3-demo-bucket/mysarapps/athenaudf my-athena-udfs
Lambda 関数を作成するには
-
https://console.aws.amazon.com/lambda/
で Lambda コンソールを開いて [Create function] (関数の作成) をクリックし、[Browse serverless app repository] (Serverless Application Repository の参照) を選択します。 -
[プライベートアプリケーション] を選択し、リストでアプリケーションを見つけるか、キーワードを使用してアプリケーションを検索して選択します。
-
アプリケーションの詳細を確認して指定し、[デプロイ] を選択します。
これで、Lambda 関数の JAR ファイルで定義されたメソッド名を Athena で UDF として使用できるようになりました。
オプション 2: Lambda 関数を直接作成する
コンソールまたは AWS CLI を使用して、Lambda 関数を直接作成することもできます。以下の例は、Lambda create-function
CLI コマンドを使用する方法を示しています。
aws lambda create-function \ --function-name
MyLambdaFunctionName
\ --runtime java8 \ --role arn:aws:iam::1234567890123:role/my_lambda_role
\ --handlercom.mycompany.athena.udfs.MyUserDefinedFunctions
\ --timeout 900 \ --zip-file fileb://./target/my-athena-udfs-1.0-SNAPSHOT.jar