Apache-Kafka-Konnektor von Amazon Athena - Amazon Athena

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Apache-Kafka-Konnektor von Amazon Athena

Der Amazon Athena-Connector für Apache Kafka ermöglicht es Amazon Athena, SQL Abfragen zu Ihren Apache Kafka-Themen auszuführen. Verwenden Sie diesen Konnektor, um Apache-Kafka-Themen als Tabellen und Nachrichten als Zeilen in Athena anzuzeigen.

Dieser Konnektor verwendet keine Glue-Verbindungen, um die Konfigurationseigenschaften in Glue zu zentralisieren. Die Verbindungskonfiguration erfolgt über Lambda.

Voraussetzungen

Stellen Sie den Konnektor für Ihr AWS-Konto mithilfe der Athena-Konsole oder AWS Serverless Application Repository bereit. Weitere Informationen finden Sie unter Erstellen Sie eine Datenquellenverbindung oder Verwenden Sie den AWS Serverless Application Repository , um einen Datenquellenconnector bereitzustellen.

Einschränkungen

  • DDLSchreibvorgänge werden nicht unterstützt.

  • Alle relevanten Lambda-Grenzwerte. Weitere Informationen finden Sie unter Lambda quotas (Lambda-Kontingente) im AWS Lambda -Entwicklerhandbuch.

  • Datums- und Zeitstempeldatentypen in Filterbedingungen müssen in geeignete Datentypen umgewandelt werden.

  • Datentypen Datum und Zeitstempel werden für den CSV Dateityp nicht unterstützt und als Varchar-Werte behandelt.

  • Die Zuordnung zu verschachtelten JSON Feldern wird nicht unterstützt. Der Konnektor ordnet nur Felder der obersten Ebene zu.

  • Der Konnektor unterstützt keine komplexen Typen. Komplexe Typen werden als Zeichenfolgen interpretiert.

  • Um komplexe JSON Werte zu extrahieren oder mit ihnen zu arbeiten, verwenden Sie die entsprechenden JSON Funktionen, die in Athena verfügbar sind. Weitere Informationen finden Sie unter Extrahieren Sie JSON Daten aus Zeichenketten.

  • Der Konnektor unterstützt keinen Zugriff auf Kafka-Nachrichtenmetadaten.

Bedingungen

  • Metadaten-Handler – Ein Lambda-Handler, der Metadaten von Ihrer Datenbank-Instance abruft.

  • Record Handler – Ein Lambda-Handler, der Datensätze aus Ihrer Datenbank-Instance abruft.

  • Composite Handler – Ein Lambda-Handler, der sowohl Metadaten als auch Datensätze aus Ihrer Datenbank-Instance abruft.

  • Kafka-Endpunkt – Eine Textzeichenfolge, die eine Verbindung zu einer Kafka-Instance herstellt.

Cluster-Kompatibilität

Der Kafka-Konnektor kann mit den folgenden Cluster-Typen verwendet werden.

Zu Confluent verbinden

Die Verbindung zu Confluent erfordert die folgenden Schritte:

  1. APIGenerieren Sie einen Schlüssel aus Confluent.

  2. Speichern Sie den Benutzernamen und das Passwort für den API Confluent-Schlüssel in. AWS Secrets Manager

  3. Geben Sie den geheimen Namen für die secrets_manager_secret-Umgebungsvariable im Kafka-Konnektor an.

  4. Folgen Sie den Schritten im Den Kafka-Konnektor einrichten-Abschnitt dieses Dokuments.

Unterstützte Authentifizierungsmethoden

Der Konnektor unterstützt die folgenden Authentifizierungsmethoden.

  • SSL

  • SASL/SCRAM

  • SASL/PLAIN

  • SASL/PLAINTEXT

  • NEIN_ AUTH

  • Selbstverwaltete Kafka- und Confluent-Plattform —,, NO_ SSL SASL/SCRAM, SASL/PLAINTEXT AUTH

  • Selbstverwaltete Kafka und Confluent Cloud —/SASLPLAIN

