使用 Lambda 创建和部署 UDF - Amazon Athena

使用 Lambda 创建和部署 UDF

要创建自定义 UDF,可以通过扩展 UserDefinedFunctionHandler 类来创建新的 Java 类。开发工具包中的 UserDefinedFunctionHandler.java 的源代码在 GitHub 上的 awslabs/aws-athena-query-federation/athena-federation-sdk 存储库中提供,其中还提供有示例 UDF 实施,您可以对其进行检查和修改,以创建自定义 UDF。

本节中的步骤演示从命令行使用 Apache Maven 编写和构建自定义 UDF Jar 文件和部署。

执行以下步骤以使用 Maven 为 Athena 创建自定义 UDF

克隆开发工具包并准备开发环境

在开始之前,请确保使用 sudo yum install git -y 在您的系统上安装 git。

安装 AWS 查询联合开发工具包
  • 在命令行输入以下内容以克隆软件开发工具包存储库。此存储库包括软件开发工具包、示例和一套数据源连接器。有关数据源连接器的更多信息,请参阅使用 Amazon Athena 联合查询

    git clone https://github.com/awslabs/aws-athena-query-federation.git
此程序的安装先决条件

如果您正在使用已安装 Apache Maven、AWS CLI 和 AWS Serverless Application Model 构建工具的开发计算机,则可以跳过此步骤。

  1. 从克隆时创建的 aws-athena-query-federation 目录的根目录下,运行准备开发环境的 prepare_dev_env.sh 脚本。

  2. 更新您的 shell 以获取在安装过程中创建的新变量或重新启动终端会话。

    source ~/.profile
    重要

    如果您跳过此步骤,稍后将收到关于 AWS CLI 或 AWS SAM 构建工具无法发布 Lambda 函数的错误。

创建您的 Maven 项目

运行以下命令以创建您的 Maven 项目。将 groupId 替换为组织的唯一 ID,并将 my-athena-udf 替换为您的应用程序的名称。有关详细信息,请参阅 Apache Maven 文档中的如何创建我的第一个 Maven 项目?

mvn -B archetype:generate \ -DarchetypeGroupId=org.apache.maven.archetypes \ -DgroupId=groupId \ -DartifactId=my-athena-udfs

将依赖项和插件添加到您的 Maven 项目中

将以下配置添加到 Maven 项目 pom.xml 文件中。要查看示例,请参阅 GitHub 中的 pom.xml 文件。

