

# Using Neptune Streams
<a name="streams-using"></a>

With the Neptune Streams feature, you can generate a complete sequence of change-log entries that record every change made to your graph data as it happens. For an overview of this feature, see [Capturing graph changes in real time using Neptune streams](streams.md).

**Topics**
+ [Enabling Neptune Streams](streams-using-enabling.md)
+ [Disabling Neptune Streams](streams-using-disabling.md)
+ [Calling the Neptune Streams REST API](streams-using-api-call.md)
+ [Neptune Streams API Response Format](streams-using-api-reponse.md)
+ [Neptune Streams API Exceptions](streams-using-api-exceptions.md)

# Enabling Neptune Streams
<a name="streams-using-enabling"></a>

You can enable or disable Neptune Streams at any time by setting the [`neptune_streams` DB cluster parameter](parameters.md#parameters-db-cluster-parameters-neptune_streams). Setting the parameter to `1` enables Streams, and setting it to `0` disables Streams.

**Note**  
After changing the `neptune_streams` DB cluster parameter, you must reboot all DB instances in the cluster for the change to take effect.

You can set the [`neptune_streams_expiry_days`](parameters.md#parameters-db-cluster-parameters-neptune_streams_expiry_days) DB cluster parameter to control how many days (from 1 to 90) stream records remain on the server before being deleted. The default is 7.
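
You can also script these parameter changes. The following is a minimal sketch. It assumes you manage the cluster with the AWS SDK for Python (boto3) and its `neptune` client. The function names, parameter group name, and instance identifiers are hypothetical. Passing `enabled=False` produces the corresponding change for disabling Streams.

```python
from typing import Dict, List, Optional


def streams_parameters(enabled: bool, expiry_days: Optional[int] = None) -> List[Dict[str, str]]:
    """Build parameter entries for neptune_streams and, optionally, the expiry period."""
    params = [
        {
            "ParameterName": "neptune_streams",
            "ParameterValue": "1" if enabled else "0",
            # The new value is only picked up after the DB instances reboot.
            "ApplyMethod": "pending-reboot",
        }
    ]
    if expiry_days is not None:
        if not 1 <= expiry_days <= 90:
            raise ValueError("neptune_streams_expiry_days must be between 1 and 90")
        params.append(
            {
                "ParameterName": "neptune_streams_expiry_days",
                "ParameterValue": str(expiry_days),
                "ApplyMethod": "pending-reboot",
            }
        )
    return params


def apply_streams_setting(group_name: str, instance_ids: List[str], enabled: bool,
                          expiry_days: Optional[int] = None) -> None:
    """Hypothetical helper: update the DB cluster parameter group, then reboot each instance."""
    import boto3  # imported here so streams_parameters() works without boto3 installed

    client = boto3.client("neptune")
    client.modify_db_cluster_parameter_group(
        DBClusterParameterGroupName=group_name,
        Parameters=streams_parameters(enabled, expiry_days),
    )
    # Depending on timing, you may need to wait for the parameter group change
    # to propagate before rebooting.
    for instance_id in instance_ids:
        client.reboot_db_instance(DBInstanceIdentifier=instance_id)
```

For example, `apply_streams_setting("my-cluster-params", ["my-instance-1"], enabled=True, expiry_days=14)` would enable Streams with a 14-day retention period, using placeholder names.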

Neptune Streams was initially introduced as an experimental feature that you enabled or disabled in Lab Mode using the DB Cluster `neptune_lab_mode` parameter (see [Neptune Lab Mode](features-lab-mode.md)). Using Lab Mode to enable Streams is now deprecated and will be disabled in the future.

# Disabling Neptune Streams
<a name="streams-using-disabling"></a>

You can turn Neptune Streams off at any time while it is running.

To turn Streams off, update the DB cluster parameter group so that the `neptune_streams` parameter is set to `0`, and then reboot all DB instances in the cluster for the change to take effect.

**Important**  
As soon as Streams is turned off, you can no longer access the change-log data. Be sure to read any data you need *before* turning Streams off.

# Calling the Neptune Streams REST API
<a name="streams-using-api-call"></a>

You access Neptune Streams through a REST API by sending an HTTP GET request to one of the following local endpoints:
+ For a SPARQL graph DB:   `https://Neptune-DNS:8182/sparql/stream`.
+ For a Gremlin or openCypher graph DB:   `https://Neptune-DNS:8182/propertygraph/stream` or `https://Neptune-DNS:8182/pg/stream`.

**Note**  
The Gremlin stream endpoint (`https://Neptune-DNS:8182/gremlin/stream`) is deprecated, along with its associated output format (`GREMLIN_JSON`). It is still supported for backward compatibility but may be removed in future releases.

Only an HTTP `GET` operation is allowed.

Neptune supports `gzip` compression of the response, provided that the HTTP request includes an `Accept-Encoding` header that specifies `gzip` as an accepted compression format (that is, `"Accept-Encoding: gzip"`).
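
As an illustration, the following Python sketch builds a stream endpoint URL and sends a `GET` request that advertises `gzip` support, using only the standard library. It assumes that the cluster is reachable from where the code runs and that IAM database authentication is not enabled. With IAM authentication, requests must also be SigV4-signed, which this sketch does not do. The function names are hypothetical, and `neptune_dns` stands in for `Neptune-DNS` above.

```python
import gzip
import json
import urllib.parse
import urllib.request


def build_stream_url(neptune_dns: str, graph: str = "propertygraph", **params: object) -> str:
    """Build a stream URL; graph is 'sparql', 'propertygraph', or 'pg'."""
    # The deprecated 'gremlin' endpoint is deliberately not accepted here.
    if graph not in ("sparql", "propertygraph", "pg"):
        raise ValueError(f"unsupported stream endpoint: {graph}")
    url = f"https://{neptune_dns}:8182/{graph}/stream"
    query = urllib.parse.urlencode(params)
    return f"{url}?{query}" if query else url


def read_stream(url: str, timeout: float = 30.0) -> dict:
    """Send a GET request that accepts gzip and return the decoded JSON body."""
    request = urllib.request.Request(url, method="GET", headers={"Accept-Encoding": "gzip"})
    with urllib.request.urlopen(request, timeout=timeout) as response:
        body = response.read()
        # urllib does not decompress automatically, so handle gzip explicitly.
        if response.headers.get("Content-Encoding") == "gzip":
            body = gzip.decompress(body)
    return json.loads(body)
```

For example, `read_stream(build_stream_url("your-neptune-endpoint", "pg", iteratorType="TRIM_HORIZON", limit=100))` would read up to 100 of the oldest available property-graph records, where `your-neptune-endpoint` is a placeholder.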

**Parameters**
+ `limit`   –   long, optional. Range: 1–100,000. Default: 10.

  Specifies the maximum number of records to return. There is also a 10 MB size limit on the response that can't be modified and that takes precedence over the number of records specified in the `limit` parameter. If the 10 MB limit is reached, the response still includes the record that crossed the threshold.
+ `iteratorType`   –   String, optional.

  This parameter can take one of the following values:
  + `AT_SEQUENCE_NUMBER` (default)   –   Indicates that reading should start from the event sequence number specified jointly by the `commitNum` and `opNum` parameters.
  + `AFTER_SEQUENCE_NUMBER`   –   Indicates that reading should start right after the event sequence number specified jointly by the `commitNum` and `opNum` parameters.
  + `TRIM_HORIZON`   –   Indicates that reading should start at the last untrimmed record in the system, which is the oldest unexpired (not yet deleted) record in the change-log stream. This mode is useful during application startup, when you don't have a specific starting event sequence number.
  + `LATEST`   –   Indicates that reading should start at the most recent record in the system, which is the latest unexpired (not yet deleted) record in the change-log stream. This is useful when you need to read records from the current top of the stream so as not to process older records, such as during disaster recovery or a zero-downtime upgrade. In this mode, at most one record is returned.
+ `commitNum`   –   long, required when `iteratorType` is `AT_SEQUENCE_NUMBER` or `AFTER_SEQUENCE_NUMBER`.

  The commit number of the starting record to read from the change-log stream.

  This parameter is ignored when `iteratorType` is `TRIM_HORIZON` or `LATEST`.
+ `opNum`   –   long, optional (the default is `1`).

  The operation sequence number within the specified commit to start reading from in the change-log stream data.
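
You can capture the parameter rules above in a small validation helper before sending a request. This is a sketch only. The function name is hypothetical, and the server remains the authority on what it accepts.

```python
from typing import Dict, Optional

ITERATOR_TYPES = {"AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "TRIM_HORIZON", "LATEST"}
POSITIONED_TYPES = {"AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER"}


def stream_query_params(iterator_type: str = "AT_SEQUENCE_NUMBER",
                        commit_num: Optional[int] = None,
                        op_num: Optional[int] = None,
                        limit: Optional[int] = None) -> Dict[str, object]:
    """Validate stream request options and return them as query parameters."""
    if iterator_type not in ITERATOR_TYPES:
        raise ValueError(f"unknown iteratorType: {iterator_type}")
    params: Dict[str, object] = {"iteratorType": iterator_type}
    if limit is not None:
        if not 1 <= limit <= 100_000:
            raise ValueError("limit must be between 1 and 100,000")
        params["limit"] = limit
    if iterator_type in POSITIONED_TYPES:
        if commit_num is None:
            raise ValueError(f"commitNum is required with {iterator_type}")
        params["commitNum"] = commit_num
        if op_num is not None:  # when omitted, the server uses opNum=1
            params["opNum"] = op_num
    # For TRIM_HORIZON and LATEST, commitNum and opNum are ignored, so they are left out.
    return params
```

The returned dictionary can be passed to `urllib.parse.urlencode` to form the query string.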

Operations that change SPARQL graph data generally only generate a single change record per operation. However, operations that change Gremlin graph data can generate multiple change records per operation, as in the following examples:
+ `INSERT`   –   A Gremlin vertex can have multiple labels, and a Gremlin element can have multiple properties. A separate change record is generated for each label and property when an element is inserted.
+ `UPDATE`   –   When a Gremlin element property is changed, two change records are generated: the first for removing the previous value, and the second for inserting the new value.
+ `DELETE`   –   A separate change record is generated for each element property that is deleted. For example, when a Gremlin edge with properties is deleted, one change record is generated for each of the properties, and after that, one is generated for deletion of the edge label.

  When a Gremlin vertex is deleted, all the incoming and outgoing edge properties are deleted first, then the edge labels, then the vertex properties, and finally the vertex labels. Each of these deletions generates a change record.
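
To make these rules concrete, the following sketch estimates how many change records a Gremlin operation produces. It is a simplified model built only from the rules listed above. It assumes single-valued properties and exactly one label per edge, and the class and function names are hypothetical.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Edge:
    property_count: int  # each edge is assumed to carry exactly one label


@dataclass
class Vertex:
    label_count: int
    property_count: int
    edges: List[Edge] = field(default_factory=list)  # incoming and outgoing edges


def records_for_insert(label_count: int, property_count: int) -> int:
    """INSERT: one record per label plus one per property."""
    return label_count + property_count


def records_for_property_update(changed_properties: int) -> int:
    """UPDATE: a remove record and an add record for each changed property."""
    return 2 * changed_properties


def records_for_vertex_delete(vertex: Vertex) -> int:
    """DELETE: edge properties, edge labels, vertex properties, then vertex labels."""
    edge_records = sum(edge.property_count + 1 for edge in vertex.edges)
    return edge_records + vertex.property_count + vertex.label_count
```

For example, deleting a vertex that has one label, two properties, and two edges (one with a single property and one with none) yields (1 + 1) + (0 + 1) + 2 + 1 = 6 change records under this model.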

# Neptune Streams API Response Format
<a name="streams-using-api-reponse"></a>

A response to a Neptune Streams REST API request has the following fields:
+ `lastEventId`   –   Sequence identifier of the last change in the stream response. An event ID is composed of two fields: A `commitNum` identifies a transaction that changed the graph, and an `opNum` identifies a specific operation within that transaction. This is shown in the following example.

  ```
    "eventId": {
      "commitNum": 12,
      "opNum": 1
    }
  ```
+ `lastTrxTimestamp`   –   The time at which the commit for the transaction was requested, in milliseconds from the Unix epoch.
+ `format`   –   Serialization format for the change records being returned. The possible values are `PG_JSON` for Gremlin or openCypher change records, and `NQUADS` for SPARQL change records.
+ `records`   –   An array of serialized change-log stream records included in the response. Each record in the `records` array contains these fields:
  + `commitTimestamp`   –   The time at which the commit for the transaction was requested, in milliseconds from the Unix epoch.
  + `eventId`   –   The sequence identifier of the stream change record.
  + `data`   –   The serialized Gremlin, SPARQL, or openCypher change record. The serialization formats of each record are described in more detail in the next section, [Serialization Formats in Neptune Streams](streams-change-formats.md).
  + `op`   –   The operation that created the change.
  + `isLastOp`   –   Only present if this operation is the last one in its transaction. When present, it is set to `true`. Useful for ensuring that an entire transaction is consumed.
+ `totalRecords`   –   The total number of records in the response.
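
Because `lastEventId` identifies the last change in a response, a consumer can use it to request the next batch. The following sketch assumes the response has already been decoded into a Python dictionary and contains at least one record. Resuming with `AFTER_SEQUENCE_NUMBER` is one reasonable choice here, not a required pattern, and the function names are hypothetical.

```python
from typing import Dict, List, Tuple


def next_request_params(response: Dict) -> Dict[str, object]:
    """Return query parameters that continue reading right after this response."""
    last = response["lastEventId"]
    return {
        "iteratorType": "AFTER_SEQUENCE_NUMBER",
        "commitNum": last["commitNum"],
        "opNum": last["opNum"],
    }


def summarize_records(response: Dict) -> List[Tuple[int, int, str, bool]]:
    """List (commitNum, opNum, op, isLastOp) for each record in the response."""
    summary = []
    for record in response["records"]:
        event = record["eventId"]
        # isLastOp is only present (and true) on the final operation of a transaction.
        summary.append((event["commitNum"], event["opNum"], record["op"],
                        record.get("isLastOp", False)))
    return summary
```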

For example, the following response returns Gremlin change data for a transaction that contains more than one operation:

```
{
  "lastEventId": {
    "commitNum": 12,
    "opNum": 1
  },
  "lastTrxTimestamp": 1560011610678,
  "format": "PG_JSON",
  "records": [
    {
      "commitTimestamp": 1560011610678,
      "eventId": {
        "commitNum": 1,
        "opNum": 1
      },
      "data": {
        "id": "d2b59bf8-0d0f-218b-f68b-2aa7b0b1904a",
        "type": "vl",
        "key": "label",
        "value": {
          "value": "vertex",
          "dataType": "String"
        }
      },
      "op": "ADD"
    }
  ],
  "totalRecords": 1
}
```

The following response returns SPARQL change data for the last operation in a transaction (the operation identified by `EventId(97, 1)` in transaction number 97).

```
{
  "lastEventId": {
    "commitNum": 97,
    "opNum": 1
  },
  "lastTrxTimestamp": 1561489355102,
  "format": "NQUADS",
  "records": [
    {
      "commitTimestamp": 1561489355102,
      "eventId": {
        "commitNum": 97,
        "opNum": 1
      },
      "data": {
        "stmt": "<https://test.com/s> <https://test.com/p> <https://test.com/o> .\n"
      },
      "op": "ADD",
      "isLastOp": true
    }
  ],
  "totalRecords": 1
}
```
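
Because `isLastOp` appears only on the final operation of a transaction, and a transaction's records can be split across responses (for example, by the `limit` parameter or the 10 MB size limit), a consumer may want to buffer records until it sees that marker. The following class is a minimal sketch of that idea; its name and interface are hypothetical.

```python
from typing import Dict, List


class TransactionAssembler:
    """Buffer stream records and release them one complete transaction at a time."""

    def __init__(self) -> None:
        self.pending: List[Dict] = []

    def feed(self, records: List[Dict]) -> List[List[Dict]]:
        """Add one response's records and return any transactions that are now complete."""
        complete: List[List[Dict]] = []
        for record in records:
            self.pending.append(record)
            if record.get("isLastOp"):
                complete.append(self.pending)
                self.pending = []
        # Records left in self.pending belong to a transaction whose last
        # operation has not been read yet.
        return complete
```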

# Neptune Streams API Exceptions
<a name="streams-using-api-exceptions"></a>

The following table describes Neptune Streams exceptions.


| Error Code | HTTP Code | OK to Retry? | Message | 
| --- | --- | --- | --- | 
| `InvalidParameterException` | 400 | No | An invalid or out-of-range value was supplied as an input parameter. | 
| `ExpiredStreamException` | 400 | No | All of the requested records exceed the maximum age allowed and have expired. | 
| `ThrottlingException` | 500 | Yes | Rate of requests exceeds the maximum throughput. | 
| `StreamRecordsNotFoundException` | 404 | No | The requested resource could not be found. The stream may not be specified correctly. | 
| `MemoryLimitExceededException` | 500 | Yes | The request processing did not succeed due to lack of memory, but can be retried when the server is less busy. | 
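
The "OK to Retry?" column suggests a simple client-side policy: retry only `ThrottlingException` and `MemoryLimitExceededException`, preferably with backoff, and surface the other errors immediately. The following sketch assumes your request code raises an exception that carries the error code. `StreamsError` and `call_with_retries` are hypothetical names, and the backoff values are arbitrary.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Error codes marked "Yes" in the OK to Retry? column.
RETRYABLE_CODES = {"ThrottlingException", "MemoryLimitExceededException"}


class StreamsError(Exception):
    """Hypothetical exception carrying the error code of a failed stream request."""

    def __init__(self, code: str, http_status: int) -> None:
        super().__init__(f"{code} (HTTP {http_status})")
        self.code = code
        self.http_status = http_status


def call_with_retries(request_fn: Callable[[], T], max_attempts: int = 5,
                      base_delay: float = 0.5,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Call request_fn, retrying retryable errors with exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return request_fn()
        except StreamsError as err:
            # Non-retryable codes, or the final attempt, propagate to the caller.
            if err.code not in RETRYABLE_CODES or attempt == max_attempts:
                raise
            sleep(random.uniform(0, base_delay * 2 ** (attempt - 1)))
    raise RuntimeError("unreachable")
```

An `ExpiredStreamException` is not retryable as-is, because the requested records no longer exist. One option is to resume from `TRIM_HORIZON` instead, accepting that the expired changes are not available to the consumer.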