Weitere Informationen finden Sie unter Konfigurieren der Authentifizierung für den Athena-Kafka-Konnektor.

Unterstützte Eingabedatenformate

Der Konnektor unterstützt die folgenden Eingabedatenformate.

  • JSON

  • CSV

  • AVRO

  • PROTOBUF (PROTOCOL BUFFERS)

Parameter

Verwenden Sie die Parameter in diesem Abschnitt, um den Athena Kafka-Konnektor zu konfigurieren.

  • auth_type – Gibt den Authentifizierungstyp des Clusters an. Der Konnektor unterstützt die folgenden Arten der Authentifizierung:

    • NO_ AUTH — Stellen Sie eine direkte Verbindung zu Kafka her (z. B. mit einem Kafka-Cluster, der über eine EC2 Instanz bereitgestellt wird, die keine Authentifizierung verwendet).

    • SASL_SSL_ PLAIN — Diese Methode verwendet das SASL_SSL Sicherheitsprotokoll und den Mechanismus. PLAIN SASL Weitere Informationen finden Sie unter SASLKonfiguration in der Apache Kafka-Dokumentation.

    • SASL_PLAINTEXT_ PLAIN — Diese Methode verwendet das SASL_PLAINTEXT Sicherheitsprotokoll und den PLAIN SASL Mechanismus. Weitere Informationen finden Sie unter SASLKonfiguration in der Apache Kafka-Dokumentation.

    • SASL_SSL_ SCRAM _ SHA512 — Sie können diesen Authentifizierungstyp verwenden, um den Zugriff auf Ihre Apache Kafka-Cluster zu kontrollieren. Diese Methode speichert den Benutzernamen und das Passwort in AWS Secrets Manager. Das Geheimnis muss dem Kafka-Cluster zugeordnet sein. Weitere Informationen finden Sie unter Authentifizierung mitSASL/SCRAMin der Apache Kafka-Dokumentation.

    • SASL_PLAINTEXT_ SCRAM _ SHA512 — Diese Methode verwendet das SASL_PLAINTEXT Sicherheitsprotokoll und den SCRAM_SHA512 SASL Mechanismus. Diese Methode verwendet Ihren Benutzernamen und Ihr Passwort, die in gespeichert sind AWS Secrets Manager. Weitere Informationen finden Sie im Abschnitt zur SASLKonfiguration der Apache Kafka-Dokumentation.

    • SSL— Die SSL Authentifizierung verwendet Schlüsselspeicher- und Vertrauensspeicherdateien, um eine Verbindung mit dem Apache Kafka-Cluster herzustellen. Sie müssen die Trust-Store- und Key-Store-Dateien generieren, sie in einen Amazon-S3-Bucket hochladen und die Referenz auf Amazon S3 angeben, wenn Sie den Konnektor bereitstellen. Der Schlüsselspeicher, der Vertrauensspeicher und der SSL Schlüssel werden in AWS Secrets Manager gespeichert. Ihr Client muss den AWS geheimen Schlüssel bereitstellen, wenn der Connector bereitgestellt wird. Weitere Informationen finden Sie unter Verschlüsselung und Authentifizierung SSL in der Apache Kafka-Dokumentation.

      Weitere Informationen finden Sie unter Konfigurieren der Authentifizierung für den Athena-Kafka-Konnektor.

  • certificates_s3_reference – Der Amazon-S3-Speicherort, der die Zertifikate enthält (die Schlüsselspeicher- und Vertrauensspeicherdateien).

  • disable_spill_encryption – (Optional) Bei Einstellung auf True, wird die Spill-Verschlüsselung deaktiviert. Die Standardeinstellung ist False so, dass Daten, die auf S3 übertragen werden, mit AES - GCM — entweder mit einem zufällig generierten Schlüssel oder KMS zur Generierung von Schlüsseln verschlüsselt werden. Das Deaktivieren der Überlauf-Verschlüsselung kann die Leistung verbessern, insbesondere wenn Ihr Überlauf-Standort eine serverseitige Verschlüsselung verwendet.

  • kafka_endpoint – Die Endpunktdetails, die Kafka bereitgestellt werden sollen.

  • schema_registry_url — Die URL Adresse für die Schemaregistrierung (zum Beispiel). http://schema-registry.example.org:8081 Gilt für die Datenformate undAVRO. PROTOBUF

  • secrets_manager_secret – Der Name des AWS -Geheimnisses, in dem die Anmeldeinformationen gespeichert sind.

  • Überlauf-Parameter – Lambda-Funktionen speichern vorübergehend („Überlauf“) Daten, die nicht in den Speicher von Amazon S3 passen. Alle Datenbank-Instances, auf die mit derselben Lambda-Funktion zugegriffen wird, werden an denselben Speicherort verschoben. Verwenden Sie die Parameter in der folgenden Tabelle, um den Überlauf-Standort anzugeben.

    Parameter Beschreibung
    spill_bucket Erforderlich Der Name des Amazon-S3-Buckets, in den die Lambda-Funktion Daten übertragen kann.
    spill_prefix Erforderlich Das Präfix innerhalb des Überlauf-Buckets, in dem die Lambda-Funktion Daten ausgeben kann.
    spill_put_request_headers (Optional) Eine JSON kodierte Zuordnung von Anforderungsheadern und Werten für die Amazon S3 putObject S3-Anfrage, die zum Verschicken verwendet wird (z. B.). {"x-amz-server-side-encryption" : "AES256"} Weitere mögliche Header finden Sie PutObjectin der Amazon Simple Storage Service API Reference.
  • Subnetz IDs — Ein oder mehrere SubnetzeIDs, die dem Subnetz entsprechen, das die Lambda-Funktion für den Zugriff auf Ihre Datenquelle verwenden kann.

    • Öffentlicher Kafka-Cluster oder standardmäßiger Confluent Cloud-Cluster — Ordnen Sie den Connector einem privaten Subnetz zu, das über ein Gateway verfügt. NAT

    • Confluent-Cloud-Cluster mit privater Konnektivität – Ordnen Sie den Konnektor einem privaten Subnetz zu, das eine Route zum Confluent-Cloud-Cluster hat.

      • Für AWS Transit Gateway müssen sich die Subnetze in einem befindenVPC, das mit demselben Transit-Gateway verbunden ist, das Confluent Cloud verwendet.

      • Für VPCPeering müssen sich die Subnetze in einem befinden, das über Peering mit Confluent Cloud VPC verbunden ist. VPC

      • Denn die Subnetze müssen sich in einem System befinden AWS PrivateLink, das eine Route zu den VPC Endpunkten hatVPC, die eine Verbindung zu Confluent Cloud herstellen.

