使用 Lambda 建立和部署 UDF - Amazon Athena

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Lambda 建立和部署 UDF

若要建立自訂 UDF,請延伸 UserDefinedFunctionHandler 類別來建立新的 Java 類別。開發套件中的 UserDefinedFunctionHandler.java 的原始程式碼位於 GitHub 的 awslabs/aws-athena-query-federation/athena-federation-sdk 儲存庫中,另外還有 UDF 實作範例,供您檢查和修改以建立自訂 UDF。

本節的步驟示範使用 Apache Maven,從命令列和部署來撰寫和建置自訂 UDF Jar 檔案。

執行下列步驟,使用 Maven 建立適用於 Athena 的自訂 UDF

複製開發套件並準備開發環境

開始之前,務必使用 sudo yum install git -y 將 git 安裝在您的系統上。

安裝 AWS 查詢聯合 SDK
  • 在命令列輸入以下命令,以複製開發套件儲存庫。此儲存庫包含開發套件、範例和一套資料來源連接器。如需資料來源連接器的詳細資訊,請參閱使用 Amazon Athena 聯合查詢

    git clone https://github.com/awslabs/aws-athena-query-federation.git
安裝此程序的先決條件

如果您在已安裝 Apache Maven、 AWS CLI和 AWS Serverless Application Model 建置工具的開發機器上工作,您可以略過此步驟。

  1. 從您複製時所建立的 aws-athena-query-federation 目錄的根目錄,執行 prepare_dev_env.sh 指令碼以準備開發環境。

  2. 更新 shell 以獲取安裝程序所建立的新變數,或重新啟動終端機工作階段。

    source ~/.profile
    重要

    如果您略過此步驟,稍後您會收到有關 AWS CLI 或 AWS SAM 建置工具無法發佈 Lambda 函數的錯誤。

建立 Maven 專案

執行以下命令來建立 Maven 專案。以組織的唯一 ID 替換 groupId,以應用程式的名稱替換 my-athena-udf。如需詳細資訊,請參閱 Apache Maven 文件中的如何建立我的第一個 Maven 專案?

mvn -B archetype:generate \ -DarchetypeGroupId=org.apache.maven.archetypes \ -DgroupId=groupId \ -DartifactId=my-athena-udfs

將相依性和外掛程式新增至 Maven 專案

將以下組態新增至 Maven 專案 pom.xml 檔檔案。相關範例請參閱 GitHub 中的 pom.xml 檔案。

