Verwenden der Ereignisfilterung mit einer Kinesis-Ereignisquelle - AWS Lambda

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verwenden der Ereignisfilterung mit einer Kinesis-Ereignisquelle

Sie können die Ereignisfilterung verwenden, um zu steuern, welche Datensätze aus einem Stream oder einer Warteschlange Lambda an Ihre Funktion sendet. Allgemeine Informationen zur Funktionsweise der Ereignisfilterung finden Sie unterSteuern Sie, welche Ereignisse Lambda an Ihre Funktion sendet.

Dieser Abschnitt konzentriert sich auf die Ereignisfilterung für Kinesis-Ereignisquellen.

Grundlagen der Kinesis-Ereignisfilterung

Angenommen, ein Hersteller fügt JSON formatierte Daten in Ihren Kinesis-Datenstrom ein. Ein Beispieldatensatz würde wie folgt aussehen, wobei die JSON Daten in dem Feld in eine Base64-kodierte Zeichenfolge konvertiert werden. data

{ "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }

Solange die Daten, die der Producer in den Stream eingibtJSON, gültig sind, können Sie mithilfe der Ereignisfilterung Datensätze anhand des data Schlüssels filtern. Angenommen, ein Producer fügt Datensätze im folgenden JSON Format in Ihren Kinesis-Stream ein.

{ "record": 12345, "order": { "type": "buy", "stock": "ANYCO", "quantity": 1000 } }

Um nur die Datensätze zu filtern, bei denen der Auftragstyp "Kaufen" ist, würde das FilterCriteria-Objekt wie folgt aussehen.

{ "Filters": [ { "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }" } ] }

Der besseren Übersichtlichkeit halber ist der Wert des Filters Pattern im Klartext JSON dargestellt.

{ "data": { "order": { "type": [ "buy" ] } } }

Sie können Ihren Filter über die Konsole AWS CLI oder eine AWS SAM Vorlage hinzufügen.

Console

Um diesen Filter mithilfe der Konsole hinzuzufügen, folgen Sie den Anweisungen unter Anhängen von Filterkriterien an eine Ereignisquellenzuordnung (Konsole) und geben Sie die folgende Zeichenfolge für die Filterkriterien ein.

{ "data" : { "order" : { "type" : [ "buy" ] } } }
AWS CLI

Führen Sie den folgenden Befehl aus, um mithilfe von AWS Command Line Interface (AWS CLI) eine neue Ereignisquellenzuordnung mit diesen Filterkriterien zu erstellen.

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'

Führen Sie den folgenden Befehl aus, um diese Filterkriterien zu einer vorhandenen Zuordnung von Ereignisquellen hinzuzufügen.

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
AWS SAM

Um diesen Filter mithilfe hinzuzufügen AWS SAM, fügen Sie der YAML Vorlage für Ihre Ereignisquelle den folgenden Ausschnitt hinzu.

FilterCriteria: Filters: - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'

Um Ereignisse aus Kinesis-Quellen ordnungsgemäß zu filtern, müssen sowohl das Datenfeld als auch Ihre Filterkriterien für das Datenfeld ein gültiges JSON Format haben. Wenn eines der Felder kein gültiges JSON Format hat, löscht Lambda die Nachricht oder löst eine Ausnahme aus. In der folgenden Tabelle ist das Verhalten zusammengefasst:

Format der eingehenden Daten Filtermusterformat für Dateneigenschaften Resultierende Aktion

Gültig JSON

Gültig JSON

Lambda filtert basierend auf Ihren Filterkriterien.

Gültig JSON

Kein Filtermuster für Dateneigenschaften

Lambda filtert (nur für die anderen Metadateneigenschaften) basierend auf Ihren Filterkriterien.

Gültig JSON

Nicht- JSON

Lambda gibt zum Zeitpunkt der Erstellung oder Aktualisierung der Ereignisquellenzuordnung eine Ausnahme aus. Das Filtermuster für Dateneigenschaften muss ein gültiges JSON Format haben.

Nicht- JSON

Gültig JSON

Lambda verwirft den Datensatz.

Nicht- JSON

Kein Filtermuster für Dateneigenschaften

Lambda filtert (nur für die anderen Metadateneigenschaften) basierend auf Ihren Filterkriterien.

Nicht- JSON

Nicht- JSON

Lambda gibt zum Zeitpunkt der Erstellung oder Aktualisierung der Ereignisquellenzuordnung eine Ausnahme aus. Das Filtermuster für Dateneigenschaften muss ein gültiges JSON Format haben.

Filtern von aggregierten Kinesis-Datensätzen

Mit Kinesis können Sie mehrere Datensätze in einem einzigen Kinesis-Datenstrom-Datensatz zusammenfassen, um Ihren Datendurchsatz zu erhöhen. Lambda kann Filterkriterien nur auf aggregierte Datensätze anwenden, wenn Sie Kinesis Enhanced Fanout verwenden. Das Filtern aggregierter Datensätze mit Standard-Kinesis wird nicht unterstützt. Bei der Verwendung von Enhanced Fan-Out konfigurieren Sie einen dedizierten Kinesis-Durchsatzverbraucher, der als Auslöser für Ihre Lambda-Funktion dient. Lambda filtert dann die aggregierten Datensätze und übergibt nur die Datensätze, die Ihren Filterkriterien entsprechen.

Weitere Informationen zur Kinesis-Datensatzaggregation finden Sie im Abschnitt Aggregation auf der Seite mit den wichtigsten Konzepten der Kinesis Producer Library (KPL). Weitere Informationen zur Verwendung von Lambda mit Kinesis Enhanced Fan-Out finden Sie im Compute-Blog unter Steigerung der Echtzeit-Stream-Verarbeitungsleistung mit Amazon Kinesis Data Streams Enhanced Fan-Out und AWS Lambda. AWS