Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Sie können eine Kafka-Verbindung verwenden, um Kafka-Datenströme zu lesen und in sie zu schreiben, indem Sie Informationen verwenden, die in einer Datenkatalogtabelle gespeichert sind, oder indem Sie Informationen für den direkten Zugriff auf den Datenstrom bereitstellen. Die Verbindung unterstützt einen Kafka-Cluster oder einen Amazon Managed Streaming for Apache Kafka Kafka-Cluster. Sie können Informationen aus Kafka in einen Spark einlesen DataFrame und sie dann in einen AWS Glue DynamicFrame umwandeln. Sie können in einem DynamicFrames JSON Format an Kafka schreiben. Wenn Sie direkt auf den Datenstrom zugreifen, verwenden Sie diese Optionen, um Informationen zum Zugriff auf den Datenstrom bereitzustellen.
Wenn Sie getCatalogSource
oder verwenden, create_data_frame_from_catalog
um Datensätze aus einer Kafka-Streaming-Quelle zu konsumieren getCatalogSink
oder write_dynamic_frame_from_catalog
um Datensätze in Kafka zu schreiben, und der Job verfügt über die Datenkatalogdatenbank und Informationen zum Tabellennamen und kann diese verwenden, um einige grundlegende Parameter für das Lesen aus der Kafka-Streaming-Quelle abzurufen. Wenn SiegetSource
,,getCatalogSink
, createDataFrameFromOptions
oder getSourceWithFormat
getSinkWithFormat
, oder verwendencreate_data_frame_from_options
, müssen Sie diese grundlegenden Parameter mithilfe der hier beschriebenen Verbindungsoptionen angeben. write_dynamic_frame_from_catalog
Sie können die Verbindungsoptionen für Kafka mithilfe der folgenden Argumente für die angegebenen Methoden in der GlueContext
Klasse angeben.
-
Scala
-
connectionOptions
: mitgetSource
,createDataFrameFromOptions
,getSink
verwenden -
additionalOptions
: mitgetCatalogSource
,getCatalogSink
verwenden -
options
: mitgetSourceWithFormat
,getSinkWithFormat
verwenden
-
-
Python
-
connection_options
: mitcreate_data_frame_from_options
,write_dynamic_frame_from_options
verwenden -
additional_options
: mitcreate_data_frame_from_catalog
,write_dynamic_frame_from_catalog
verwenden -
options
: mitgetSource
,getSink
verwenden
-
Hinweise und Einschränkungen zu ETL Streaming-Jobs finden Sie unter. ETLHinweise und Einschränkungen beim Streamen
Themen
Kafka konfigurieren
Es gibt keine AWS Voraussetzungen, um eine Verbindung zu Kafka-Streams herzustellen, die über das Internet verfügbar sind.
Sie können eine AWS Glue Kafka-Verbindung erstellen, um Ihre Verbindungsdaten zu verwalten. Weitere Informationen finden Sie unter Erstellen eines AWS Glue Verbindung für einen Apache Kafka-Datenstrom. Geben Sie in Ihrer AWS Glue-Job-Konfiguration Folgendes an connectionName
als zusätzliche Netzwerkverbindung und geben Sie dann in Ihrem Methodenaufruf Folgendes an connectionName
zum connectionName
Parameter.
In bestimmten Fällen müssen Sie zusätzliche Voraussetzungen konfigurieren:
-
Wenn Sie Amazon Managed Streaming for Apache Kafka mit IAM Authentifizierung verwenden, benötigen Sie eine entsprechende IAM Konfiguration.
-
Wenn Sie Amazon Managed Streaming for Apache Kafka in einem Amazon verwendenVPC, benötigen Sie eine entsprechende VPC Amazon-Konfiguration. Sie müssen eine AWS Glue-Verbindung erstellen, die VPC Amazon-Verbindungsinformationen bereitstellt. Sie benötigen in Ihrer Jobkonfiguration die AWS Glue-Verbindung als zusätzliche Netzwerkverbindung.
Weitere Informationen zu den Voraussetzungen für ETL Streaming-Jobs finden Sie unterETLJobs streamen in AWS Glue.
Beispiel: Aus Kafka-Streams lesen
Verwendet in Verbindung mit forEachBatch.
Beispiel für die Kafka-Streaming-Quelle:
kafka_options =
{ "connectionName": "ConfluentKafka",
"topicName": "kafka-auth-topic",
"startingOffsets": "earliest",
"inferSchema": "true",
"classification": "json"
}
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
Beispiel: In Kafka-Streams schreiben
Beispiele für das Schreiben an Kafka:
Beispiel mit der getSink
Methode:
data_frame_datasource0 =
glueContext.getSink(
connectionType="kafka",
connectionOptions={
JsonOptions("""{
"connectionName": "ConfluentKafka",
"classification": "json",
"topic": "kafka-auth-topic",
"typeOfData": "kafka"}
""")},
transformationContext="dataframe_ApacheKafka_node1711729173428")
.getDataFrame()
Beispiel mit der write_dynamic_frame.from_options
Methode:
kafka_options =
{ "connectionName": "ConfluentKafka",
"topicName": "kafka-auth-topic",
"classification": "json"
}
data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)
Referenz zur Kafka-Verbindungsoption
Verwenden Sie beim Lesen die folgenden Verbindungsoptionen mit"connectionType": "kafka"
:
-
"bootstrap.servers"
(Erforderlich) Eine Liste von URLs Bootstrap-Servern, z. B. alsb-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094
. Diese Option muss im API Aufruf angegeben oder in den Tabellenmetadaten im Datenkatalog definiert werden. -
"security.protocol"
(Erforderlich) Das Protokoll, das für die Kommunikation mit Brokern verwendet wird. Die möglichen Werte sind"SSL"
oder"PLAINTEXT"
. -
"topicName"
(Erforderlich) Eine durch Kommas getrennte Liste von Themen, die abonniert werden sollen. Sie müssen nur eines der folgenden"topicName"
,"assign"
oder"subscribePattern"
angeben. -
"assign"
: (Erforderlich) Eine JSON Zeichenfolge, die angibt, welche DatenTopicPartitions
verwendet werden sollen. Sie müssen nur eines der folgenden"topicName"
,"assign"
oder"subscribePattern"
angeben.Beispiel: '{"topicA":[0,1],"topicB":[2,4]}'
-
"subscribePattern"
: (Erforderlich) Eine Java-Regex-Zeichenfolge, die die Themenliste identifiziert, die abonniert werden soll. Sie müssen nur eines der folgenden"topicName"
,"assign"
oder"subscribePattern"
angeben.Beispiel: 'topic.*'
-
"classification"
(Erforderlich) Das von den Daten im Datensatz verwendete Dateiformat. Erforderlich, sofern nicht in Data Catalog angegeben. -
"delimiter"
(Optional) Das Werttrennzeichen, das verwendet wird, wennclassification
istCSV. Der Standardwert ist „,
„. -
"startingOffsets"
: (Optional) Die Ausgangsposition im Kafka-Thema, aus dem Daten gelesen werden sollen. Die möglichen Werte sind"earliest"
oder"latest"
. Der Standardwert ist"latest"
. -
"startingTimestamp"
: (Optional, nur für AWS Glue Version 4.0 oder höher unterstützt) Der Zeitstempel des Datensatzes im Kafka-Thema, aus dem Daten gelesen werden sollen. Der mögliche Wert ist eine Timestamp-Zeichenfolge im UTC Format des Mustersyyyy-mm-ddTHH:MM:SSZ
(wobei es sich um einen UTC Zeitzonen-Offset mit einem +/-Z
handelt). Zum Beispiel: „2023-04-04T 08:00:00-04:00 „).Hinweis: In der Liste der Verbindungsoptionen des AWS Glue-Streaming-Skripts kann nur eine von startingTimestamp '' oder '' vorhanden sein. Wenn Sie diese beiden Eigenschaften angeben, schlägt der Job fehl. startingOffsets
-
"endingOffsets"
: (Optional) Der Endpunkt, wenn eine Batchabfrage beendet wird. Mögliche Werte sind entweder"latest"
oder eine JSON Zeichenfolge, die jeweils einen Endversatz angibtTopicPartition
.Für die JSON Zeichenfolge lautet das Format
{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
. Der Wert-1
als Offset steht für"latest"
. -
"pollTimeoutMs"
: (Optional) Das Timeout in Millisekunden, um Daten von Kafka in Spark-Auftragsausführungen abzufragen. Der Standardwert ist512
. -
"numRetries"
: (Optional) Die Anzahl, wie oft erneute Versuche durchgeführt werden sollen, bevor Kafka-Offsets nicht abgerufen werden. Der Standardwert ist3
. -
"retryIntervalMs"
: (Optional) Die Wartezeit in Millisekunden, bevor Sie erneut versuchen, Kafka-Offsets abzurufen. Der Standardwert ist10
. -
"maxOffsetsPerTrigger"
: (Optional) Die Ratengrenze für die maximale Anzahl von Offsets, die pro Triggerintervall verarbeitet werden. Die angegebene Gesamtzahl der Offsets wird proportional auftopicPartitions
von verschiedenen Volumes aufgeteilt. Der Standardwert ist null, was bedeutet, dass der Verbraucher alle Offsets bis zum bekannten letzten Offset liest. -
"minPartitions"
: (Optional) Die gewünschte Mindestanzahl an Partitionen, die von Kafka gelesen werden sollen. Der Standardwert ist null, was bedeutet, dass die Anzahl der Spark-Partitionen gleich der Anzahl der Kafka-Partitionen ist. -
"includeHeaders"
: (Optional) Gibt an, ob die Kafka-Header eingeschlossen werden sollen. Wenn die Option auf „true“ gesetzt ist, enthält die Datenausgabe eine zusätzliche Spalte mit dem Namen „glue_streaming_kafka_headers“ mit dem TypArray[Struct(key: String, value: String)]
. Der Standardwert ist „false“. Diese Option ist nur in AWS Glue Version 3.0 oder höher verfügbar. -
"schema"
: (Erforderlich, wenn auf false inferSchema gesetzt) Das Schema, das zur Verarbeitung der Nutzlast verwendet werden soll. Wenn die Klassifizierungavro
ist, muss das bereitgestellte Schema im Avro-Schemaformat vorliegen. Wenn die Klassifizierung nicht erfolgt, mussavro
das bereitgestellte Schema im DDL Schemaformat vorliegen.Im Folgenden finden Sie Beispiele für Schemata.
'column1' INT, 'column2' STRING , 'column3' FLOAT
-
"inferSchema"
: (Optional) Der Standardwert ist „false“. Wenn auf „true“ gesetzt, wird das Schema zur Laufzeit von der Nutzlast inforeachbatch
erkannt. -
"avroSchema"
: (Veraltet) Parameter, der verwendet wird, um ein Schema von Avro-Daten anzugeben, wenn das Avro-Format verwendet wird. Dieser Parameter ist jetzt veraltet. Verwenden Sie den Parameterschema
. -
"addRecordTimestamp"
: (Optional) Wenn diese Option auf „true“ gesetzt ist, enthält die Datenausgabe eine zusätzliche Spalte mit dem Namen „__src_timestamp“, die den Zeitpunkt angibt, zu dem der entsprechende Datensatz beim Thema eingegangen ist. Der Standardwert von "false". Diese Option wird in AWS Glue Version 4.0 oder höher unterstützt. -
"emitConsumerLagMetrics"
: (Optional) Wenn die Option auf „true“ gesetzt ist, werden für jeden Batch die Metriken für den Zeitraum zwischen dem ältesten Datensatz, den das Thema empfangen hat, und dem Zeitpunkt, AWS Glue zu dem es eingeht, ausgegeben CloudWatch. Der Name der Metrik lautet „glue.driver.streaming“. maxConsumerLagInMs“. Der Standardwert von "false". Diese Option wird in AWS Glue Version 4.0 oder höher unterstützt.
Verwenden Sie beim Schreiben die folgenden Verbindungsoptionen mit"connectionType": "kafka"
:
-
"connectionName"
(Erforderlich) Name der AWS Glue-Verbindung, die für die Verbindung mit dem Kafka-Cluster verwendet wird (ähnlich der Kafka-Quelle). -
"topic"
(Erforderlich) Wenn eine Themenspalte existiert, wird ihr Wert als Thema verwendet, wenn die angegebene Zeile in Kafka geschrieben wird, sofern die Themenkonfigurationsoption nicht festgelegt ist. Das heißt, dietopic
Konfigurationsoption überschreibt die Themenspalte. -
"partition"
(Optional) Wenn eine gültige Partitionsnummer angegeben ist,partition
wird diese beim Senden des Datensatzes verwendet.Wenn keine Partition angegeben ist, aber eine vorhanden
key
ist, wird eine Partition anhand eines Hashs des Schlüssels ausgewählt.Falls
key
weder noch vorhandenpartition
ist, wird eine Partition auf der Grundlage von Sticky-Partitionierung ausgewählt. Diese Änderungen werden erst dann vorgenommen, wenn für die Partition mindestens batch.size-Byte erzeugt werden. -
"key"
(Optional) Wird für die Partitionierung verwendet, wenn der Wert Null ist.partition
-
"classification"
(Optional) Das von den Daten im Datensatz verwendete Dateiformat. Wir unterstützen nurJSON, CSV und Avro.Mit dem Avro-Format können wir ein benutzerdefiniertes avroSchema Format für die Serialisierung bereitstellen. Beachten Sie jedoch, dass dies für die Deserialisierung auch in der Quelle angegeben werden muss. Andernfalls verwendet es standardmäßig den Apache für die Serialisierung. AvroSchema
Darüber hinaus können Sie die Kafka-Senke nach Bedarf feinabstimmen, indem Sie die Konfigurationsparameter des Kafka-Producers
Es gibt jedoch eine kleine Liste von Optionen, die nicht wirksam werden. Weitere Informationen finden Sie unter Kafka-spezifische Konfigurationen