Utilisation de Neptune Streams - Amazon Neptune

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation de Neptune Streams

La fonctionnalité Neptune Streams vous permet de générer une séquence complète d'entrées de journal des modifications qui enregistrent chaque modification apportée aux données de votre graphe au fur et à mesure qu'elles sont appliquées. Pour obtenir une présentation de cette fonction, veuillez consulter Capture des modifications de graphe en temps réel à l'aide des flux Neptune.

Activation de Neptune Streams

Vous pouvez activer ou désactiver Neptune Streams à tout moment en définissant le paramètre de cluster de bases de données neptune_streams. Si le paramètre est défini sur 1, Streams est activé, et s'il est défini sur 0, Streams est désactivé.

Note

Après avoir modifié le paramètre neptune_streams du cluster de bases de données, vous devez redémarrer toutes les instances de base de données du cluster pour que la modification soit effective.

Vous pouvez définir le paramètre de cluster de bases de données neptune_streams_expiry_days pour contrôler le nombre de jours (compris entre 1 et 90) pendant lesquels les enregistrements de flux resteront sur le serveur avant d'être supprimés. La valeur par défaut est 7.

Initialement, Neptune Streams a été ajouté en tant que fonctionnalité expérimentale que vous activiez ou désactiviez en mode laboratoire à l'aide du paramètre neptune_lab_mode du cluster de bases de données (voir Mode expérimental Neptune). L'utilisation du mode Lab pour activer Streams est désormais obsolète et sera désactivée à l'avenir.

Désactivation de Neptune Streams

Vous pouvez désactiver Neptune Streams à tout moment.

Pour désactiver Streams, mettez à jour le groupe de paramètres de cluster de bases de données afin que la valeur du paramètre neptune_streams soit définie sur 0.

Important

Dès que Streams est désactivé, vous ne pouvez plus accéder aux données du journal des modifications. Veillez à lire les informations qui vous intéressent avant de désactiver Streams.

Appeler les Neptune Streams REST API

Vous accédez à Neptune Streams à l'aide d'un REST API qui envoie une HTTP GET demande à l'un des points de terminaison locaux suivants :

  • Pour une base de données de SPARQL graphes :https://Neptune-DNS:8182/sparql/stream.

  • Pour un G705 ou une base de données de openCypher graphes : https://Neptune-DNS:8182/propertygraph/stream ouhttps://Neptune-DNS:8182/pg/stream.

Note

