를 사용하여 SSH 연결 만들기 SSHOperator - Amazon Managed Workflows for Apache Airflow

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

를 사용하여 SSH 연결 만들기 SSHOperator

다음 예제에서는 Apache Airflow용 Amazon Managed Workflow 환경에서 DAG 를 사용하여 원격 Amazon EC2 인스턴스에 연결하는 방법을 설명합니다. SSHOperator 유사한 접근 방식을 사용하여 액세스 권한이 있는 모든 원격 인스턴스에 연결할 수 있습니다. SSH

다음 예제에서는 Amazon S3의 환경 SSH dags 디렉터리에 비밀 키 (.pem) 를 업로드합니다. 그런 다음 requirements.txt을(를) 사용하여 필요한 종속성을 설치하고 UI에 새 Apache Airflow 연결을 생성합니다. 마지막으로 원격 인스턴스에 대한 SSH 연결을 DAG 생성하는 a를 작성합니다.

버전

  • 이 페이지의 코드 예제를 Python 3.10의 아파치 에어플로우 v2와 함께 사용할 수 있습니다.

사전 조건

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.

권한

  • 이 페이지의 코드 예제를 사용하는 데 추가 권한이 필요하지 않습니다.

요구 사항

웹 서버에 apache-airflow-providers-ssh 패키지를 설치하려면 다음 파라미터를 requirements.txt에 추가합니다. 환경이 업데이트되고 Amazon이 종속 항목을 MWAA 성공적으로 설치하면 UI에 새 SSH연결 유형이 표시됩니다.

-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt apache-airflow-providers-ssh
참고

-c에서 제약 URL 조건을 정의합니다. requirements.txt 이렇게 하면 Amazon이 사용자 환경에 맞는 올바른 패키지 버전을 MWAA 설치할 수 있습니다.

암호 키를 Amazon S3에 복사

다음 AWS Command Line Interface 명령을 사용하여 Amazon S3의 환경 dags 디렉터리에 .pem 키를 복사합니다.

$ aws s3 cp your-secret-key.pem s3://your-bucket/dags/

Amazon은 .pem 키를 dags 포함한 콘텐츠를 로컬 /usr/local/airflow/dags/ 디렉터리에 MWAA 복사합니다. 이렇게 하면 Apache Airflow가 키에 액세스할 수 있습니다.

새 Apache Airflow 연결 생성

아파치 에어플로우 SSH UI를 사용하여 새 연결을 만들려면
  1. Amazon MWAA 콘솔에서 환경 페이지를 엽니다.

  2. 환경 목록에서 사용자 환경에 맞는 Airflow UI 열기를 선택합니다.

  3. Apache Airflow UI 페이지의 상단 내비게이션 바에서 관리자를 선택하여 드롭다운 목록을 확장한 다음 연결을 선택합니다.

  4. 연결 목록 페이지에서 +를 선택하거나 새 레코드 추가 버튼을 선택하여 새 연결을 추가합니다.

  5. 연결 추가 페이지에서 다음 정보를 추가합니다.

    1. 연결 IDssh_new를 입력합니다.

    2. 연결 유형의 경우 드롭다운 SSH목록에서 선택합니다.

      참고

      목록에 해당 SSH연결 유형이 없는 경우 Amazon은 필요한 apache-airflow-providers-ssh 패키지를 설치하지 MWAA 않은 것입니다. 이 패키지를 포함하도록 requirements.txt 파일을 업데이트한 다음 다시 시도하십시오.

    3. 호스트에 연결하려는 Amazon EC2 인스턴스의 IP 주소를 입력합니다. 예: 12.345.67.89.

    4. 사용자 이름에 Amazon EC2 인스턴스에 연결 ec2-user 중인지 여부를 입력합니다. Apache Airflow를 연결하려는 원격 인스턴스의 유형에 따라 사용자 이름이 달라질 수 있습니다.

    5. Extra의 경우 다음 키-값 쌍을 형식으로 JSON 입력합니다.

      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }

      이 키-값 쌍은 Apache Airflow가 로컬 /dags 디렉터리에서 암호 키를 찾도록 지시합니다.

코드 샘플

다음은 SSHOperator 를 DAG 사용하여 대상 Amazon EC2 인스턴스에 연결한 다음 hostname Linux 명령을 실행하여 인스턴스 이름을 인쇄합니다. 원격 인스턴스에서 모든 명령 또는 스크립트를 DAG 실행하도록 을 수정할 수 있습니다.

  1. 터미널을 열고 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

    cd dags
  2. 다음 코드 샘플의 내용을 복사하고 로컬에서 ssh.py로 저장합니다.

    from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
  3. 다음 AWS CLI 명령을 실행하여 환경 DAG 버킷에 복사한 다음 Apache Airflow UI를 DAG 사용하여 트리거합니다.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 성공하면 의 작업 ssh_task 로그에 다음과 비슷한 출력이 표시됩니다. ssh_operator_example DAG

    [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
    Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
    [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4)
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
    [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
    [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
    [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916