Seleziona le tue preferenze relative ai cookie

Utilizziamo cookie essenziali e strumenti simili necessari per fornire il nostro sito e i nostri servizi. Utilizziamo i cookie prestazionali per raccogliere statistiche anonime in modo da poter capire come i clienti utilizzano il nostro sito e apportare miglioramenti. I cookie essenziali non possono essere disattivati, ma puoi fare clic su \"Personalizza\" o \"Rifiuta\" per rifiutare i cookie prestazionali.

Se sei d'accordo, AWS e le terze parti approvate utilizzeranno i cookie anche per fornire utili funzionalità del sito, ricordare le tue preferenze e visualizzare contenuti pertinenti, inclusa la pubblicità pertinente. Per continuare senza accettare questi cookie, fai clic su \"Continua\" o \"Rifiuta\". Per effettuare scelte più dettagliate o saperne di più, fai clic su \"Personalizza\".

AWS Glue Concetti di streaming

Modalità Focus
AWS Glue Concetti di streaming - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Le seguenti sezioni forniscono informazioni sui concetti di AWS Glue Streaming.

Anatomia di un lavoro AWS Glue in streaming

AWS Glue i lavori di streaming si basano sul paradigma dello streaming Spark e sfruttano lo streaming strutturato del framework Spark. I processi di streaming effettuano costantemente il polling dall'origine dati di streaming a un intervallo di tempo specifico per recuperare i record sotto forma di microbatch. Le seguenti sezioni esaminano le diverse parti di un processo di streaming. AWS Glue

La schermata mostra un registro di Amazon CloudWatch monitoraggio, AWS Glue per l'esempio fornito sopra, che esamina il numero di esecutori necessari (linea arancione) e ridimensiona gli esecutori (linea blu) in modo che corrispondano a tale numero senza bisogno di regolazioni manuali.

forEachBatch

Il forEachBatch metodo è il punto di ingresso dell'esecuzione di un processo di streaming. AWS Glue AWS Glue streaming jobs utilizza il forEachBatch metodo per eseguire il polling dei dati funzionando come un iteratore che rimane attivo durante il ciclo di vita del processo di streaming e interroga regolarmente la fonte di streaming alla ricerca di nuovi dati ed elabora i dati più recenti in microbatch.

glueContext.forEachBatch( frame=dataFrame_AmazonKinesis_node1696872487972, batch_function=processBatch, options={ "windowSize": "100 seconds", "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/", }, )

Configura la proprietà frame di forEachBatch per specificare un'origine di streaming. In questo esempio, il nodo di origine creato nell'area di disegno vuota durante la creazione del lavoro viene popolato con l'impostazione predefinita del lavoro. DataFrame Imposta la proprietà batch_function come function che decidi di richiamare per ogni operazione di microbatch. Per gestire la trasformazione in batch sui dati in entrata è necessario definire una funzione.

Origine

Nella prima fase della processBatch funzione, il programma verifica il conteggio dei DataFrame record definiti come proprietà frame di. forEachBatch Il programma aggiunge un timestamp di inserimento a un valore non vuoto. DataFrame La clausola data_frame.count()>0 determina se l'ultimo microbatch non è vuoto ed è pronto per un'ulteriore elaborazione.

def processBatch(data_frame, batchId): if data_frame.count() >0: AmazonKinesis_node1696872487972 = DynamicFrame.fromDF( glueContext.add_ingestion_time_columns(data_frame, "hour"), glueContext, "from_data_frame", )

Mapping

La sezione successiva del programma consiste nell'applicare la mappatura. Il Mapping.apply metodo su Spark DataFrame consente di definire una regola di trasformazione relativa agli elementi di dati. In genere è possibile rinominare, modificare il tipo di dati o applicare una funzione personalizzata alla colonna di dati di origine e mapparli alle colonne di destinazione.

#Script generated for node ChangeSchema ChangeSchema_node16986872679326 = ApplyMapping.apply( frame = AmazonKinesis_node1696872487972, mappings = [ ("eventtime", "string", "eventtime", "string"), ("manufacturer", "string", "manufacturer", "string"), ("minutevolume", "long", "minutevolume", "int"), ("o2stats", "long", "OxygenSaturation", "int"), ("pressurecontrol", "long", "pressurecontrol", "int"), ("serialnumber", "string", "serialnumber", "string"), ("ventilatorid", "long", "ventilatorid", "long"), ("ingest_year", "string", "ingest_year", "string"), ("ingest_month", "string", "ingest_month", "string"), ("ingest_day", "string", "ingest_day", "string"), ("ingest_hour", "string", "ingest_hour", "string"), ], transformation_ctx="ChangeSchema_node16986872679326", ) )

Sink

In questa sezione, il set di dati in entrata dall'origine di streaming viene archiviato in una posizione di destinazione. In questo esempio scriveremo i dati in una posizione Amazon S3. I dettagli della proprietà AmazonS3_node_path sono precompilati in base alle impostazioni utilizzate durante la creazione del processo dal canvas. È possibile impostare updateBehavior in base al proprio caso d'uso e decidere di non aggiornare la tabella del Catalogo dati, creare il Catalogo dati e aggiornare il relativo schema nelle esecuzioni successive oppure creare una tabella di catalogo e non aggiornare la definizione dello schema nelle esecuzioni successive.

La proprietà partitionKeys definisce l'opzione della partizione di archiviazione. Il comportamento predefinito consiste nel partizionare i dati in base al valore ingestion_time_columns fornito nella sezione di origine. La proprietà compression consente di impostare l'algoritmo di compressione da applicare durante la scrittura della destinazione. Hai delle opzioni per impostare Snappy o GZIP come LZO tecnica di compressione. La proprietà enableUpdateCatalog controlla se la tabella del catalogo AWS Glue deve essere aggiornata. Le opzioni disponibili per questa proprietà sono True o False.

#Script generated for node Amazon S3 AmazonS3_node1696872743449 = glueContext.getSink( path = AmazonS3_node1696872743449_path, connection_type = "s3", updateBehavior = "UPDATE_IN_DATABASE", partitionKeys = ["ingest_year", "ingest_month", "ingest_day", "ingest_hour"], compression = "snappy", enableUpdateCatalog = True, transformation_ctx = "AmazonS3_node1696872743449", )

AWS Glue Lavello da catalogo

Questa sezione del lavoro controlla il comportamento di aggiornamento della tabella del AWS Glue catalogo. Imposta catalogDatabase una catalogTableName proprietà in base al nome del database del AWS Glue catalogo e al nome della tabella associata al AWS Glue lavoro che stai progettando. È possibile definire il formato di file dei dati di destinazione tramite la proprietà setFormat. Per questo esempio, i dati verranno archiviati in formato Parquet.

Una volta configurato ed eseguito il processo di AWS Glue streaming che fa riferimento a questo tutorial, i dati di streaming prodotti Amazon Kinesis Data Streams verranno archiviati nella sede di Amazon S3 in un formato parquet con compressione rapida. Una volta eseguito correttamente il processo di streaming, potrai interrogare i dati tramite Amazon Athena.

AmazonS3_node1696872743449 = setCatalogInfo( catalogDatabase = "demo", catalogTableName = "demo_stream_transform_result" ) AmazonS3_node1696872743449.setFormat("glueparquet") AmazonS3_node1696872743449.writeFormat("ChangeSchema_node16986872679326") )
PrivacyCondizioni del sitoPreferenze cookie
© 2025, Amazon Web Services, Inc. o società affiliate. Tutti i diritti riservati.