

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 实施用户定义的函数
<a name="how-zeppelin-udf"></a>

用户定义函数 (UDFs) 是扩展点，允许您调用查询中无法以其他方式表达的常用逻辑或自定义逻辑。你可以使用 Python 或 Java 或 Scala 等 JVM 语言在 Studio UDFs 笔记本中的段落中实现。您还可以将包含以 JVM 语言 UDFs 实现的外部 JAR 文件添加到您的 Studio 笔记本中。

在实现 JARs 该注册子类`UserDefinedFunction`（或您自己的抽象类）的抽象类时，请使用 Apache Maven 中提供的作用域、Gradle 中的`compileOnly`依赖项声明、SBT 中提供的作用域或您的 UDF 项目构建配置中的等效指令。这允许 UDF 源代码针对 Flink 进行编译 APIs，但是 Flink API 类本身并未包含在编译工件中。请参阅 UDF jar 示例中的这个 [pom](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47)，它在 Maven 项目中符合这样的先决条件。

**注意**  
*有关示例设置，请参阅AWS Machine Learning 博客*中的[通过 Amazon Managed Service for Apache Flink、Amazon Translate 和 Amazon Comprehend 使用 SQL 函数翻译、编辑和分析流数据](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/)。

要使用控制台将 UDF JAR 文件添加到 Studio 笔记本中，请执行以下步骤：

1. 将您的 UDF JAR 文件上载到 Amazon S3。

1. 在中 AWS 管理控制台，选择用于**创建 Studio 笔记本的自定义**创建选项。

1. 按照 Studio 笔记本的创建工作流程进行操作，直到进入**配置**步骤。

1. 在**用户定义的函数**部分，选择**添加用户定义的函数**。

1. 指定实现 UDF 的 JAR 文件或 ZIP 文件的 Amazon S3 位置。

1. 选择**保存更改**。

要在使用 [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)API 创建新的 Studio 笔记本时添加 UDF JAR，请在`CustomArtifactConfiguration`数据类型中指定 JAR 位置。要将 UDF JAR 添加到现有 Studio 笔记本中，请调用 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)API 操作并在`CustomArtifactsConfigurationUpdate`数据类型中指定 JAR 位置。或者，您可以使用将 UDF JAR 文件 AWS 管理控制台 添加到您的 Studio 笔记本中。

## 用户定义的函数的注意事项
<a name="how-zeppelin-udf-considerations"></a>
+ Managed Service for Apache Flink Studio 使用 [Apache Zeppelin 的术语](https://zeppelin.apache.org/docs/0.9.0/quickstart/explore_ui.html)，其中笔记本是可以包含多个音符的齐柏林飞艇实例。然后，每个注释可以包含多个段落。通过 Managed Service for Apache Flink Studio，解释器流程可以在笔记本中的所有笔记中共享。因此，如果您在一个注释中使用 Function 执行显式[createTemporarySystem函数](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableEnvironment.html#createTemporarySystemFunction-java.lang.String-java.lang.Class-)注册，则可以在同一笔记本的另一个注释中按原样引用相同的函数。

  但是，“*部署为应用程序*” 操作仅适用于*单个*笔记，而不是笔记本中的所有笔记。执行部署为应用程序时，仅使用活动注释的内容来生成应用程序。在其他 笔记本 中执行的任何显式函数注册都不是生成的应用程序依赖关系的一部分。此外，在 “部署为应用程序” 选项期间，通过将 JAR 的主类名转换为小写字符串来进行隐式函数注册。

   例如，如果`TextAnalyticsUDF`是 UDF JAR 的主类，则隐式注册将生成函数名称`textanalyticsudf`。因此，如果 Studio 注释 1 中的显式函数注册如下所示，那么`myNewFuncNameForClass`由于共享解释器，该笔记本中的所有其他注释（比如注释 2）都可以按名称引用该函数：

  `stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())`

   但是，在注释 2 中作为应用程序部署操作期间，此显式注册*将不包含*在依赖项中，因此已部署的应用程序将无法按预期运行。由于采用了隐式注册，因此默认情况下，对该函数的所有引用都应该是 with `textanalyticsudf` 和 not `myNewFuncNameForClass`。

   如果需要注册自定义函数名，那么注释 2 本身应该包含另一段来执行另一次显式注册，如下所示：

  ```
  %flink(parallelism=l)
  import com.amazonaws.kinesis.udf.textanalytics.TextAnalyticsUDF 
  # re-register the JAR for UDF with custom name
  stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())
  ```

  ```
  %flink. ssql(type=update, parallelism=1) 
  INSERT INTO
      table2
  SELECT
      myNewFuncNameForClass(column_name)
  FROM
      table1
  ;
  ```
+ 如果您的 UDF JAR 包含 Flink SDKs，请配置您的 Java 项目，以便 UDF 源代码可以针对 Flink 进行编译 SDKs，但是 Flink SDK 类本身并未包含在构建工件（例如 JAR）中。

  您可以在 Apache Maven 中使用`provided`作用域，在 Gradle 中使用`compileOnly`依赖关系声明，在 SBT 中使用`provided`作用域，或者在他们的 UDF 项目构建配置中使用等效指令。您可以从 UDF jar 示例中引用这个 [pom](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47)，它在 maven 项目中遵循了这样的先决条件。有关完整的 step-by-step教程，请参阅此[使用适用于 Apache Flink、Amazon Translate 和 Amazon Comprehend 的亚马逊托管服务的 SQL 函数翻译、编辑和分析流数据](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/)。