Depuis la version 1.1.0.0 du moteur, le point de terminaison du flux Gremlin (https://Neptune-DNS:8182/gremlin/stream) est obsolète, ainsi que son format de sortie associé (GREMLIN_JSON). Il reste pris en charge pour des raisons de rétrocompatibilité, mais il pourrait être supprimé dans les futures versions.

Seule une HTTP GET opération est autorisée.

Neptune prend en charge la gzip compression de la réponse, à condition que la HTTP demande inclue un Accept-Encoding en-tête qui indique gzip un format de compression accepté (c'est-à-dire,"Accept-Encoding: gzip").

Paramètres
  • limit : long, facultatif. Plage : de 1 à 100 000. Par défaut : 10

    Spécifie le nombre maximal d'enregistrements à renvoyer. Il existe également une limite de taille de 10 Mo pour la réponse qui ne peut pas être modifiée et qui est prioritaire sur le nombre d'enregistrements spécifié dans le paramètre limit. La réponse inclut un enregistrement de dépassement de seuil si la limite de 10 Mo a été atteinte.

  • iteratorType : chaîne, facultatif.

    Ce paramètre peut avoir l'une des valeurs suivantes :

    • AT_SEQUENCE_NUMBER (valeur par défaut) : indique que la lecture doit commencer à partir du numéro de séquence d'événement spécifié conjointement par les paramètres commitNum et opNum.

    • AFTER_SEQUENCE_NUMBER : indique que la lecture doit commencer juste après le numéro de séquence d'événement spécifié conjointement par les paramètres commitNum et opNum.

    • TRIM_HORIZON : indique que la lecture doit commencer au niveau du dernier enregistrement non tronqué du système, qui est le plus ancien enregistrement n'ayant pas expiré (pas encore supprimé) dans le flux de journaux des modifications. Ce mode est utile lors du démarrage de l'application, lorsque vous n'avez pas de numéro de séquence d'événement de démarrage spécifique.

    • LATEST : indique que la lecture doit commencer au niveau de l'enregistrement le plus récent dans le système, qui est le dernier enregistrement n'ayant pas expiré (pas encore supprimé) dans le flux de journaux des modifications. Cela est utile lorsqu'il est nécessaire de lire les enregistrements à partir du haut actuel des flux afin de ne pas traiter les anciens enregistrements, par exemple lors d'une reprise après sinistre ou d'une mise à niveau sans interruption de service. Notez que dans ce mode, un seul enregistrement est renvoyé au maximum.

  • commitNum— long, obligatoire quand iteratorType c'est le cas AT_SEQUENCE_NUMBER ouAFTER_SEQUENCE_NUMBER.

    Numéro de validation de l'enregistrement de départ à lire à partir du flux du journal des modifications.

    Ce paramètre est ignoré quand iteratorType a la valeur TRIM_HORIZON ou LATEST.

  • opNum : long, facultatif (la valeur par défaut est 1).

    Numéro de séquence d'opération au sein de la validation spécifiée à partir duquel commencer la lecture dans les données du flux du journal des modifications.

Les opérations qui modifient les données du SPARQL graphe ne génèrent généralement qu'un seul enregistrement de modification par opération. Cependant, les opérations qui modifient les données de graphe Gremlin peuvent générer plusieurs enregistrements de modification par opération, comme dans les exemples suivants :

  • INSERT : un sommet Gremlin peut avoir plusieurs étiquettes, et un élément Gremlin peut avoir plusieurs propriétés. Un enregistrement de modification distinct est généré pour chaque étiquette et propriété lorsqu'un élément est inséré.

  • UPDATE : lorsqu'une propriété d'élément Gremlin est modifiée, deux enregistrements de modification sont générés : le premier pour supprimer la valeur précédente et le second pour insérer la nouvelle valeur.

  • DELETE : un enregistrement de modification distinct est généré pour chaque propriété d'élément supprimée. Par exemple, lorsqu'un arc Gremlin avec des propriétés est supprimé, un enregistrement de modification est généré pour chacune des propriétés, puis un autre enregistrement est généré pour la suppression de l'étiquette d'arc.

    Lorsqu'un sommet Gremlin est supprimé, toutes les propriétés d'arc entrant et sortant sont supprimées en premier, puis viennent les étiquettes d'arc, les propriétés de sommet et enfin les étiquettes de sommet. Chacune de ces suppressions génère un enregistrement de modification.

Format de réponse de Neptune Streams API

Une réponse à une REST API demande de Neptune Streams contient les champs suivants :

  • lastEventId : identifiant de séquence de la dernière modification dans la réponse du flux. Un ID d'événement se compose de deux champs : un commitNum qui identifie une transaction ayant modifié le graphe et un opNum qui identifie une opération spécifique au sein de cette transaction. Voici un exemple :

    "eventId": { "commitNum": 12, "opNum": 1 }
  • lastTrxTimestamp : heure à laquelle la validation de la transaction a été demandée, en millisecondes, à partir de l'Unix Epoch.

  • format : format de sérialisation pour les enregistrements de modification renvoyés. Les valeurs possibles concernent les enregistrements PG_JSON de G705 ou de openCypher modification, et NQUADS les enregistrements de SPARQL modifications.

  • records : tableau des enregistrements sérialisés du flux de journaux de modifications inclus dans la réponse. Chaque enregistrement du tableau records contient les champs suivants :

    • commitTimestamp : heure à laquelle la validation de la transaction a été demandée, en millisecondes, à partir de l'Unix Epoch.

    • eventId : identifiant de séquence de l'enregistrement de dernière modification du flux.

    • data— Le G705 sérialiséSPARQL, ou enregistrement des OpenCypher modifications. Les formats de sérialisation de chaque enregistrement sont décrits plus en détail dans la section suivante, Formats de sérialisation dans Neptune Streams.

    • op : opération à l'origine de la modification.

    • isLastOp : présent uniquement si cette opération est la dernière dans sa transaction. Lorsqu'il est présent, il est défini sur true. Utile pour s'assurer qu'une transaction est consommée dans son intégralité.

  • totalRecords : nombre total d'enregistrements dans la réponse.

Par exemple, la réponse suivante renvoie les données de modification Gremlin pour une transaction contenant plusieurs opérations :

{ "lastEventId": { "commitNum": 12, "opNum": 1 }, "lastTrxTimestamp": 1560011610678, "format": "PG_JSON", "records": [ { "commitTimestamp": 1560011610678, "eventId": { "commitNum": 1, "opNum": 1 }, "data": { "id": "d2b59bf8-0d0f-218b-f68b-2aa7b0b1904a", "type": "vl", "key": "label", "value": { "value": "vertex", "dataType": "String" } }, "op": "ADD" } ], "totalRecords": 1 }

La réponse suivante renvoie les données de SPARQL modification relatives à la dernière opération d'une transaction (opération identifiée par EventId(97, 1) le numéro de transaction 97).

{ "lastEventId": { "commitNum": 97, "opNum": 1 }, "lastTrxTimestamp": 1561489355102, "format": "NQUADS", "records": [ { "commitTimestamp": 1561489355102, "eventId": { "commitNum": 97, "opNum": 1 }, "data": { "stmt": "<https://test.com/s> <https://test.com/p> <https://test.com/o> .\n" }, "op": "ADD", "isLastOp": true } ], "totalRecords": 1 }

Exceptions relatives à Neptune Streams API

Le tableau suivant décrit les exceptions Neptune Streams.

Code d’erreur HTTPCode OK pour réessayer ? Message

InvalidParameterException

400

Non

Une valeur non valide ou une out-of-range valeur a été fournie en tant que paramètre d'entrée.

ExpiredStreamException

400

Non

Tous les enregistrements demandés dépassent l'âge maximum autorisé et ont expiré.

ThrottlingException

500

Oui

La fréquence des demandes dépasse le débit maximum.

StreamRecordsNotFoundException

404

Non

La ressource demandée est introuvable. Le flux n'est peut-être pas spécifié correctement.

MemoryLimitExceededException

500

Oui

Le traitement de la demande a échoué en raison d'un manque de mémoire, mais pourra être réessayé lorsque le serveur sera moins occupé.