Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Comprendi le chiavi di partizionamento
Con il partizionamento dinamico, crei set di dati mirati dai dati S3 in streaming partizionandoli in base alle chiavi di partizionamento. Le chiavi di partizionamento consentono di filtrare i dati in streaming in base a valori specifici. Ad esempio, se è necessario filtrare i dati in base all'ID cliente e al paese, è possibile specificare il campo dati customer_id
come una chiave di partizionamento e il campo dati country
come un'altra chiave di partizionamento. Quindi, specificare le espressioni (utilizzando i formati supportati) per definire i prefissi dei bucket S3 a cui devono essere distribuiti i record di dati partizionati in modo dinamico.
È possibile creare chiavi di partizionamento con i seguenti metodi.
-
Analisi in linea: questo metodo utilizza il meccanismo di supporto integrato di Firehose, un parser jq
, per estrarre le chiavi per il partizionamento dai record di dati in formato. JSON Attualmente jq 1.6
supportiamo solo la versione. -
AWS Funzione Lambda: questo metodo utilizza una AWS Lambda funzione specificata per estrarre e restituire i campi di dati necessari per il partizionamento.
Importante
Quando abiliti il partizionamento dinamico, devi configurare almeno uno di questi metodi per partizionare i dati. Puoi configurare uno di questi metodi per specificare le chiavi di partizionamento o entrambi contemporaneamente.
Crea chiavi di partizionamento con analisi in linea
Per configurare l'analisi in linea come metodo di partizionamento dinamico per i dati di streaming, è necessario scegliere i parametri del record di dati da utilizzare come chiavi di partizionamento e fornire un valore per ogni chiave di partizionamento specificata.
Il seguente record di dati di esempio mostra come definire le relative chiavi di partizionamento con l'analisi in linea. Nota che i dati devono essere codificati nel formato Base64. Puoi anche fare riferimento all'esempio. CLI
{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, #epoch timestamp "region": "sample_region" }
Ad esempio, puoi scegliere di partizionare i dati in base al parametro customer_id
o al parametro event_timestamp
. Ciò significa che desideri che il valore del parametro customer_id
o del parametro event_timestamp
in ogni record venga utilizzato per determinare il prefisso S3 a cui deve essere distribuito il record. Puoi anche scegliere un parametro nidificato, ad esempio device
con un'espressione .type.device
. La logica di partizionamento dinamico può dipendere da più parametri.
Dopo aver selezionato i parametri dei dati per le chiavi di partizionamento, mappa ogni parametro a un'espressione jq valida. La tabella seguente mostra una tale mappatura dei parametri alle espressioni jq:
Parametro | espressione jq |
---|---|
customer_id |
.customer_id |
device |
.type.device |
year |
.event_timestamp| strftime("%Y") |
month |
.event_timestamp| strftime("%m") |
day |
.event_timestamp| strftime("%d") |
hour |
.event_timestamp| strftime("%H") |
In fase di esecuzione, Firehose utilizza la colonna destra in alto per valutare i parametri in base ai dati di ogni record.
Crea chiavi di partizionamento con una funzione AWS Lambda
Per i record di dati compressi o crittografati o i dati in qualsiasi formato di file diverso da quelloJSON, è possibile utilizzare la AWS Lambda funzione integrata con il proprio codice personalizzato per decomprimere, decrittografare o trasformare i record al fine di estrarre e restituire i campi di dati necessari per il partizionamento. Si tratta di un'espansione della funzione di trasformazione Lambda esistente oggi disponibile con Firehose. Puoi quindi trasformare, analizzare e restituire i campi di dati da utilizzare quindi per il partizionamento dinamico usando la stessa funzione Lambda.
Di seguito è riportato un esempio di funzione Lambda di elaborazione del flusso Firehose in Python che riproduce ogni record letto dall'input all'output ed estrae le chiavi di partizionamento dai record.
from __future__ import print_function import base64 import json import datetime # Signature for all Lambda functions that user must implement def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'] + ", Region: " + firehose_records_input['region'] + ", and InvocationId: " + firehose_records_input['invocationId']) # Create return value. firehose_records_output = {'records': []} # Create result object. # Go through records and process them for firehose_record_input in firehose_records_input['records']: # Get user payload payload = base64.b64decode(firehose_record_input['data']) json_value = json.loads(payload) print("Record that was received") print(json_value) print("\n") # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {} event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp']) partition_keys = {"customerId": json_value['customerId'], "year": event_timestamp.strftime('%Y'), "month": event_timestamp.strftime('%m'), "day": event_timestamp.strftime('%d'), "hour": event_timestamp.strftime('%H'), "minute": event_timestamp.strftime('%M') } # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {'recordId': firehose_record_input['recordId'], 'data': firehose_record_input['data'], 'result': 'Ok', 'metadata': { 'partitionKeys': partition_keys }} # Must set proper record ID # Add the record to the list of output records. firehose_records_output['records'].append(firehose_record_output) # At the end return processed records return firehose_records_output
Di seguito è riportato un esempio di funzione Lambda di elaborazione del flusso Firehose in Go che riproduce ogni record letto dall'input all'output ed estrae le chiavi di partizionamento dai record.
package main import ( "fmt" "encoding/json" "time" "strconv" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type DataFirehoseEventRecordData struct { CustomerId string `json:"customerId"` } func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) { fmt.Printf("InvocationID: %s\n", evnt.InvocationID) fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn) fmt.Printf("Region: %s\n", evnt.Region) var response events.DataFirehoseResponse for _, record := range evnt.Records { fmt.Printf("RecordID: %s\n", record.RecordID) fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp) var transformedRecord events.DataFirehoseResponseRecord transformedRecord.RecordID = record.RecordID transformedRecord.Result = events.DataFirehoseTransformedStateOk transformedRecord.Data = record.Data var metaData events.DataFirehoseResponseRecordMetadata var recordData DataFirehoseEventRecordData partitionKeys := make(map[string]string) currentTime := time.Now() json.Unmarshal(record.Data, &recordData) partitionKeys["customerId"] = recordData.CustomerId partitionKeys["year"] = strconv.Itoa(currentTime.Year()) partitionKeys["month"] = strconv.Itoa(int(currentTime.Month())) partitionKeys["date"] = strconv.Itoa(currentTime.Day()) partitionKeys["hour"] = strconv.Itoa(currentTime.Hour()) partitionKeys["minute"] = strconv.Itoa(currentTime.Minute()) metaData.PartitionKeys = partitionKeys transformedRecord.Metadata = metaData response.Records = append(response.Records, transformedRecord) } return response, nil } func main() { lambda.Start(handleRequest) }