

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用适用于 Java 的 SDK 2.x 的 Firehose 示例
<a name="java_firehose_code_examples"></a>

以下代码示例向您展示了如何使用 AWS SDK for Java 2.x 与 Firehose 配合使用来执行操作和实现常见场景。

*操作*是大型程序的代码摘录，必须在上下文中运行。您可以通过操作了解如何调用单个服务函数，还可以通过函数相关场景的上下文查看操作。

*场景*是向您演示如何通过在一个服务中调用多个函数或与其他 AWS 服务结合来完成特定任务的代码示例。

每个示例都包含一个指向完整源代码的链接，您可以从中找到有关如何在上下文中设置和运行代码的说明。

**Topics**
+ [操作](#actions)
+ [场景](#scenarios)

## 操作
<a name="actions"></a>

### `PutRecord`
<a name="firehose_PutRecord_java_topic"></a>

以下代码示例演示了如何使用 `PutRecord`。

**适用于 Java 的 SDK 2.x**  
 还有更多相关信息 GitHub。在 [AWS 代码示例存储库](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/javav2/example_code/firehose#code-examples)中查找完整示例，了解如何进行设置和运行。

```
    /**
     * Puts a record to the specified Amazon Kinesis Data Firehose delivery stream.
     *
     * @param record The record to be put to the delivery stream. The record must be a {@link Map} of String keys and Object values.
     * @param deliveryStreamName The name of the Amazon Kinesis Data Firehose delivery stream to which the record should be put.
     * @throws IllegalArgumentException if the input record or delivery stream name is null or empty.
     * @throws RuntimeException if there is an error putting the record to the delivery stream.
     */
    public static void putRecord(Map<String, Object> record, String deliveryStreamName) {
        if (record == null || deliveryStreamName == null || deliveryStreamName.isEmpty()) {
            throw new IllegalArgumentException("Invalid input: record or delivery stream name cannot be null/empty");
        }
        try {
            String jsonRecord = new ObjectMapper().writeValueAsString(record);
            Record firehoseRecord = Record.builder()
                .data(SdkBytes.fromByteArray(jsonRecord.getBytes(StandardCharsets.UTF_8)))
                .build();

            PutRecordRequest putRecordRequest = PutRecordRequest.builder()
                .deliveryStreamName(deliveryStreamName)
                .record(firehoseRecord)
                .build();

            getFirehoseClient().putRecord(putRecordRequest);
            System.out.println("Record sent: " + jsonRecord);
        } catch (Exception e) {
            throw new RuntimeException("Failed to put record: " + e.getMessage(), e);
        }
    }
```
+  有关 API 的详细信息，请参阅 *AWS SDK for Java 2.x API 参考[PutRecord](https://docs.aws.amazon.com/goto/SdkForJavaV2/firehose-2015-08-04/PutRecord)*中的。

### `PutRecordBatch`
<a name="firehose_PutRecordBatch_java_topic"></a>

以下代码示例演示了如何使用 `PutRecordBatch`。

**适用于 Java 的 SDK 2.x**  
 还有更多相关信息 GitHub。在 [AWS 代码示例存储库](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/javav2/example_code/firehose#code-examples)中查找完整示例，了解如何进行设置和运行。

```
    /**
     * Puts a batch of records to an Amazon Kinesis Data Firehose delivery stream.
     *
     * @param records           a list of maps representing the records to be sent
     * @param batchSize         the maximum number of records to include in each batch
     * @param deliveryStreamName the name of the Kinesis Data Firehose delivery stream
     * @throws IllegalArgumentException if the input parameters are invalid (null or empty)
     * @throws RuntimeException         if there is an error putting the record batch
     */
    public static void putRecordBatch(List<Map<String, Object>> records, int batchSize, String deliveryStreamName) {
        if (records == null || records.isEmpty() || deliveryStreamName == null || deliveryStreamName.isEmpty()) {
            throw new IllegalArgumentException("Invalid input: records or delivery stream name cannot be null/empty");
        }
        ObjectMapper objectMapper = new ObjectMapper();

        try {
            for (int i = 0; i < records.size(); i += batchSize) {
                List<Map<String, Object>> batch = records.subList(i, Math.min(i + batchSize, records.size()));

                List<Record> batchRecords = batch.stream().map(record -> {
                    try {
                        String jsonRecord = objectMapper.writeValueAsString(record);
                        return Record.builder()
                            .data(SdkBytes.fromByteArray(jsonRecord.getBytes(StandardCharsets.UTF_8)))
                            .build();
                    } catch (Exception e) {
                        throw new RuntimeException("Error creating Firehose record", e);
                    }
                }).collect(Collectors.toList());

                PutRecordBatchRequest request = PutRecordBatchRequest.builder()
                    .deliveryStreamName(deliveryStreamName)
                    .records(batchRecords)
                    .build();

                PutRecordBatchResponse response = getFirehoseClient().putRecordBatch(request);

                if (response.failedPutCount() > 0) {
                    response.requestResponses().stream()
                        .filter(r -> r.errorCode() != null)
                        .forEach(r -> System.err.println("Failed record: " + r.errorMessage()));
                }
                System.out.println("Batch sent with size: " + batchRecords.size());
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to put record batch: " + e.getMessage(), e);
        }
    }
```
+  有关 API 的详细信息，请参阅 *AWS SDK for Java 2.x API 参考[PutRecordBatch](https://docs.aws.amazon.com/goto/SdkForJavaV2/firehose-2015-08-04/PutRecordBatch)*中的。

## 场景
<a name="scenarios"></a>

### 将记录放入 Firehose
<a name="firehose_Scenario_PutRecords_java_topic"></a>

以下代码示例演示如何使用 Firehose 处理单个记录和批量记录。

**适用于 Java 的 SDK 2.x**  
 还有更多相关信息 GitHub。在 [AWS 代码示例存储库](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/javav2/example_code/firehose#code-examples)中查找完整示例，了解如何进行设置和运行。
此示例将单个记录和批量记录放入 Firehose。  

```
/**
 * Amazon Firehose Scenario example using Java V2 SDK.
 *
 * Demonstrates individual and batch record processing,
 * and monitoring Firehose delivery stream metrics.
 */
public class FirehoseScenario {

    private static FirehoseClient firehoseClient;
    private static CloudWatchClient cloudWatchClient;

    public static void main(String[] args) {
        final String usage = """
                Usage:
                    <deliveryStreamName>
                Where:
                    deliveryStreamName - The Firehose delivery stream name.
                """;

        if (args.length != 1) {
            System.out.println(usage);
            return;
        }

        String deliveryStreamName = args[0];

        try {
            // Read and parse sample data.
            String jsonContent = readJsonFile("sample_records.json");
            ObjectMapper objectMapper = new ObjectMapper();
            List<Map<String, Object>> sampleData = objectMapper.readValue(jsonContent, new TypeReference<>() {});

            // Process individual records.
            System.out.println("Processing individual records...");
            sampleData.subList(0, 100).forEach(record -> {
                try {
                    putRecord(record, deliveryStreamName);
                } catch (Exception e) {
                    System.err.println("Error processing record: " + e.getMessage());
                }
            });

            // Monitor metrics.
            monitorMetrics(deliveryStreamName);

            // Process batch records.
            System.out.println("Processing batch records...");
            putRecordBatch(sampleData.subList(100, sampleData.size()), 500, deliveryStreamName);
            monitorMetrics(deliveryStreamName);

        } catch (Exception e) {
            System.err.println("Scenario failed: " + e.getMessage());
        } finally {
            closeClients();
        }
    }

    private static FirehoseClient getFirehoseClient() {
        if (firehoseClient == null) {
            firehoseClient = FirehoseClient.builder()
                    .region(Region.US_EAST_1)
                    .build();
        }
        return firehoseClient;
    }

    private static CloudWatchClient getCloudWatchClient() {
        if (cloudWatchClient == null) {
            cloudWatchClient = CloudWatchClient.builder()
                    .region(Region.US_EAST_1)
                    .build();
        }
        return cloudWatchClient;
    }

    /**
     * Puts a record to the specified Amazon Kinesis Data Firehose delivery stream.
     *
     * @param record The record to be put to the delivery stream. The record must be a {@link Map} of String keys and Object values.
     * @param deliveryStreamName The name of the Amazon Kinesis Data Firehose delivery stream to which the record should be put.
     * @throws IllegalArgumentException if the input record or delivery stream name is null or empty.
     * @throws RuntimeException if there is an error putting the record to the delivery stream.
     */
    public static void putRecord(Map<String, Object> record, String deliveryStreamName) {
        if (record == null || deliveryStreamName == null || deliveryStreamName.isEmpty()) {
            throw new IllegalArgumentException("Invalid input: record or delivery stream name cannot be null/empty");
        }
        try {
            String jsonRecord = new ObjectMapper().writeValueAsString(record);
            Record firehoseRecord = Record.builder()
                .data(SdkBytes.fromByteArray(jsonRecord.getBytes(StandardCharsets.UTF_8)))
                .build();

            PutRecordRequest putRecordRequest = PutRecordRequest.builder()
                .deliveryStreamName(deliveryStreamName)
                .record(firehoseRecord)
                .build();

            getFirehoseClient().putRecord(putRecordRequest);
            System.out.println("Record sent: " + jsonRecord);
        } catch (Exception e) {
            throw new RuntimeException("Failed to put record: " + e.getMessage(), e);
        }
    }


    /**
     * Puts a batch of records to an Amazon Kinesis Data Firehose delivery stream.
     *
     * @param records           a list of maps representing the records to be sent
     * @param batchSize         the maximum number of records to include in each batch
     * @param deliveryStreamName the name of the Kinesis Data Firehose delivery stream
     * @throws IllegalArgumentException if the input parameters are invalid (null or empty)
     * @throws RuntimeException         if there is an error putting the record batch
     */
    public static void putRecordBatch(List<Map<String, Object>> records, int batchSize, String deliveryStreamName) {
        if (records == null || records.isEmpty() || deliveryStreamName == null || deliveryStreamName.isEmpty()) {
            throw new IllegalArgumentException("Invalid input: records or delivery stream name cannot be null/empty");
        }
        ObjectMapper objectMapper = new ObjectMapper();

        try {
            for (int i = 0; i < records.size(); i += batchSize) {
                List<Map<String, Object>> batch = records.subList(i, Math.min(i + batchSize, records.size()));

                List<Record> batchRecords = batch.stream().map(record -> {
                    try {
                        String jsonRecord = objectMapper.writeValueAsString(record);
                        return Record.builder()
                            .data(SdkBytes.fromByteArray(jsonRecord.getBytes(StandardCharsets.UTF_8)))
                            .build();
                    } catch (Exception e) {
                        throw new RuntimeException("Error creating Firehose record", e);
                    }
                }).collect(Collectors.toList());

                PutRecordBatchRequest request = PutRecordBatchRequest.builder()
                    .deliveryStreamName(deliveryStreamName)
                    .records(batchRecords)
                    .build();

                PutRecordBatchResponse response = getFirehoseClient().putRecordBatch(request);

                if (response.failedPutCount() > 0) {
                    response.requestResponses().stream()
                        .filter(r -> r.errorCode() != null)
                        .forEach(r -> System.err.println("Failed record: " + r.errorMessage()));
                }
                System.out.println("Batch sent with size: " + batchRecords.size());
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to put record batch: " + e.getMessage(), e);
        }
    }

    public static void monitorMetrics(String deliveryStreamName) {
        Instant endTime = Instant.now();
        Instant startTime = endTime.minusSeconds(600);

        List<String> metrics = List.of("IncomingBytes", "IncomingRecords", "FailedPutCount");
        metrics.forEach(metric -> monitorMetric(metric, startTime, endTime, deliveryStreamName));
    }

    private static void monitorMetric(String metricName, Instant startTime, Instant endTime, String deliveryStreamName) {
        try {
            GetMetricStatisticsRequest request = GetMetricStatisticsRequest.builder()
                .namespace("AWS/Firehose")
                .metricName(metricName)
                .dimensions(Dimension.builder().name("DeliveryStreamName").value(deliveryStreamName).build())
                .startTime(startTime)
                .endTime(endTime)
                .period(60)
                .statistics(Statistic.SUM)
                .build();

            GetMetricStatisticsResponse response = getCloudWatchClient().getMetricStatistics(request);
            double totalSum = response.datapoints().stream().mapToDouble(Datapoint::sum).sum();
            System.out.println(metricName + ": " + totalSum);
        } catch (Exception e) {
            System.err.println("Failed to monitor metric " + metricName + ": " + e.getMessage());
        }
    }

    public static String readJsonFile(String fileName) throws IOException {
        try (InputStream inputStream = FirehoseScenario.class.getResourceAsStream("/" + fileName);
             Scanner scanner = new Scanner(inputStream, StandardCharsets.UTF_8)) {
            return scanner.useDelimiter("\\\\A").next();
        } catch (Exception e) {
            throw new RuntimeException("Error reading file: " + fileName, e);
        }
    }

    private static void closeClients() {
        try {
            if (firehoseClient != null) firehoseClient.close();
            if (cloudWatchClient != null) cloudWatchClient.close();
        } catch (Exception e) {
            System.err.println("Error closing clients: " + e.getMessage());
        }
    }
}
```
+ 有关 API 详细信息，请参阅《AWS SDK for Java 2.x API Reference》**中的以下主题。
  + [PutRecord](https://docs.aws.amazon.com/goto/SdkForJavaV2/firehose-2015-08-04/PutRecord)
  + [PutRecordBatch](https://docs.aws.amazon.com/goto/SdkForJavaV2/firehose-2015-08-04/PutRecordBatch)