<properties> <aws-athena-federation-sdk.version>2022.47.1</aws-athena-federation-sdk.version> </properties> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-athena-federation-sdk</artifactId> <version>${aws-athena-federation-sdk.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.1</version> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins> </build>

撰寫 UDF 的 Java 程式碼

延伸 UserDefinedFunctionHandler.java 以建立新的類別。在類別中撰寫 UDF。

在下列範例中,類別 MyUserDefinedFunctions 內建立 UDF 的兩個 Java 方法:compress()decompress()

*package *com.mycompany.athena.udfs; public class MyUserDefinedFunctions extends UserDefinedFunctionHandler { private static final String SOURCE_TYPE = "MyCompany"; public MyUserDefinedFunctions() { super(SOURCE_TYPE); } /** * Compresses a valid UTF-8 String using the zlib compression library. * Encodes bytes with Base64 encoding scheme. * * @param input the String to be compressed * @return the compressed String */ public String compress(String input) { byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8); // create compressor Deflater compressor = new Deflater(); compressor.setInput(inputBytes); compressor.finish(); // compress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); while (!compressor.finished()) { int bytes = compressor.deflate(buffer); byteArrayOutputStream.write(buffer, 0, bytes); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return encoded string byte[] compressedBytes = byteArrayOutputStream.toByteArray(); return Base64.getEncoder().encodeToString(compressedBytes); } /** * Decompresses a valid String that has been compressed using the zlib compression library. * Decodes bytes with Base64 decoding scheme. * * @param input the String to be decompressed * @return the decompressed String */ public String decompress(String input) { byte[] inputBytes = Base64.getDecoder().decode((input)); // create decompressor Inflater decompressor = new Inflater(); decompressor.setInput(inputBytes, 0, inputBytes.length); // decompress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); try { while (!decompressor.finished()) { int bytes = decompressor.inflate(buffer); if (bytes == 0 && decompressor.needsInput()) { throw new DataFormatException("Input is truncated"); } byteArrayOutputStream.write(buffer, 0, bytes); } } catch (DataFormatException e) { throw new RuntimeException("Failed to decompress string", e); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return decoded string byte[] decompressedBytes = byteArrayOutputStream.toByteArray(); return new String(decompressedBytes, StandardCharsets.UTF_8); } }

建置 JAR 檔案

執行 mvn clean install 來建置專案。成功建置後,專案的 target 資料夾中會建立名為 artifactId-version.jar 的 JAR 檔案,其中 artifactId 是您在 Maven 專案中提供的名稱,例如 my-athena-udfs

將 JAR 部署至 AWS Lambda

將程式碼部署到 Lambda 時有兩種選項:

  • 使用 部署 AWS Serverless Application Repository (建議)

  • 從 JAR 檔案建立 Lambda 函數

選項 1:部署至 AWS Serverless Application Repository

當您將 JAR 檔案部署到 時 AWS Serverless Application Repository,您可以建立範本 AWS SAM YAML 檔案,代表應用程式的架構。然後,您需指定此 YAML 檔案和 Simple Storage Service (Amazon S3) 儲存貯體,應用程式的成品會上傳到此處,並提供給 AWS Serverless Application Repository。下列程序使用 publish.sh 指令碼 (位於您先前複製的 Athena Query Federation 軟體開發套件的 athena-query-federation/tools 目錄中)。

如需詳細資訊和需求,請參閱《 AWS Serverless Application Repository 開發人員指南》中的發佈應用程式、》 AWS Serverless Application Model 開發人員指南》中的AWS SAM 範本概念,以及使用 AWS SAM CLI 發佈無伺服器應用程式

下列範例示範 YAML 檔案中的參數。將類似的參數新增至 YAML 檔案,並儲存在專案目錄中。如需完整範例,請參閱 GitHub 中的 athena-udf.yaml

Transform: 'AWS::Serverless-2016-10-31' Metadata: 'AWS::ServerlessRepo::Application': Name: MyApplicationName Description: 'The description I write for my application' Author: 'Author Name' Labels: - athena-federation SemanticVersion: 1.0.0 Parameters: LambdaFunctionName: Description: 'The name of the Lambda function that will contain your UDFs.' Type: String LambdaTimeout: Description: 'Maximum Lambda invocation runtime in seconds. (min 1 - 900 max)' Default: 900 Type: Number LambdaMemory: Description: 'Lambda memory in MB (min 128 - 3008 max).' Default: 3008 Type: Number Resources: ConnectorConfig: Type: 'AWS::Serverless::Function' Properties: FunctionName: !Ref LambdaFunctionName Handler: "full.path.to.your.handler. For example, com.amazonaws.athena.connectors.udfs.MyUDFHandler" CodeUri: "Relative path to your JAR file. For example, ./target/athena-udfs-1.0.jar" Description: "My description of the UDFs that this Lambda function enables." Runtime: java8 Timeout: !Ref LambdaTimeout MemorySize: !Ref LambdaMemory

publish.sh 指令碼複製到您儲存 YAML 檔案的專案目錄,然後執行以下命令:

./publish.sh MyS3Location MyYamlFile

例如,如果儲存貯體位置是 s3://amzn-s3-demo-bucket/mysarapps/athenaudf,而且 YAML 檔案儲存為 my-athena-udfs.yaml

./publish.sh amzn-s3-demo-bucket/mysarapps/athenaudf my-athena-udfs
若要建立 Lambda 函數
  1. 前往以下位置開啟 Lambda 主控台:https://console.aws.amazon.com/lambda/,選擇 Create function (建立函數),然後選擇 Browse serverless app repository (瀏覽無伺服器應用程式儲存庫)

  2. 選擇 Private applications (私有應用程式),在清單中尋找您的應用程式,或使用關鍵字搜尋應用程式,然後選取應用程式。

  3. 檢閱並提供應用程式詳細資料,然後選擇 Deploy (部署)

    您現在可以使用 Lambda 函數 JAR 檔案中定義的方法名稱,作為 Athena 中的 UDF。

選項 2:直接建立 Lambda 函數

您也可以直接使用 主控台或 建立 Lambda 函數 AWS CLI。下列範例示範如何使用 Lambda create-function CLI 命令。

aws lambda create-function \ --function-name MyLambdaFunctionName \ --runtime java8 \ --role arn:aws:iam::1234567890123:role/my_lambda_role \ --handler com.mycompany.athena.udfs.MyUserDefinedFunctions \ --timeout 900 \ --zip-file fileb://./target/my-athena-udfs-1.0-SNAPSHOT.jar