Lambda を利用して UDF を作成およびデプロイする - Amazon Athena

Lambda を利用して UDF を作成およびデプロイする

カスタム UDF を作成するには、UserDefinedFunctionHandler クラスを拡張して新しい Java クラスを作成します。SDK の UserDefinedFunctionHandler.java のソースコードは、GitHub の awslabs/aws-athena-query-federation/athena-federation-sdk リポジトリで入手できます。このコードには、検証および変更して UDF を作成するために使用できる UDF 実装例が付属しています。

このセクションの手順では、コマンドラインとデプロイから Apache Maven を使用してカスタム UDF Jar ファイルを記述し、構築する方法を示します。

Maven を使用して Athena のカスタム UDF を作成するには、次のステップを実行します。

SDK のクローン作成と開発環境の準備

始める前に、sudo yum install git -y を使用して git がシステムにインストールされていることを確認してください。

AWS クエリフェデレーション SDK をインストールするには
  • コマンドラインで次のコマンドを入力して、SDK リポジトリのクローンを作成します。このリポジトリには、SDK、例、データソースコネクタのスイートが含まれています。データソースコネクタの詳細については、「Amazon Athena 横串検索を使用する」を参照してください。

    git clone https://github.com/awslabs/aws-athena-query-federation.git
この手順の前提条件をインストールする

Apache Maven、AWS CLI、および AWS Serverless Application Model ビルドツールが既にインストールされている開発マシンで作業している場合は、このステップをスキップできます。

  1. クローン作成時に作成した aws-athena-query-federation ディレクトリのルートから、開発環境を準備する prepare_dev_env.sh スクリプトを実行します。

  2. インストールプロセスによって作成された新しい変数を調達するようにシェルを更新するか、ターミナルセッションを再起動します。

    source ~/.profile
    重要

    このステップをスキップすると、後で AWS CLI または AWS SAM ビルドツールが Lambda 関数を公開できないというエラーが表示されます。

Maven プロジェクトの作成

次のコマンドを実行して、Maven プロジェクトを作成します。groupId を組織の一意の ID に置き換え、my-athena-udf をアプリケーション名に置き換えます。詳細については、Apache Maven ドキュメントの「How do I make my first Maven project?」を参照してください。

mvn -B archetype:generate \ -DarchetypeGroupId=org.apache.maven.archetypes \ -DgroupId=groupId \ -DartifactId=my-athena-udfs

Maven プロジェクトへの依存関係とプラグインの追加

Maven プロジェクト pom.xml ファイルに次の設定を追加します。例については、GitHub の pom.xml ファイルを参照してください。

