


# Upload streams to Amazon S3 using the AWS SDK for Java 2.x
<a name="best-practices-s3-uploads"></a>

When you upload content to S3 from a stream with the [`putObject`](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/S3Client.html#putObject(software.amazon.awssdk.services.s3.model.PutObjectRequest,software.amazon.awssdk.core.sync.RequestBody)) or [`uploadPart`](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/S3Client.html#uploadPart(software.amazon.awssdk.services.s3.model.UploadPartRequest,software.amazon.awssdk.core.sync.RequestBody)) method, you supply the stream through the `RequestBody` factory class for the synchronous API. For the asynchronous API, `AsyncRequestBody` is the equivalent factory class.

## Which methods upload streams?
<a name="s3-stream-upload-methods"></a>

For the synchronous API, you can use the following factory methods of `RequestBody` to supply the stream:
+ `[fromInputStream](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/core/sync/RequestBody.html#fromInputStream(java.io.InputStream,long))(InputStream inputStream, long contentLength)`
+ `[fromContentProvider](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/core/sync/RequestBody.html#fromContentProvider(software.amazon.awssdk.http.ContentStreamProvider,long,java.lang.String))(ContentStreamProvider provider, long contentLength, String mimeType)`
  + `ContentStreamProvider` has the `fromInputStream(InputStream inputStream)` factory method
+ `[fromContentProvider](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/core/sync/RequestBody.html#fromContentProvider(software.amazon.awssdk.http.ContentStreamProvider,java.lang.String))(ContentStreamProvider provider, String mimeType)`

For the asynchronous API, you can use the following factory methods of `AsyncRequestBody`:
+ `[fromInputStream](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/core/async/AsyncRequestBody.html#fromInputStream(java.io.InputStream,java.lang.Long,java.util.concurrent.ExecutorService))(InputStream inputStream, Long contentLength, ExecutorService executor)` 
+ `[fromInputStream](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/core/async/AsyncRequestBody.html#fromInputStream(software.amazon.awssdk.core.async.AsyncRequestBodyFromInputStreamConfiguration))(AsyncRequestBodyFromInputStreamConfiguration configuration)`
  + You use `AsyncRequestBodyFromInputStreamConfiguration.Builder` to supply the stream
+ `[fromInputStream](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/core/async/AsyncRequestBody.html#fromInputStream(java.util.function.Consumer))(Consumer<AsyncRequestBodyFromInputStreamConfiguration.Builder> configuration)`
+ `[forBlockingInputStream](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/core/async/AsyncRequestBody.html#forBlockingInputStream(java.lang.Long))(Long contentLength)`
  + The resulting `[BlockingInputStreamAsyncRequestBody](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/core/async/BlockingInputStreamAsyncRequestBody.html)` has the `writeInputStream(InputStream inputStream)` method, which you can use to supply the stream

## Performing the upload
<a name="s3-upload-stream-perform"></a>

### If you know the length of the stream
<a name="s3-stream-upload-supply-content-length"></a>

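As the signatures of the methods shown previously indicate, most of the methods accept a content length parameter.

If you know the content length in bytes, provide the exact value: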

```
// Always provide the exact content length when it's available.
long contentLength = 1024; // Exact size in bytes.
s3Client.putObject(req -> req
    .bucket("amzn-s3-demo-bucket")
    .key("my-key"),
RequestBody.fromInputStream(inputStream, contentLength));
```

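**Warning**  
When you upload from an input stream, a specified content length that doesn't match the actual byte count can cause the following problems:

+ Truncated objects, if the specified length is too small
+ Failed uploads or hanging connections, if the specified length is too large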
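
When the stream is backed by a file, one way to obtain the exact length is to ask the file system rather than the stream. The following is a minimal sketch under that assumption. The class `ExactContentLength` and its methods are hypothetical names used for illustration and are not part of the SDK. The sketch avoids `InputStream.available()`, which the Java documentation describes as only an estimate of the bytes that can be read without blocking.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper for illustration; not part of the AWS SDK.
final class ExactContentLength {

    // Returns the exact size in bytes of a file, as reported by the file system.
    static long ofFile(Path file) {
        try {
            return Files.size(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes a temporary file with the given number of bytes and returns its measured size.
    static long measureTempFile(int byteCount) {
        try {
            Path tmp = Files.createTempFile("upload-", ".bin");
            try {
                Files.write(tmp, new byte[byteCount]);
                return ofFile(tmp);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(measureTempFile(1024)); // prints 1024
    }
}
```

A value obtained this way could be passed as the `contentLength` argument of `RequestBody.fromInputStream(inputStream, contentLength)`, provided that the file isn't modified between the size check and the upload.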

### If you don't know the length of the stream
<a name="s3-stream-upload-unknown-length"></a>

#### Using the synchronous API
<a name="s3-upload-unknown-sync-client"></a>

Use `fromContentProvider(ContentStreamProvider provider, String mimeType)`:

```
public PutObjectResponse syncClient_stream_unknown_size(String bucketName, String key, InputStream inputStream) {

    S3Client s3Client = S3Client.create();

    RequestBody body = RequestBody.fromContentProvider(ContentStreamProvider.fromInputStream(inputStream), "text/plain");
    PutObjectResponse putObjectResponse = s3Client.putObject(b -> b.bucket(bucketName).key(key), body);
    return putObjectResponse;
}
```

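Because the SDK buffers the entire stream in memory to calculate the content length, you can run into memory issues with large streams. If you need to upload large streams with the synchronous client, consider using the multipart API: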

##### Upload a stream using the synchronous client API and the multipart API
<a name="sync-multipart-upload-stream"></a>

```
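// Assumed part size for this example. Amazon S3 requires every part except the last to be at least 5 MiB.
private static final int PART_SIZE = 5 * 1024 * 1024;

public static void uploadStreamToS3(String bucketName, String key, InputStream inputStream) {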
    // Create S3 client
    S3Client s3Client = S3Client.create();
    try {
        // Step 1: Initiate the multipart upload
        CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
                .bucket(bucketName)
                .key(key)
                .build();

        CreateMultipartUploadResponse createResponse = s3Client.createMultipartUpload(createMultipartUploadRequest);
        String uploadId = createResponse.uploadId();
        System.out.println("Started multipart upload with ID: " + uploadId);

        // Step 2: Upload parts
        List<CompletedPart> completedParts = new ArrayList<>();
        int partNumber = 1;
        byte[] buffer = new byte[PART_SIZE];
        int bytesRead;

        try {
            while ((bytesRead = readFullyOrToEnd(inputStream, buffer)) > 0) {
                // Create request to upload a part
                UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
                        .bucket(bucketName)
                        .key(key)
                        .uploadId(uploadId)
                        .partNumber(partNumber)
                        .build();

                // If we didn't read a full buffer, create a properly sized byte array
                RequestBody requestBody;
                if (bytesRead < PART_SIZE) {
                    byte[] lastPartBuffer = new byte[bytesRead];
                    System.arraycopy(buffer, 0, lastPartBuffer, 0, bytesRead);
                    requestBody = RequestBody.fromBytes(lastPartBuffer);
                } else {
                    requestBody = RequestBody.fromBytes(buffer);
                }

                // Upload the part and save the response's ETag
                UploadPartResponse uploadPartResponse = s3Client.uploadPart(uploadPartRequest, requestBody);
                CompletedPart part = CompletedPart.builder()
                        .partNumber(partNumber)
                        .eTag(uploadPartResponse.eTag())
                        .build();
                completedParts.add(part);

                System.out.println("Uploaded part " + partNumber + " with size " + bytesRead + " bytes");
                partNumber++;
            }

            // Step 3: Complete the multipart upload
            CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder()
                    .parts(completedParts)
                    .build();

            CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
                    .bucket(bucketName)
                    .key(key)
                    .uploadId(uploadId)
                    .multipartUpload(completedMultipartUpload)
                    .build();

            CompleteMultipartUploadResponse completeResponse = s3Client.completeMultipartUpload(completeRequest);
            System.out.println("Multipart upload completed. Object URL: " + completeResponse.location());

        } catch (Exception e) {
            // If an error occurs, abort the multipart upload
            System.err.println("Error during multipart upload: " + e.getMessage());
            AbortMultipartUploadRequest abortRequest = AbortMultipartUploadRequest.builder()
                    .bucket(bucketName)
                    .key(key)
                    .uploadId(uploadId)
                    .build();
            s3Client.abortMultipartUpload(abortRequest);
            System.err.println("Multipart upload aborted");
        } finally {
            try {
                inputStream.close();
            } catch (IOException e) {
                System.err.println("Error closing input stream: " + e.getMessage());
            }
        }
    } finally {
        s3Client.close();
    }
}

/**
 * Reads from the input stream into the buffer, attempting to fill the buffer completely
 * or until the end of the stream is reached.
 *
 * @param inputStream the input stream to read from
 * @param buffer      the buffer to fill
 * @return the number of bytes read, or -1 if the end of the stream is reached before any bytes are read
 * @throws IOException if an I/O error occurs
 */
private static int readFullyOrToEnd(InputStream inputStream, byte[] buffer) throws IOException {
    int totalBytesRead = 0;
    int bytesRead;

    while (totalBytesRead < buffer.length) {
        bytesRead = inputStream.read(buffer, totalBytesRead, buffer.length - totalBytesRead);
        if (bytesRead == -1) {
            break; // End of stream
        }
        totalBytesRead += bytesRead;
    }

    return totalBytesRead > 0 ? totalBytesRead : -1;
}
```
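
On Java 9 or later, `InputStream.readNBytes(byte[], int, int)` performs the same read as the `readFullyOrToEnd` helper: it fills the buffer or stops at the end of the stream. The difference is that it returns 0 rather than -1 at the end of the stream. The following sketch assumes Java 9+ and uses a hypothetical `PartSplitter` class that makes no S3 calls. It only illustrates how a stream breaks into part-sized chunks.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; it performs no S3 calls.
final class PartSplitter {

    // Reads the stream in chunks of up to partSize bytes and returns the size of each chunk.
    static List<Integer> partSizes(InputStream in, int partSize) {
        List<Integer> sizes = new ArrayList<>();
        byte[] buffer = new byte[partSize];
        try {
            int bytesRead;
            // readNBytes blocks until the buffer is full or the stream ends; it returns 0 at end of stream.
            while ((bytesRead = in.readNBytes(buffer, 0, partSize)) > 0) {
                sizes.add(bytesRead);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sizes;
    }

    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream(new byte[25]);
        System.out.println(partSizes(in, 10)); // prints [10, 10, 5]
    }
}
```

Only the final chunk is shorter than the part size, which matches the special handling of the last part in the multipart example.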

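**Note**  
For most use cases, we recommend using the asynchronous client API for streams of unknown size. This approach enables parallel transfers and offers a simpler programming interface, because the SDK splits a large stream into multipart chunks as it processes the stream.  
Both the standard S3 asynchronous client with multipart enabled and the AWS CRT-based S3 client implement this approach. Examples of this approach appear in the following section.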

#### Using the asynchronous API
<a name="s3-stream-upload-unknown-async-client"></a>

You can provide `null` for the `contentLength` argument of `fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor)`.

**Example Using the AWS CRT-based asynchronous client:**  

```
public PutObjectResponse crtClient_stream_unknown_size(String bucketName, String key, InputStream inputStream) {

    S3AsyncClient s3AsyncClient = S3AsyncClient.crtCreate();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AsyncRequestBody body = AsyncRequestBody.fromInputStream(inputStream, null, executor);  // 'null' indicates that the
                                                                                            // content length is unknown.
    CompletableFuture<PutObjectResponse> responseFuture =
            s3AsyncClient.putObject(r -> r.bucket(bucketName).key(key), body)
                    .exceptionally(e -> {
                        if (e != null){
                            logger.error(e.getMessage(), e);
                        }
                        return null;
                    });

    PutObjectResponse response = responseFuture.join(); // Wait for the response.
    executor.shutdown();
    return response;
}
```

**Example Using the standard asynchronous client with multipart enabled:**  

```
public PutObjectResponse asyncClient_multipart_stream_unknown_size(String bucketName, String key, InputStream inputStream) {

    S3AsyncClient s3AsyncClient = S3AsyncClient.builder().multipartEnabled(true).build();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AsyncRequestBody body = AsyncRequestBody.fromInputStream(inputStream, null, executor); // 'null' indicates that the
                                                                                           // content length is unknown.
    CompletableFuture<PutObjectResponse> responseFuture =
            s3AsyncClient.putObject(r -> r.bucket(bucketName).key(key), body)
                    .exceptionally(e -> {
                        if (e != null) {
                            logger.error(e.getMessage(), e);
                        }
                        return null;
                    });

    PutObjectResponse response = responseFuture.join(); // Wait for the response.
    executor.shutdown();
    return response;
}
```
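
Both asynchronous examples attach `.exceptionally(...)` with a handler that returns `null`, so `join()` returns `null` when the upload fails and callers need to check for that. The following sketch uses only `java.util.concurrent` and a hypothetical `FutureErrorHandling` class, with a `String` standing in for `PutObjectResponse`. It contrasts that behavior with `whenComplete`, which can log the error while leaving the future failed. It assumes Java 9+ for `CompletableFuture.failedFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical helper for illustration; String stands in for PutObjectResponse.
final class FutureErrorHandling {

    // Mirrors the examples: the error is replaced with a null result.
    static String joinSwallowing(CompletableFuture<String> future) {
        return future.exceptionally(e -> null).join();
    }

    // Alternative: observe the error, but let join() rethrow it as a CompletionException.
    static String joinPropagating(CompletableFuture<String> future) {
        return future.whenComplete((result, error) -> {
            if (error != null) {
                System.err.println("Upload failed: " + error.getMessage());
            }
        }).join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed =
                CompletableFuture.failedFuture(new IllegalStateException("boom"));
        System.out.println(joinSwallowing(failed)); // prints null
        try {
            joinPropagating(failed);
        } catch (CompletionException e) {
            System.out.println(e.getCause().getMessage()); // prints boom
        }
    }
}
```

Which variant fits depends on whether the caller prefers a `null` check or an exception. With the propagating variant, placing `executor.shutdown()` in a `finally` block would keep the executor from leaking when `join()` throws.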