Amazon MWAA mit Amazon RDS für Microsoft SQL Server verwenden - Amazon Managed Workflows für Apache Airflow

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Amazon MWAA mit Amazon RDS für Microsoft SQL Server verwenden

Sie können Amazon Managed Workflows for Apache Airflow verwenden, um eine Verbindung zu einem RDSSQLFor-Server herzustellen. Der folgende Beispielcode wird in einer DAGs Amazon Managed Workflows for Apache Airflow-Umgebung verwendet, um eine Verbindung zu einem Amazon RDS for Microsoft SQL Server herzustellen und Abfragen auf diesem auszuführen.

Version

  • Der Beispielcode auf dieser Seite kann mit Apache Airflow v1 in Python 3.7 verwendet werden.

  • Sie können das Codebeispiel auf dieser Seite mit Apache Airflow v2 in Python 3.10 verwenden.

Voraussetzungen

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:

  • Eine MWAAAmazon-Umgebung.

  • Amazon MWAA und der RDS for SQL Server laufen im selben AmazonVPC/

  • VPCDie Sicherheitsgruppen von Amazon MWAA und dem Server sind mit den folgenden Verbindungen konfiguriert:

    • Eine eingehende Regel für den Port, der für Amazon RDS in der Sicherheitsgruppe MWAA von Amazon 1433 geöffnet ist

    • Oder eine ausgehende Regel für den Port von 1433 Open von Amazon MWAA nach RDS

  • Apache Airflow Connection RDS for SQL Server spiegelt den Hostnamen, den Port, den Benutzernamen und das Passwort aus der RDS SQL Amazon-Serverdatenbank wider, die im vorherigen Prozess erstellt wurde.

Abhängigkeiten

Um den Beispielcode in diesem Abschnitt zu verwenden, fügen Sie Ihrem die folgende Abhängigkeit hinzu. requirements.txt Weitere Informationen hierzu finden Sie unter Python-Abhängigkeiten installieren.

Apache Airflow v2
apache-airflow-providers-microsoft-mssql==1.0.1 apache-airflow-providers-odbc==1.0.1 pymssql==2.2.1
Apache Airflow v1
apache-airflow[mssql]==1.10.12

Apache Airflow v2-Verbindung

Wenn Sie eine Verbindung in Apache Airflow v2 verwenden, stellen Sie sicher, dass das Airflow-Verbindungsobjekt die folgenden Schlüssel-Wert-Paare enthält:

  1. Verbindungs-ID: mssql_default

  2. Verbindungstyp: Amazon Web Services

  3. Gastgeber: YOUR_DB_HOST

  4. Schema:

  5. Einloggen: admin

  6. Passwort:

  7. Hafen: 1433

  8. Zusätzlich:

Codebeispiel

  1. Navigieren Sie in der Befehlszeile zu dem Verzeichnis, in dem Ihr DAG Code gespeichert ist. Beispielsweise:

    cd dags
  2. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal untersql-server.py.

    """ Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ import pymssql import logging import sys from airflow import DAG from datetime import datetime from airflow.operators.mssql_operator import MsSqlOperator from airflow.operators.python_operator import PythonOperator default_args = { 'owner': 'aws', 'depends_on_past': False, 'start_date': datetime(2019, 2, 20), 'provide_context': True } dag = DAG( 'mssql_conn_example', default_args=default_args, schedule_interval=None) drop_db = MsSqlOperator( task_id="drop_db", sql="DROP DATABASE IF EXISTS testdb;", mssql_conn_id="mssql_default", autocommit=True, dag=dag ) create_db = MsSqlOperator( task_id="create_db", sql="create database testdb;", mssql_conn_id="mssql_default", autocommit=True, dag=dag ) create_table = MsSqlOperator( task_id="create_table", sql="CREATE TABLE testdb.dbo.pet (name VARCHAR(20), owner VARCHAR(20));", mssql_conn_id="mssql_default", autocommit=True, dag=dag ) insert_into_table = MsSqlOperator( task_id="insert_into_table", sql="INSERT INTO testdb.dbo.pet VALUES ('Olaf', 'Disney');", mssql_conn_id="mssql_default", autocommit=True, dag=dag ) def select_pet(**kwargs): try: conn = pymssql.connect( server='sampledb.<xxxxxx>.<region>.rds.amazonaws.com', user='admin', password='<yoursupersecretpassword>', database='testdb' ) # Create a cursor from the connection cursor = conn.cursor() cursor.execute("SELECT * from testdb.dbo.pet") row = cursor.fetchone() if row: print(row) except: logging.error("Error when creating pymssql database connection: %s", sys.exc_info()[0]) select_query = PythonOperator( task_id='select_query', python_callable=select_pet, dag=dag, ) drop_db >> create_db >> create_table >> insert_into_table >> select_query

Als nächstes

  • Erfahren Sie unter, wie Sie die requirements.txt Datei in diesem Beispiel in Ihren Amazon S3 S3-Bucket hochladenPython-Abhängigkeiten installieren.

  • Erfahren Sie unter, wie Sie den DAG Code in diesem Beispiel in den dags Ordner in Ihrem Amazon S3 S3-Bucket hochladenHinzufügen oder Aktualisieren DAGs.

  • Erkunden Sie Beispielskripte und andere Beispiele für pymssql-Module.

  • Weitere Informationen zum Ausführen von SQL Code in einer bestimmten SQL Microsoft-Datenbank mithilfe des mssql_operator finden Sie im Apache Airflow-Referenzhandbuch.