Anmerkung

Wenn Sie den Connector VPC in einem bereitstellen, um auf private Ressourcen zuzugreifen und auch eine Verbindung zu einem öffentlich zugänglichen Dienst wie Confluent herstellen möchten, müssen Sie den Connector einem privaten Subnetz zuordnen, das über ein Gateway verfügt. NAT Weitere Informationen finden Sie unter NATGateways im VPC Amazon-Benutzerhandbuch.

Datentypunterstützung

Die folgende Tabelle zeigt die entsprechenden Datentypen, die für Kafka und Apache Arrow unterstützt werden.

Kafka Arrow
CHAR VARCHAR
VARCHAR VARCHAR
TIMESTAMP MILLISECOND
DATE DAY
BOOLEAN BOOL
SMALLINT SMALLINT
INTEGER INT
BIGINT BIGINT
DECIMAL FLOAT8
DOUBLE FLOAT8

Partitionen und Splits

Kafka-Themen sind in Partitionen unterteilt. Jede Partition ist geordnet. Jede Nachricht in einer Partition hat eine inkrementelle ID, die Offset genannt wird. Jede Kafka-Partition wird für die parallele Verarbeitung in weitere Splits unterteilt. Daten sind für den in Kafka-Clustern konfigurierten Aufbewahrungszeitraum verfügbar.

Bewährte Methoden

