实现用户定义的函数 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

实现用户定义的函数

用户定义函数 (UDFs) 是扩展点,允许您调用查询中无法以其他方式表达的常用逻辑或自定义逻辑。你可以使用 Python 或 Java 或 Scala 之类的JVM语言在 Studio 笔记本UDFs中以段落形式实现。您还可以将包含以某种JVM语言UDFs实现的外部JAR文件添加到 Studio 笔记本中。

在实现JARs该寄存器子类UserDefinedFunction(或您自己的抽象类)的抽象类时,请使用 Apache Maven 中提供的作用域、Gradle 中的compileOnly依赖项声明、中SBT提供的作用域或项目构建配置中的UDF等效指令。这允许UDF源代码针对 Flink 进行编译APIs,但是 Flink API 类本身并未包含在编译工件中。请参阅 UDF jar 示例中的这个 pom,它在 Maven 项目中符合这样的先决条件。

要使用控制台向 Studio 笔记本添加UDFJAR文件,请执行以下步骤:

  1. 将您的UDFJAR文件上传到亚马逊 S3。

  2. 在中 AWS Management Console,选择用于创建 Studio 笔记本的自定义创建选项。

  3. 按照 Studio 笔记本的创建工作流程进行操作,直到进入配置步骤。

  4. 用户定义的函数部分,选择添加用户定义的函数

  5. 指定JAR文件或实现您的文件的 Amazon S3 位置UDF。ZIP

  6. 选择 Save changes(保存更改)

要在使用创建新 Studio 笔记本UDFJAR时添加 CreateApplicationAPI,请在CustomArtifactConfiguration数据类型中指定JAR位置。要将添加到现有 Studio 笔记本中,请调用该UpdateApplicationAPI操作并指定CustomArtifactsConfigurationUpdate数据类型中的JAR位置。UDF JAR或者,您可以使用将UDFJAR文件添加 AWS Management Console 到 Studio 笔记本上。

用户定义的函数的注意事项

  • Managed Service for Apache Flink Studio 使用 Apache Zeppelin 的术语,其中笔记本是可以包含多个音符的齐柏林飞艇实例。然后,每个注释可以包含多个段落。通过 Managed Service for Apache Flink Studio,解释器流程可以在笔记本中的所有笔记中共享。因此,如果您在一个注释中使用 Function 执行显式createTemporarySystem函数注册,则可以在同一笔记本的另一个注释中按原样引用相同的函数。

    但是,“部署为应用程序” 操作仅适用于单个笔记,而不是笔记本中的所有笔记。执行部署为应用程序时,仅使用活动注释的内容来生成应用程序。在其他 笔记本 中执行的任何显式函数注册都不是生成的应用程序依赖关系的一部分。此外,在 “部署为应用程序” 选项期间,通过将的主类名转换为小写字符串JAR来进行隐式函数注册。

    例如,如果TextAnalyticsUDF是的主类 UDFJAR,则隐式注册将生成函数名textanalyticsudf。因此,如果 Studio 注释 1 中的显式函数注册如下所示,那么myNewFuncNameForClass由于共享解释器,该笔记本中的所有其他注释(比如注释 2)都可以按名称引用该函数:

    stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())

    但是,在注释 2 中作为应用程序部署操作期间,此显式注册将不包含在依赖项中,因此已部署的应用程序将无法按预期运行。由于采用了隐式注册,因此默认情况下,对该函数的所有引用都应该是 with textanalyticsudf 和 not myNewFuncNameForClass

    如果需要注册自定义函数名,那么注释 2 本身应该包含另一段来执行另一次显式注册,如下所示:

    %flink(parallelism=l) import com.amazonaws.kinesis.udf.textanalytics.TextAnalyticsUDF # re-register the JAR for UDF with custom name stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())
    %flink. ssql(type=update, parallelism=1) INSERT INTO table2 SELECT myNewFuncNameForClass(column_name) FROM table1 ;
  • 如果您UDFJAR包含 FlinkSDKs,请配置您的 Java 项目,以便UDF源代码可以针对 Flink 进行编译SDKs,但是 Flink SDK 类本身并未包含在编译工件中,例如。JAR

    你可以在 Apache Maven 中使用provided作用域,在 Gradle 中使用compileOnly依赖关系声明,在中使用provided作用域SBT,或者在其UDF项目构建配置中使用等效指令。你可以从 UDF jar 示例中引用这个 pom,它符合 maven 项目的先决条件。有关完整的 step-by-step 教程,请参阅此使用适用于 Apache Flink、Amazon Translate 和 Amazon Comprehend 的亚马逊托管服务的SQL函数翻译、编辑和分析流数据