Comprendre les clés de partitionnement - Amazon Data Firehose

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Comprendre les clés de partitionnement

Avec le partitionnement dynamique, vous créez des jeux de données ciblés à partir des données S3 de streaming en partitionnant les données en fonction des clés de partitionnement. Les clés de partitionnement vous permettent de filtrer vos données de streaming en fonction de valeurs spécifiques. Par exemple, si vous devez filtrer vos données en fonction de l'ID du client et du pays, vous pouvez spécifier le champ de données de customer_id comme une clé de partitionnement et le champ de données de country comme une autre clé de partitionnement. Ensuite, vous spécifiez les expressions (en utilisant les formats pris en charge) pour définir les préfixes de compartiment S3 auxquels les enregistrements de données partitionnés dynamiquement doivent être livrés.

Vous pouvez créer des clés de partitionnement à l'aide des méthodes suivantes.

  • Analyse en ligne : cette méthode utilise le mécanisme de support intégré de Firehose, un analyseur jq, pour extraire les clés de partitionnement à partir d'enregistrements de données formatés. JSON Actuellement, nous ne prenons en charge que jq 1.6 la version.

  • AWS Fonction Lambda : cette méthode utilise une AWS Lambda fonction spécifiée pour extraire et renvoyer les champs de données nécessaires au partitionnement.

Important

Lorsque vous activez le partitionnement dynamique, vous devez configurer au moins l'une de ces méthodes pour partitionner vos données. Vous pouvez configurer l'une de ces méthodes pour spécifier vos clés de partitionnement ou les deux en même temps.

Création de clés de partitionnement avec analyse syntaxique intégrée

Pour configurer l'analyse en ligne comme méthode de partitionnement dynamique pour vos données de streaming, vous devez choisir les paramètres d'enregistrement de données à utiliser comme clés de partitionnement et fournir une valeur pour chaque clé de partitionnement spécifiée.

L'exemple d'enregistrement de données suivant montre comment vous pouvez définir des clés de partitionnement pour celui-ci à l'aide de l'analyse en ligne. Notez que les données doivent être codées au format Base64. Vous pouvez également vous référer à l'CLIexemple.

{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, #epoch timestamp "region": "sample_region" }

Par exemple, vous pouvez choisir de partitionner vos données en fonction du paramètre customer_id ou du paramètre event_timestamp. Cela signifie que vous souhaitez que la valeur du paramètre customer_id ou du paramètre event_timestamp de chaque enregistrement soit utilisée pour déterminer le préfixe S3 auquel l'enregistrement doit être livré. Vous pouvez également choisir un paramètre imbriqué, comme device avec une expression .type.device. Votre logique de partitionnement dynamique peut dépendre de plusieurs paramètres.

Après avoir sélectionné les paramètres de données pour vos clés de partitionnement, vous mappez chaque paramètre à une expression jq valide. Le tableau suivant montre un tel mappage de paramètres avec des expressions jq :

Paramètre expression jq
customer_id .customer_id
device

.type.device

year

.event_timestamp| strftime("%Y")

month

.event_timestamp| strftime("%m")

day

.event_timestamp| strftime("%d")

hour

.event_timestamp| strftime("%H")

Au moment de l'exécution, Firehose utilise la colonne de droite ci-dessus pour évaluer les paramètres en fonction des données de chaque enregistrement.

Création de clés de partitionnement avec une fonction AWS Lambda

Pour les enregistrements de données compressés ou chiffrés, ou pour les données sous un autre format de fichierJSON, vous pouvez utiliser la AWS Lambda fonction intégrée avec votre propre code personnalisé pour décompresser, déchiffrer ou transformer les enregistrements afin d'extraire et de renvoyer les champs de données nécessaires au partitionnement. Il s'agit d'une extension de la fonction Lambda de transformation existante qui est disponible aujourd'hui avec Firehose. Vous pouvez transformer, analyser et renvoyer les champs de données que vous pouvez ensuite utiliser pour le partitionnement dynamique à l'aide de la même fonction Lambda.

Voici un exemple de fonction Lambda de traitement de flux Firehose en Python qui rejoue chaque enregistrement lu de l'entrée à la sortie et extrait les clés de partitionnement des enregistrements.

from __future__ import print_function import base64 import json import datetime # Signature for all Lambda functions that user must implement def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'] + ", Region: " + firehose_records_input['region'] + ", and InvocationId: " + firehose_records_input['invocationId']) # Create return value. firehose_records_output = {'records': []} # Create result object. # Go through records and process them for firehose_record_input in firehose_records_input['records']: # Get user payload payload = base64.b64decode(firehose_record_input['data']) json_value = json.loads(payload) print("Record that was received") print(json_value) print("\n") # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {} event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp']) partition_keys = {"customerId": json_value['customerId'], "year": event_timestamp.strftime('%Y'), "month": event_timestamp.strftime('%m'), "day": event_timestamp.strftime('%d'), "hour": event_timestamp.strftime('%H'), "minute": event_timestamp.strftime('%M') } # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {'recordId': firehose_record_input['recordId'], 'data': firehose_record_input['data'], 'result': 'Ok', 'metadata': { 'partitionKeys': partition_keys }} # Must set proper record ID # Add the record to the list of output records. firehose_records_output['records'].append(firehose_record_output) # At the end return processed records return firehose_records_output

Voici un exemple de fonction Lambda de traitement de flux Firehose dans Go qui rejoue chaque enregistrement lu de l'entrée vers la sortie et extrait les clés de partitionnement des enregistrements.

package main import ( "fmt" "encoding/json" "time" "strconv" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type DataFirehoseEventRecordData struct { CustomerId string `json:"customerId"` } func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) { fmt.Printf("InvocationID: %s\n", evnt.InvocationID) fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn) fmt.Printf("Region: %s\n", evnt.Region) var response events.DataFirehoseResponse for _, record := range evnt.Records { fmt.Printf("RecordID: %s\n", record.RecordID) fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp) var transformedRecord events.DataFirehoseResponseRecord transformedRecord.RecordID = record.RecordID transformedRecord.Result = events.DataFirehoseTransformedStateOk transformedRecord.Data = record.Data var metaData events.DataFirehoseResponseRecordMetadata var recordData DataFirehoseEventRecordData partitionKeys := make(map[string]string) currentTime := time.Now() json.Unmarshal(record.Data, &recordData) partitionKeys["customerId"] = recordData.CustomerId partitionKeys["year"] = strconv.Itoa(currentTime.Year()) partitionKeys["month"] = strconv.Itoa(int(currentTime.Month())) partitionKeys["date"] = strconv.Itoa(currentTime.Day()) partitionKeys["hour"] = strconv.Itoa(currentTime.Hour()) partitionKeys["minute"] = strconv.Itoa(currentTime.Minute()) metaData.PartitionKeys = partitionKeys transformedRecord.Metadata = metaData response.Records = append(response.Records, transformedRecord) } return response, nil } func main() { lambda.Start(handleRequest) }