Verwenden Sie als bewährte Methode bei der Abfrage von Athena das Prädikat-Pushdown, wie in den folgenden Beispielen.

SELECT * FROM "kafka_catalog_name"."glue_schema_registry_name"."glue_schema_name" WHERE integercol = 2147483647
SELECT * FROM "kafka_catalog_name"."glue_schema_registry_name"."glue_schema_name" WHERE timestampcol >= TIMESTAMP '2018-03-25 07:30:58.878'

Den Kafka-Konnektor einrichten

Bevor Sie den Konnektor verwenden können, müssen Sie Ihren Apache-Kafka-Cluster einrichten, die AWS Glue -Schema-Registry verwenden, um Ihr Schema zu definieren, und die Authentifizierung für den Konnektor konfigurieren.

Beachten Sie bei der Arbeit mit der AWS Glue Schema Registry die folgenden Punkte:

  • Stellen Sie sicher, dass der Text im Feld Description (Beschreibung) des AWS Glue -Schema-Registry die Zeichenfolge {AthenaFederationKafka} enthält. Diese Markierungszeichenfolge ist für AWS Glue Registries erforderlich, die Sie mit dem Amazon Athena Kafka-Konnektor verwenden.

  • Verwenden Sie für Ihre Datenbanknamen und -tabellen nur Kleinbuchstaben, um eine optimale Leistung zu erzielen. Bei der Verwendung gemischter Groß- und Kleinschreibung führt der Konnektor eine Suche ohne Berücksichtigung der Groß- und Kleinschreibung durch, die rechenintensiver ist.

Um Ihre Apache Kafka-Umgebung und Schema Registry einzurichten AWS Glue
  1. Richten Sie Ihre Apache-Kafka-Umgebung ein.

  2. Laden Sie die Kafka-Themenbeschreibungsdatei (d. h. ihr Schema) im JSON Format in die AWS Glue Schemaregistry hoch. Weitere Informationen finden Sie unter Integration mit AWS Glue Schema Registry im AWS Glue Entwicklerhandbuch.

  3. Um das PROTOBUF Datenformat AVRO oder zu verwenden, wenn Sie das Schema in der AWS Glue Schemaregistrierung:

    • Geben Sie als Schemaname den Namen des Kafka-Themas in derselben Groß-/Kleinschreibung wie das Original ein.

    • Wählen Sie als Datenformat Apache Avro oder Protocol Buffers aus.

    Beispielschemata finden Sie im folgenden Abschnitt.

Verwenden Sie das Format der Beispiele in diesem Abschnitt, wenn Sie Ihr Schema in das AWS Glue -Schema Registry hochladen.

JSONBeispiel für ein Typschema

Im folgenden Beispiel wird das Schema, das in der AWS Glue Schemaregistry erstellt werden soll, json als Wert für angegeben dataFormat und datatypejson für verwendettopicName.

Anmerkung

Der Wert für topicName sollte die gleiche Schreibweise wie der Themenname in Kafka verwenden.

{ "topicName": "datatypejson", "message": { "dataFormat": "json", "fields": [ { "name": "intcol", "mapping": "intcol", "type": "INTEGER" }, { "name": "varcharcol", "mapping": "varcharcol", "type": "VARCHAR" }, { "name": "booleancol", "mapping": "booleancol", "type": "BOOLEAN" }, { "name": "bigintcol", "mapping": "bigintcol", "type": "BIGINT" }, { "name": "doublecol", "mapping": "doublecol", "type": "DOUBLE" }, { "name": "smallintcol", "mapping": "smallintcol", "type": "SMALLINT" }, { "name": "tinyintcol", "mapping": "tinyintcol", "type": "TINYINT" }, { "name": "datecol", "mapping": "datecol", "type": "DATE", "formatHint": "yyyy-MM-dd" }, { "name": "timestampcol", "mapping": "timestampcol", "type": "TIMESTAMP", "formatHint": "yyyy-MM-dd HH:mm:ss.SSS" } ] } }

CSVBeispiel für ein Typschema

