實現用戶定義函 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

實現用戶定義函

用戶定義函數(UDFs)是擴展點,允許您調用常用邏輯或自定義邏輯,這些邏輯不能以其他方式在查詢中表達。您可以使用 Python 或類似 Java 或斯卡拉的JVM語言來實現您的工作室筆記本UDFs中的段落。您也可以將包含以某種JVM語言UDFs實作的 Studio 筆記本外部JAR檔案加入。

當實現JARs該註冊子類UserDefinedFunction(或您自己的抽像類)的抽像類時,請在 Apache Maven 中使用提供的範圍,Gradle 中的compileOnly依賴聲明SBT,在中提供的範圍或UDF項目構建配置中的等效指令。這允許UDF源代碼針對 Flink 進行編譯APIs,但 Flink API 類本身不包含在構建工件中。從堅持 Maven 項目上這樣的先決條件的 UDF jar 例子中參考這個 pom

若要使用主控台將UDFJAR檔案新增至您的 Studio 筆記本,請依照下列步驟執行:

  1. 將您的UDFJAR檔案上傳到 Amazon S3。

  2. 在中 AWS Management Console,選擇用於建立 Studio 筆記本的自訂建立選項。

  3. 遵循 Studio 筆記本建立工作流程,直到進入組態步驟。

  4. 使用者定義的函數區段中,選擇新增使用者定義的函數

  5. 指定JAR檔案的 Amazon S3 位置或具有您的UDF. ZIP

  6. 選擇 Save changes (儲存變更)。

若要在使用建立新 Studio 筆記本UDFJAR時加入 CreateApplicationAPI,請在CustomArtifactConfiguration資料類型中指定JAR位置。若要新增UDFJAR至現有 Studio 筆記本,請呼叫UpdateApplicationAPI作業並指定CustomArtifactsConfigurationUpdate資料類型中的JAR位置。或者,您可以使用將UDFJAR檔案加入 AWS Management Console 至您的 Studio 筆記本。

使用使用者定義函數的考量

  • Managed Service for Apache Flink Studio 使用 Apache Zeppelin 術語,其中筆記本是指一個 Zeppelin 執行個體,可以包含多條筆記。然後,每條筆記可以包含多個段落。借助 Managed Service for Apache Flink Studio,解譯器過程在筆記本中的所有筆記間共用。因此,如果您在一個音符中使用 Function 執行明確的createTemporarySystem函數註冊,則可以在同一筆記本的另一個筆記中按原樣引用相同的函數註冊。

    然而,部署為應用程式作業只適用於個別筆記,而不是筆記本中的所有筆記。當您執行部署為應用程式時,只會使用作用中筆記的內容來產生應用程式。在其他筆記本中執行的任何明確函數註冊都不屬於產生的應用程式相依性。此外,在部署為應用程式選項期間,會透過將的主類別名稱轉換為小寫字串,JAR進行隱含函數註冊。

    例如,如果TextAnalyticsUDF是的主類 UDFJAR,則隱式註冊將導致函數名稱textanalyticsudf。因此,如果 Studio 的筆記 1 中的明確函數註冊如下所示發生,那麼因為共用解譯器,該筆記本中的所有其他筆記 (例如筆記 2) 均可透過名稱 myNewFuncNameForClass 引用該函數:

    stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())

    但是,在對筆記 2 執行部署為應用程式操作期間,此明確註冊將不包含在相依性中,因此已部署的應用程式將無法按預期執行。由於隱含註冊,依預設,對此函數的所有引用都應帶有 textanalyticsudf 而不是 myNewFuncNameForClass

    如果需要進行自訂函數名稱註冊,則筆記 2 本身預計將包含另一個段落來執行另一個明確註冊,如下所示:

    %flink(parallelism=l) import com.amazonaws.kinesis.udf.textanalytics.TextAnalyticsUDF # re-register the JAR for UDF with custom name stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())
    %flink. ssql(type=update, parallelism=1) INSERT INTO table2 SELECT myNewFuncNameForClass(column_name) FROM table1 ;
  • 如果您UDFJAR包含 FlinkSDKs,請配置您的 Java 項目,以便UDF源代碼可以針對 Flink 進行編譯SDKs,但 Flink SDK 類本身不包含在構建工件中,例如. JAR

    您可以在 Apache Maven 中使用provided範圍,Gradle 中的compileOnly依賴聲明,provided範圍或其UDF項目構建配置中的等效指令。SBT您可以從 UDF jar 示例中引用此 pom,該示例堅持 maven 項目上的先決條件。如需完整的 step-by-step 教學課程,請參閱這篇文章,使用具有適用於 Apache Flink、Amazon Translate 和亞馬遜 Amazon Comprehend 的亞馬遜受管服務的SQL功能來翻譯、編輯和分析串流資料