

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 将 Amazon MWAA 与 Amazon EKS 一起使用
<a name="mwaa-eks-example"></a>

以下示例演示了如何将 Amazon MWAA 与 Amazon EKS 一起使用。

**Topics**
+ [版本](#mwaa-eks-example-version)
+ [先决条件](#eksctl-prereqs)
+ [创建 Amazon EC2 公有密钥](#eksctl-create-key)
+ [创建集群](#create-cluster-eksctl)
+ [创建 `mwaa` 命名空间](#eksctl-namespace)
+ [为 `mwaa` 命名空间创建角色](#eksctl-role)
+ [创建并附加 Amazon EKS 集群的 IAM 角色](#eksctl-iam-role)
+ [创建 requirements.txt 文件](#eksctl-requirements)
+ [为 Amazon EKS 创建身份映射](#eksctl-identity-map)
+ [创建 `kubeconfig`](#eksctl-kube-config)
+ [创建 DAG](#eksctl-create-dag)
+ [将 DAG 和 `kube_config.yaml` 添加到 Amazon S3 存储桶中](#eksctl-dag-bucket)
+ [启用并触发示例](#eksctl-trigger-pod)

## 版本
<a name="mwaa-eks-example-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="eksctl-prereqs"></a>

要使用本主题中的示例，您需要以下内容：
+ [Amazon MWAA 环境。](get-started.md)
+ eksctl。要了解更多信息，请参阅[安装 eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html#install-eksctl)。
+ kubectl。要了解更多信息，请参阅[安装和设置 kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/)。在某些情况下，它是与 eksctl 一起安装的。
+ 在您创建 Amazon MWAA 环境的区域中的 EC2 密钥对。要了解更多信息，请参阅[创建或导入密钥对](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair)。

**注意**  
使用 `eksctl` 命令时，可以包含 `--profile`，以指定默认配置文件以外的配置文件。

## 创建 Amazon EC2 公有密钥
<a name="eksctl-create-key"></a>

使用以下命令，以从私有密钥对中创建公有密钥。

```
ssh-keygen -y -f myprivatekey.pem > mypublickey.pub
```

要了解更新信息，请参阅[检索密钥对的公有密钥](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#retrieving-the-public-key)。

## 创建集群
<a name="create-cluster-eksctl"></a>

使用以下命令来创建集群。如果您想要为集群自定义名称或在其他区域创建集群，请替换名称和区域值。您必须在与您在创建 Amazon MWAA 环境的同一区域中创建集群。替换子网的值，使其与您用于 Amazon MWAA 的 Amazon VPC 网络中的子网相匹配。替换 `ssh-public-key` 的值以匹配您使用的密钥。您可以使用位于同一区域的 Amazon EC2 中的现有密钥，也可以在创建 Amazon MWAA 环境的同一区域创建新密钥。

```
eksctl create cluster \
--name mwaa-eks \
--region us-west-2 \
--version 1.18 \
--nodegroup-name linux-nodes \
--nodes 3 \
--nodes-min 1 \
--nodes-max 4 \
--with-oidc \
--ssh-access \
--ssh-public-key MyPublicKey \
--managed \
--vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \
--vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"
```

完成集群的创建需要一段时间。完成后，您可以使用以下命令验证集群是否已成功创建并配置了 IAM OIDC 提供商：

```
eksctl utils associate-iam-oidc-provider \
--region us-west-2 \
--cluster mwaa-eks \
--approve
```

## 创建 `mwaa` 命名空间
<a name="eksctl-namespace"></a>

确认集群已成功创建后，使用以下命令为 pod 创建命名空间。

```
kubectl create namespace mwaa
```

## 为 `mwaa` 命名空间创建角色
<a name="eksctl-role"></a>

创建命名空间后，在 EKS 上为可在 MWAA 命名空间中运行 pod 的 Amazon MWAA 用户创建角色和角色绑定。如果您为命名空间使用了不同的名称，请将 `-n mwaa` 中的 mwaa 名称替换为您使用的名称。

```
cat << EOF | kubectl apply -f - -n mwaa
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: mwaa-role
rules:
  - apiGroups:
  - ""
  - "apps"
  - "batch"
  - "extensions"
resources:      
  - "jobs"
  - "pods"
  - "pods/attach"
			- "pods/exec"
  - "pods/log"
  - "pods/portforward"
  - "secrets"
  - "services"
verbs:
  - "create"
  - "delete"
  - "describe"
  - "get"
  - "list"
  - "patch"
  - "update"
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role-binding
  subjects:
    - kind: User
  name: mwaa-service
  roleRef:
    kind: Role
  name: mwaa-role
  apiGroup: rbac.authorization.k8s.io
EOF
```

运行以下命令来确认新角色可以访问 Amazon EKS 集群。如果您没有使用以下名称，请务必使用正确的名称*mwaa*：

```
kubectl get pods -n mwaa --as mwaa-service
```

您会收到一条含有如下内容的消息：

```
No resources found in mwaa namespace.
```

## 创建并附加 Amazon EKS 集群的 IAM 角色
<a name="eksctl-iam-role"></a>

您必须创建一个 IAM 角色，然后将其绑定到 Amazon EKS（k8s）集群，这样该角色才能通过 IAM 进行身份验证。该角色仅用于登录集群，没有任何控制台或 API 调用的权限。

使用 [Amazon MWAA 执行角色](mwaa-create-role.md) 中的步骤为 Amazon MWAA 环境创建新角色。但是，与其创建和附加该主题中描述的策略，不如附加以下策略：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "airflow:PublishMetrics",
            "Resource": "arn:aws:airflow:us-east-1:111122223333:environment/${MWAA_ENV_NAME}"
        },
        {
            "Effect": "Deny",
            "Action": "s3:ListAllMyBuckets",
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "logs:GetLogEvents",
                "logs:GetLogRecord",
                "logs:GetLogGroupFields",
                "logs:GetQueryResults",
                "logs:DescribeLogGroups"
            ],
            "Resource": [
            "arn:aws:logs:us-east-1:111122223333:log-group:airflow-${MWAA_ENV_NAME}-*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "cloudwatch:PutMetricData",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:ReceiveMessage",
                "sqs:SendMessage"
            ],
            "Resource": "arn:aws:sqs:us-east-1:*:airflow-celery-*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:DescribeKey",
                "kms:GenerateDataKey*",
                "kms:Encrypt"
            ],
            "NotResource": "arn:aws:kms:*:111122223333:key/*",
            "Condition": {
                "StringLike": {
                    "kms:ViaService": [
                    "sqs.us-east-1.amazonaws.com"
                    ]
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "eks:DescribeCluster"
            ],
            "Resource": "arn:aws:eks:us-east-1:111122223333:cluster/${EKS_CLUSTER_NAME}"
        }
    ]
}
```

------

创建角色后，编辑 Amazon MWAA 环境，使用您创建的角色作为环境的执行角色。要更改角色，请编辑要使用的环境。您可以在“**权限**”下选择执行角色。

**已知问题：**
+ 角色 ARNs 存在一个已知问题，子路径无法通过 Amazon EKS 进行身份验证。解决方法是手动创建服务角色，而不是使用 Amazon MWAA 自己创建的服务角色。要了解更多信息，请参阅[在 aws-auth ConfigMap 中当 ARN 包含路径时，带有路径的角色不起作用](https://github.com/kubernetes-sigs/aws-iam-authenticator/issues/268)
+ 如果 IAM 中没有 Amazon MWAA 服务列表，则需要选择备用服务策略，例如 Amazon EC2，然后更新该角色的信任策略以匹配以下内容：

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": [
            "airflow-env.amazonaws.com",
            "airflow.amazonaws.com"
          ]
        },
        "Action": "sts:AssumeRole"
      }
    ]
  }
  ```

------

  要了解更多信息，请参阅[如何在 IAM 角色中使用信任策略](https://aws.amazon.com/blogs/security/how-to-use-trust-policies-with-iam-roles/)。

## 创建 requirements.txt 文件
<a name="eksctl-requirements"></a>

要使用本节中的示例代码，请确保已向 `requirements.txt` 中添加了以下数据库选项之一。要了解更多信息，请参阅 [安装 Python 依赖项](working-dags-dependencies.md)。

```
kubernetes
apache-airflow[cncf.kubernetes]==3.0.0
```

## 为 Amazon EKS 创建身份映射
<a name="eksctl-identity-map"></a>

使用您在以下命令中创建的角色的 ARN 为 Amazon EKS 创建身份映射。将区域*us-east-1*更改为创建环境的区域。替换角色的 ARN，最后替换*mwaa-execution-role*为环境的执行角色。

```
eksctl create iamidentitymapping \
--region us-east-1 \
--cluster mwaa-eks \
--arn arn:aws:iam::123456789012:role/mwaa-execution-role \
--username mwaa-service
```

## 创建 `kubeconfig`
<a name="eksctl-kube-config"></a>

使用以下命令创建 `kubeconfig`：

```
aws eks update-kubeconfig \
--region us-west-2 \
--kubeconfig ./kube_config.yaml \
--name mwaa-eks \
--alias aws
```

如果您在运行 `update-kubeconfig` 时使用了特定的配置文件，则需要删除添加到 kube\$1config.yaml 文件中的 `env:` 部分，这样它才能在 Amazon MWAA 中正常运行。为此，请从文件中删除以下内容，然后将其保存：

```
env:
 - name: AWS_PROFILE
 value: profile_name
```

## 创建 DAG
<a name="eksctl-create-dag"></a>

使用以下代码示例创建 Python 文件，例如 DAG 的 `mwaa_pod_example.py` 文件。

```
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
   'owner': 'aws',
   'depends_on_past': False,
   'start_date': datetime(2019, 2, 20),
   'provide_context': True
}

dag = DAG(
   'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

#use a kube_config stored in s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
                       namespace="mwaa",
                       image="ubuntu:18.04",
                       cmds=["bash"],
                       arguments=["-c", "ls"],
                       labels={"foo": "bar"},
                       name="mwaa-pod-test",
                       task_id="pod-task",
                       get_logs=True,
                       dag=dag,
                       is_delete_operator_pod=False,
                       config_file=kube_config_path,
                       in_cluster=False,
                       cluster_context='aws'
                       )
```

## 将 DAG 和 `kube_config.yaml` 添加到 Amazon S3 存储桶中
<a name="eksctl-dag-bucket"></a>

将您创建的 DAG 和 `kube_config.yaml` 文件放入 Amazon MWAA 环境的 Amazon S3 存储桶中。您可以使用 Amazon S3 控制台或 AWS Command Line Interface将所有文件放入存储桶中。

## 启用并触发示例
<a name="eksctl-trigger-pod"></a>

在 Apache Airflow 中，启用该示例，然后将其触发。

成功运行并完成后，使用以下命令验证 Pod：

```
kubectl get pods -n mwaa
```

您将获得与下内容类似的输出：

```
NAME READY STATUS RESTARTS AGE
mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s
```

然后，您可以使用以下命令验证 Pod 的输出。请将名称值替换为上一个命令返回的值：

```
kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888
```