Im folgenden Beispiel wird das Schema, das in der AWS Glue Schemaregistry erstellt werden soll, csv als Wert für angegeben dataFormat und datatypecsvbulk für verwendettopicName. Der Wert für topicName sollte die gleiche Schreibweise wie der Themenname in Kafka verwenden.

{ "topicName": "datatypecsvbulk", "message": { "dataFormat": "csv", "fields": [ { "name": "intcol", "type": "INTEGER", "mapping": "0" }, { "name": "varcharcol", "type": "VARCHAR", "mapping": "1" }, { "name": "booleancol", "type": "BOOLEAN", "mapping": "2" }, { "name": "bigintcol", "type": "BIGINT", "mapping": "3" }, { "name": "doublecol", "type": "DOUBLE", "mapping": "4" }, { "name": "smallintcol", "type": "SMALLINT", "mapping": "5" }, { "name": "tinyintcol", "type": "TINYINT", "mapping": "6" }, { "name": "floatcol", "type": "DOUBLE", "mapping": "7" } ] } }

AVROBeispiel für ein Typschema

Das folgende Beispiel wird verwendet, um ein AVRO basiertes Schema in der AWS Glue Schemaregistry zu erstellen. Wenn Sie das Schema in der AWS Glue Schemaregistry definieren, geben Sie für Schemaname den Namen des Kafka-Themas in derselben Groß-/Kleinschreibung wie das Original ein, und für Datenformat wählen Sie Apache Avro. Da Sie diese Informationen direkt in der Registrierung angeben, sind die topicName Felder dataformat und nicht erforderlich.

{ "type": "record", "name": "avrotest", "namespace": "example.com", "fields": [{ "name": "id", "type": "int" }, { "name": "name", "type": "string" } ] }

PROTOBUFBeispiel für ein Typschema

Das folgende Beispiel wird verwendet, um ein PROTOBUF basiertes Schema in der AWS Glue Schemaregistry zu erstellen. Wenn Sie das Schema in der AWS Glue Schemaregistry definieren, geben Sie für Schemaname den Namen des Kafka-Themas in derselben Groß-/Kleinschreibung wie das Original ein, und für Datenformat wählen Sie Protocol Buffers. Da Sie diese Informationen direkt in der Registrierung angeben, sind die topicName Felder dataformat und nicht erforderlich. Die erste Zeile definiert das Schema alsPROTOBUF.

syntax = "proto3"; message protobuftest { string name = 1; int64 calories = 2; string colour = 3; }

Weitere Informationen zum Hinzufügen einer Registrierung und von Schemas zur Schemaregistrierung finden Sie in der AWS Glue AWS Glue Dokumentation unter Erste Schritte mit Schema Registry.

Konfigurieren der Authentifizierung für den Athena-Kafka-Konnektor

Sie können eine Vielzahl von Methoden verwenden, um sich bei Ihrem Apache Kafka-Cluster zu authentifizieren, darunter. SSL SASL/SCRAM, SASL/PLAIN, and SASL/PLAINTEXT

In der folgenden Tabelle sind die Authentifizierungstypen für den Connector sowie das Sicherheitsprotokoll und der SASL Sicherheitsmechanismus für jeden Konnektor aufgeführt. Weitere Informationen zur Sicherheit von Kafka finden Sie unter Sicherheit in der Apache-Kafka-Dokumentation.

auth_type security.protocol sasl.mechanik Cluster-Typ-Kompatibilität
SASL_SSL_PLAIN SASL_SSL PLAIN
  • Selbstverwaltetes Kafka

  • Confluent-Plattform

  • Confluent-Cloud

SASL_PLAINTEXT_PLAIN SASL_PLAINTEXT PLAIN
  • Selbstverwaltetes Kafka

  • Confluent-Plattform

SASL_SSL_SCRAM_SHA512 SASL_SSL SCRAM-SHA-512
  • Selbstverwaltetes Kafka

  • Confluent-Plattform