<properties> <aws-athena-federation-sdk.version>2022.47.1</aws-athena-federation-sdk.version> </properties> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-athena-federation-sdk</artifactId> <version>${aws-athena-federation-sdk.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.1</version> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins> </build>

为 UDF 编写 Java 代码

通过扩展 UserDefinedFunctionHandler.java 来创建一个新类。在类中编写您的 UDF。

在以下示例中,已在 MyUserDefinedFunctions 类中创建了两个用于 UDF、compress()decompress() 的 Java 方法。

*package *com.mycompany.athena.udfs; public class MyUserDefinedFunctions extends UserDefinedFunctionHandler { private static final String SOURCE_TYPE = "MyCompany"; public MyUserDefinedFunctions() { super(SOURCE_TYPE); } /** * Compresses a valid UTF-8 String using the zlib compression library. * Encodes bytes with Base64 encoding scheme. * * @param input the String to be compressed * @return the compressed String */ public String compress(String input) { byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8); // create compressor Deflater compressor = new Deflater(); compressor.setInput(inputBytes); compressor.finish(); // compress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); while (!compressor.finished()) { int bytes = compressor.deflate(buffer); byteArrayOutputStream.write(buffer, 0, bytes); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return encoded string byte[] compressedBytes = byteArrayOutputStream.toByteArray(); return Base64.getEncoder().encodeToString(compressedBytes); } /** * Decompresses a valid String that has been compressed using the zlib compression library. * Decodes bytes with Base64 decoding scheme. * * @param input the String to be decompressed * @return the decompressed String */ public String decompress(String input) { byte[] inputBytes = Base64.getDecoder().decode((input)); // create decompressor Inflater decompressor = new Inflater(); decompressor.setInput(inputBytes, 0, inputBytes.length); // decompress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); try { while (!decompressor.finished()) { int bytes = decompressor.inflate(buffer); if (bytes == 0 && decompressor.needsInput()) { throw new DataFormatException("Input is truncated"); } byteArrayOutputStream.write(buffer, 0, bytes); } } catch (DataFormatException e) { throw new RuntimeException("Failed to decompress string", e); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return decoded string byte[] decompressedBytes = byteArrayOutputStream.toByteArray(); return new String(decompressedBytes, StandardCharsets.UTF_8); } }

构建 JAR 文件

运行 mvn clean install 以构建您的项目。成功构建后,将在名为 artifactId-version.jar 的项目的 target 文件夹中创建一个 JAR 文件,其中 artifactId 是您在 Maven 项目中提供的名称,例如 my-athena-udfs

将 JAR 部署到 AWS Lambda

有两个选项可以将代码部署到 Lambda:

  • 使用 AWS Serverless Application Repository 部署(推荐)

  • 从 JAR 文件创建 Lambda 函数

选项 1:部署到 AWS Serverless Application Repository

将 JAR 文件部署到 AWS Serverless Application Repository 时,您将创建一个代表应用程序体系结构的 AWS SAM 模板 YAML 文件。然后,您可以指定此 YAML 文件和一个 Amazon S3 存储桶,在其中上载应用程序的构件并使其对 AWS Serverless Application Repository 可用。下面的过程使用 publish.sh 脚本,该脚本位于您之前克隆的 Athena Query Federation SDK 的 athena-query-federation/tools 目录中。

有关更多信息和要求,请参阅《AWS Serverless Application Repository 开发人员指南》中的 发布应用程序、《AWS Serverless Application Model 开发人员指南》中的 AWS SAM 模板概念使用 AWS SAM CLI 发布无服务器应用程序

以下示例演示了 YAML 文件中的参数。将类似的参数添加到 YAML 文件并将其保存在项目目录中。有关完整示例,请参阅 GitHub 中的 athena-udf.yaml

Transform: 'AWS::Serverless-2016-10-31' Metadata: 'AWS::ServerlessRepo::Application': Name: MyApplicationName Description: 'The description I write for my application' Author: 'Author Name' Labels: - athena-federation SemanticVersion: 1.0.0 Parameters: LambdaFunctionName: Description: 'The name of the Lambda function that will contain your UDFs.' Type: String LambdaTimeout: Description: 'Maximum Lambda invocation runtime in seconds. (min 1 - 900 max)' Default: 900 Type: Number LambdaMemory: Description: 'Lambda memory in MB (min 128 - 3008 max).' Default: 3008 Type: Number Resources: ConnectorConfig: Type: 'AWS::Serverless::Function' Properties: FunctionName: !Ref LambdaFunctionName Handler: "full.path.to.your.handler. For example, com.amazonaws.athena.connectors.udfs.MyUDFHandler" CodeUri: "Relative path to your JAR file. For example, ./target/athena-udfs-1.0.jar" Description: "My description of the UDFs that this Lambda function enables." Runtime: java8 Timeout: !Ref LambdaTimeout MemorySize: !Ref LambdaMemory

publish.sh 脚本复制到保存 YAML 文件的项目目录中,然后运行以下命令:

./publish.sh MyS3Location MyYamlFile

例如,如果您的存储桶位置为 s3://amzn-s3-demo-bucket/mysarapps/athenaudf 并且您的 YAML 文件已另存为 my-athena-udfs.yaml

./publish.sh amzn-s3-demo-bucket/mysarapps/athenaudf my-athena-udfs
创建 Lambda 函数
  1. 在以下位置打开 Lambda 控制台:https://console.aws.amazon.com/lambda/,选择 Create function(创建函数),然后选择 Browse serverless app repository(浏览无服务器应用程序存储库)

  2. 选择 Private applications (私有应用程序),在列表中找到您的应用程序,或使用关键词搜索它,然后选择它。

  3. 查看并提供应用程序详细信息,然后选择 Deploy (部署)

    您现在可以使用在 Lambda 函数 JAR 文件中定义的方法名称作为 Athena 中的 UDF。

选项 2:直接创建 Lambda 函数

您也可以直接使用控制台或 AWS CLI 创建 Lambda 函数。以下示例演示如何使用 create-function Lambda CLI 命令。

aws lambda create-function \ --function-name MyLambdaFunctionName \ --runtime java8 \ --role arn:aws:iam::1234567890123:role/my_lambda_role \ --handler com.mycompany.athena.udfs.MyUserDefinedFunctions \ --timeout 900 \ --zip-file fileb://./target/my-athena-udfs-1.0-SNAPSHOT.jar