<properties> <aws-athena-federation-sdk.version>2022.47.1</aws-athena-federation-sdk.version> </properties> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-athena-federation-sdk</artifactId> <version>${aws-athena-federation-sdk.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.1</version> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins> </build>

UDF に Java コードを記述

UserDefinedFunctionHandler.java を拡張して、新しいクラスを作成します。クラス内に UDF を記述します。

次の例では、UDF の 2 つの Java メソッド、compress() および decompress() がクラス MyUserDefinedFunctions 内に作成されます。

*package *com.mycompany.athena.udfs; public class MyUserDefinedFunctions extends UserDefinedFunctionHandler { private static final String SOURCE_TYPE = "MyCompany"; public MyUserDefinedFunctions() { super(SOURCE_TYPE); } /** * Compresses a valid UTF-8 String using the zlib compression library. * Encodes bytes with Base64 encoding scheme. * * @param input the String to be compressed * @return the compressed String */ public String compress(String input) { byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8); // create compressor Deflater compressor = new Deflater(); compressor.setInput(inputBytes); compressor.finish(); // compress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); while (!compressor.finished()) { int bytes = compressor.deflate(buffer); byteArrayOutputStream.write(buffer, 0, bytes); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return encoded string byte[] compressedBytes = byteArrayOutputStream.toByteArray(); return Base64.getEncoder().encodeToString(compressedBytes); } /** * Decompresses a valid String that has been compressed using the zlib compression library. * Decodes bytes with Base64 decoding scheme. * * @param input the String to be decompressed * @return the decompressed String */ public String decompress(String input) { byte[] inputBytes = Base64.getDecoder().decode((input)); // create decompressor Inflater decompressor = new Inflater(); decompressor.setInput(inputBytes, 0, inputBytes.length); // decompress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); try { while (!decompressor.finished()) { int bytes = decompressor.inflate(buffer); if (bytes == 0 && decompressor.needsInput()) { throw new DataFormatException("Input is truncated"); } byteArrayOutputStream.write(buffer, 0, bytes); } } catch (DataFormatException e) { throw new RuntimeException("Failed to decompress string", e); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return decoded string byte[] decompressedBytes = byteArrayOutputStream.toByteArray(); return new String(decompressedBytes, StandardCharsets.UTF_8); } }

JAR ファイルのビルド

mvn clean install を実行して、プロジェクトをビルドします。正常にビルドされると、JAR ファイルが target という名前のプロジェクトの artifactId-version.jar フォルダに作成されます。ここで、artifactId は Maven プロジェクトで指定した名前(例: my-athena-udfs)です。

JAR を AWS Lambda にデプロイする

コードを Lambda にデプロイする、次の 2 つの方法があります。

  • AWS Serverless Application Repository を使用してデプロイする(推奨)

  • JAR ファイルから Lambda 関数を作成する

オプション 1: AWS Serverless Application Repository にデプロイする

JAR ファイルを AWS Serverless Application Repository にデプロイするときは、アプリケーションのアーキテクチャを表す AWS SAM テンプレート YAML ファイルを作成します。次に、この YAML ファイルと、アプリケーションのアーティファクトがアップロードされて AWS Serverless Application Repository に対して利用可能になる Simple Storage Service (Amazon S3) バケットを指定します。以下の手順では、先ほどクローンした Athena Query Federation SDK の athena-query-federation/tools ディレクトリにある publish.sh スクリプトを使用します。

詳細と要件については、「AWS Serverless Application Repository デベロッパーガイド」の「アプリケーションの公開」、「AWS Serverless Application Model デベロッパーガイド」の「AWS SAM テンプレートの概念」、および「AWS SAM CLI を使用してサーバーレスアプリケーションを公開する」を参照してください。

次の例は、YAML ファイルのパラメータを示しています。同様のパラメータを YAML ファイルに追加し、プロジェクトディレクトリに保存します。完全な例については、GitHub の athena-udf.yaml を参照してください。

Transform: 'AWS::Serverless-2016-10-31' Metadata: 'AWS::ServerlessRepo::Application': Name: MyApplicationName Description: 'The description I write for my application' Author: 'Author Name' Labels: - athena-federation SemanticVersion: 1.0.0 Parameters: LambdaFunctionName: Description: 'The name of the Lambda function that will contain your UDFs.' Type: String LambdaTimeout: Description: 'Maximum Lambda invocation runtime in seconds. (min 1 - 900 max)' Default: 900 Type: Number LambdaMemory: Description: 'Lambda memory in MB (min 128 - 3008 max).' Default: 3008 Type: Number Resources: ConnectorConfig: Type: 'AWS::Serverless::Function' Properties: FunctionName: !Ref LambdaFunctionName Handler: "full.path.to.your.handler. For example, com.amazonaws.athena.connectors.udfs.MyUDFHandler" CodeUri: "Relative path to your JAR file. For example, ./target/athena-udfs-1.0.jar" Description: "My description of the UDFs that this Lambda function enables." Runtime: java8 Timeout: !Ref LambdaTimeout MemorySize: !Ref LambdaMemory

YAML ファイルを保存したプロジェクトディレクトリに publish.sh スクリプトをコピーし、次のコマンドを実行します。

./publish.sh MyS3Location MyYamlFile

たとえば、バケットの場所が s3://amzn-s3-demo-bucket/mysarapps/athenaudf で、YAML ファイルが my-athena-udfs.yaml として保存された場合:

./publish.sh amzn-s3-demo-bucket/mysarapps/athenaudf my-athena-udfs
Lambda 関数を作成するには
  1. https://console.aws.amazon.com/lambda/ で Lambda コンソールを開いて [Create function] (関数の作成) をクリックし、[Browse serverless app repository] (Serverless Application Repository の参照) を選択します。

  2. [プライベートアプリケーション] を選択し、リストでアプリケーションを見つけるか、キーワードを使用してアプリケーションを検索して選択します。

  3. アプリケーションの詳細を確認して指定し、[デプロイ] を選択します。

    これで、Lambda 関数の JAR ファイルで定義されたメソッド名を Athena で UDF として使用できるようになりました。

オプション 2: Lambda 関数を直接作成する

コンソールまたは AWS CLI を使用して、Lambda 関数を直接作成することもできます。以下の例は、Lambda create-function CLI コマンドを使用する方法を示しています。

aws lambda create-function \ --function-name MyLambdaFunctionName \ --runtime java8 \ --role arn:aws:iam::1234567890123:role/my_lambda_role \ --handler com.mycompany.athena.udfs.MyUserDefinedFunctions \ --timeout 900 \ --zip-file fileb://./target/my-athena-udfs-1.0-SNAPSHOT.jar