SASL_PLAINTEXT_SCRAM_SHA512 SASL_PLAINTEXT SCRAM-SHA-512
  • Selbstverwaltetes Kafka

  • Confluent-Plattform

SSL SSL N/A
  • Selbstverwaltetes Kafka

  • Confluent-Plattform

SSL

Wenn der Cluster SSL authentifiziert ist, müssen Sie die Trust Store- und Key Store-Dateien generieren und sie in den Amazon S3 S3-Bucket hochladen. Sie müssen diese Amazon-S3-Referenz angeben, wenn Sie den Konnektor bereitstellen. Der Schlüsselspeicher, der Vertrauensspeicher und der SSL Schlüssel werden im AWS Secrets Manager gespeichert. Sie geben den AWS geheimen Schlüssel an, wenn Sie den Connector bereitstellen.

Informationen zum Erstellen eines Geheimnisses im Secrets Manager finden Sie unter Erstellen eines AWS Secrets Manager -Geheimnisses.

Um diesen Authentifizierungstyp zu verwenden, legen Sie die Umgebungsvariablen wie in der folgenden Tabelle gezeigt fest.

Parameter Wert
auth_type SSL
certificates_s3_reference Der Amazon-S3-Speicherort, der die Zertifikate enthält.
secrets_manager_secret Der Name Ihres AWS geheimen Schlüssels.

Nachdem Sie ein Geheimnis in Secrets Manager erstellt haben, können Sie es in der Secrets-Manager-Konsole anzeigen.

So zeigen Sie Ihr Geheimnis in Secrets Manager an
  1. Öffnen Sie die Secrets Manager Manager-Konsole unter https://console.aws.amazon.com/secretsmanager/.

  2. Wählen Sie im Navigationsbereich Secrets (Geheimnisse).

  3. Wählen Sie auf der Seite Secrets (Geheimnisse) den Link zu Ihrem Geheimnis aus.

  4. Wählen Sie auf der Detailseite für Ihr Geheimnis die Option Retrieve secret value (Geheimniswert abrufen).

    Das folgende Image zeigt ein Beispiel für ein Geheimnis mit drei Schlüssel/Wert-Paaren: keystore_password, truststore_password und ssl_key_password.

    Ein SSL Geheimnis in Secrets Manager abrufen

Weitere Informationen zur Verwendung SSL mit Kafka finden Sie unter Verschlüsselung und Authentifizierung SSL in der Apache Kafka-Dokumentation.

SASL/SCRAM

Wenn Ihr Cluster SCRAM Authentifizierung verwendet, geben Sie den Secrets Manager Manager-Schlüssel an, der dem Cluster zugeordnet ist, wenn Sie den Connector bereitstellen. Die AWS -Anmeldeinformationen des Benutzers (geheimer Schlüssel und Zugriffsschlüssel) werden für die Authentifizierung mit dem Cluster verwendet.

Legen Sie die Umgebungsvariablen wie in der folgenden Tabelle angegeben fest.

Parameter Wert
auth_type SASL_SSL_SCRAM_SHA512
secrets_manager_secret Der Name Ihres AWS geheimen Schlüssels.

Das folgende Image zeigt ein Beispiel für ein Geheimnis in der Secrets-Manager-Konsole mit zwei Schlüssel/Wert-Paaren: eines für username und eines für password.

Ein SCRAM Geheimnis in Secrets Manager abrufen

Weitere Informationen zur Verwendung vonSASL/SCRAMmit Kafka finden Sie unter Authentifizierung mitSASL/SCRAMin der Apache Kafka-Dokumentation.

Lizenzinformationen

Durch die Verwendung dieses Connectors erkennen Sie die Einbindung von Komponenten von Drittanbietern an. Eine Liste dieser Komponenten finden Sie in der Datei pom.xml für diesen Connector, und Sie stimmen den Bedingungen der jeweiligen Drittanbieterlizenzen zu, die in der LICENSE.txt-Datei auf .com bereitgestellt werden. GitHub

Weitere Ressourcen

Weitere Informationen zu diesem Connector finden Sie auf der entsprechenden Website auf GitHub .com.