AWS SDK for PHP 版本 3 中的 Promise - AWS SDK for PHP

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

AWS SDK for PHP 版本 3 中的 Promise

AWS SDK for PHP使用 Promise 支持异步工作流,这种异步性允许同时发送 HTTP 请求。开发工具包使用的 Promise 规范为 Promises/A+

什么是 Promise?

Promise 表示异步操作的最终结果。与 Promise 交互的主要方式是通过其 then 方法。此方法注册回调以接收 Promise 的最终值或无法执行 Promise 的原因。

AWS SDK for PHP依赖 guzzlehttp/promises Composer 程序包来实施 Promise。Guzzle Promise 支持阻止和非阻止性工作流,并可与任何非阻止性事件循环一起使用。

注意

使用单一线程在 AWS SDK for PHP 中同时发送 HTTP 请求,其中非阻止性调用用于在响应状态更改(例如,执行或拒绝 Promise)时传输一个或多个 HTTP 请求。

开发工具包中的 Promise

Promise 的使用贯穿整个开发工具包。例如,Promise 用于开发工具包提供的大多数高级别抽象化处理:PaginatorWaiter命令池分段上传S3 目录/存储桶传输,等等。

当您调用任何 Async 后缀的方法时,该开发工具包提供的所有客户端均返回 Promise。例如,以下代码展示了如何创建 Promise 以获取 Amazon DynamoDBDescribeTable 操作的结果。

$client = new Aws\DynamoDb\DynamoDbClient([ 'region' => 'us-west-2', 'version' => 'latest', ]); // This will create a promise that will eventually contain a result $promise = $client->describeTableAsync(['TableName' => 'mytable']);

请注意,您可以调用 describeTabledescribeTableAsync。这些方法是客户端上的魔术 __call 方法,受到与该客户端关联的 API 模型和 version 号的支持。通过调用 describeTable 之类的没有 Async 后缀的方法,客户端将阻止其发送 HTTP 请求,并返回 Aws\ResultInterface 对象或引发 Aws\Exception\AwsException。通过在操作名称后面添加 Async 后缀(即 describeTableAsync),客户端将创建一个最终使用 Aws\ResultInterface 对象执行或因 Aws\Exception\AwsException 而被拒绝的 Promise。

重要

返回 Promise 时,结果可能已到达(例如,使用模拟处理程序时),或者 HTTP 请求可能未被启动。

您可以通过使用 then 方法向 Promise 注册回调。此方法接受两个回调($onFulfilled$onRejected),两者均为可选项。如果执行 Promise,则调用 $onFulfilled 回调;如果 Promise 被拒绝(表示失败),则调用 $onRejected 回调。

$promise->then( function ($value) { echo "The promise was fulfilled with {$value}"; }, function ($reason) { echo "The promise was rejected with {$reason}"; } );

同时执行命令

多个 Promise 可以组合在一起,以便同时执行。这可通过将开发工具包与非阻止性事件循环集成或者通过构建多个 Promise 并等待它们同时完成来实现。

use GuzzleHttp\Promise\Utils; $sdk = new Aws\Sdk([ 'version' => 'latest', 'region' => 'us-east-1' ]); $s3 = $sdk->createS3(); $ddb = $sdk->createDynamoDb(); $promises = [ 'buckets' => $s3->listBucketsAsync(), 'tables' => $ddb->listTablesAsync(), ]; // Wait for both promises to complete. $results = Utils::unwrap($promises); // Notice that this method will maintain the input array keys. var_dump($results['buckets']->toArray()); var_dump($results['tables']->toArray());
注意

CommandPool 提供更强大的机制用于同时执行多个 API 操作。

串联 Promise

Promise 非常棒的一个方面是它们可以组合,从而允许您创建转换管道。Promise 是通过将 then 回调与后续 then 回调串联在一起而构成的。then 方法的返回值是根据所提供回调的结果而执行或拒绝的 Promise。

$promise = $client->describeTableAsync(['TableName' => 'mytable']); $promise ->then( function ($value) { $value['AddedAttribute'] = 'foo'; return $value; }, function ($reason) use ($client) { // The call failed. You can recover from the error here and // return a value that will be provided to the next successful // then() callback. Let's retry the call. return $client->describeTableAsync(['TableName' => 'mytable']); } )->then( function ($value) { // This is only invoked when the previous then callback is // fulfilled. If the previous callback returned a promise, then // this callback is invoked only after that promise is // fulfilled. echo $value['AddedAttribute']; // outputs "foo" }, function ($reason) { // The previous callback was rejected (failed). } );
注意

Promise 回调的返回值是提供给下游 Promise 的 $value 参数。如果您想为下游 Promise 链提供一个值,必须在回调函数中返回一个值。

拒绝转发

您可以注册在 Promise 被拒绝时要调用的回调。如果任何回调引发异常,Promise 会因该异常而被拒绝,链中接下来的 Promise 也会因该异常而被拒绝。如果您成功从 $onRejected 回调返回一个值,则 Promise 链中接下来的 Promise 将使用 $onRejected 回调的返回值执行。

正在等待 Promise

您可以通过使用 Promise 的 wait 方法来同步强制完成 Promise。

$promise = $client->listTablesAsync(); $result = $promise->wait();

如果在调用 wait 函数时遇到异常,Promise 将因该异常而被拒绝,并且会引发该异常。

use Aws\Exception\AwsException; $promise = $client->listTablesAsync(); try { $result = $promise->wait(); } catch (AwsException $e) { // Handle the error }

对已执行的 Promise 调用 wait 不会触发 wait 函数。只是返回之前提供的值。

$promise = $client->listTablesAsync(); $result = $promise->wait(); assert($result ### $promise->wait());

对已拒绝的 Promise 调用 wait 会引发异常。如果拒绝原因是 \Exception 的实例,则会引发该原因。否则会引发 GuzzleHttp\Promise\RejectionException,并且该原因可通过调用异常的 getReason 方法获得。

注意

AWS SDK for PHP中的 API 操作调用因 Aws\Exception\AwsException 类的子类而被拒绝。但是,由于添加了会修改拒绝原因的自定义中间件,因此发送到 then 方法的原因可能会不同。

取消 Promise

可以使用 Promise 的 cancel() 方法取消 Promise。如果 Promise 已解析,则调用 cancel() 将没有任何作用。取消 Promise 会取消该 Promise 以及任何等待从该 Promise 发送的 Promise。已取消的 Promise 被拒绝并显示 GuzzleHttp\Promise\RejectionException

组合 Promise

您可以将 Promise 组合成聚合 Promise,以构建更复杂的工作流。guzzlehttp/promise 程序包包含各种可用于组合 Promise 的函数。

您可以在 namespace-GuzzleHttp.Promise 中找到适用于所有 Promise 集合函数的 API 文档。

each 和 each_limit

当您需要同时执行 Aws\CommandInterface 命令的任务队列并且池大小固定(这些命令可位于内存中,也可由延迟迭代器生成)时,请使用 CommandPoolCommandPool 可确保同时发送固定数量的命令,直到提供的迭代器用尽。

CommandPool 只使用由同一客户端执行的命令。您可以使用 GuzzleHttp\Promise\each_limit 函数同时执行不同客户端的发送命令(使用固定的池大小)。

use GuzzleHttp\Promise; $sdk = new Aws\Sdk([ 'version' => 'latest', 'region' => 'us-west-2' ]); $s3 = $sdk->createS3(); $ddb = $sdk->createDynamoDb(); // Create a generator that yields promises $promiseGenerator = function () use ($s3, $ddb) { yield $s3->listBucketsAsync(); yield $ddb->listTablesAsync(); // yield other promises as needed... }; // Execute the tasks yielded by the generator concurrently while limiting the // maximum number of concurrent promises to 5 $promise = Promise\each_limit($promiseGenerator(), 5); // Waiting on an EachPromise will wait on the entire task queue to complete $promise->wait();

Promise 协同例程

Guzzle Promise 库的一个更强大的功能是允许您使用协同例程 Promise,这使编写异步工作流似乎更像编写传统同步工作流。事实上,AWS SDK for PHP 在大多数高级别抽象化处理中使用协同例程 Promise。

假设您要创建多个存储桶并在存储桶变为可用时将文件上传到存储桶,并且您想同时执行这些操作,以便尽快完成操作。您可以通过使用 all() Promise 函数将多个协同例程 Promise 组合在一起,从而轻松完成这一操作。

use GuzzleHttp\Promise; $uploadFn = function ($bucket) use ($s3Client) { return Promise\coroutine(function () use ($bucket, $s3Client) { // You can capture the result by yielding inside of parens $result = (yield $s3Client->createBucket(['Bucket' => $bucket])); // Wait on the bucket to be available $waiter = $s3Client->getWaiter('BucketExists', ['Bucket' => $bucket]); // Wait until the bucket exists yield $waiter->promise(); // Upload a file to the bucket yield $s3Client->putObjectAsync([ 'Bucket' => $bucket, 'Key' => '_placeholder', 'Body' => 'Hi!' ]); }); }; // Create the following buckets $buckets = ['foo', 'baz', 'bar']; $promises = []; // Build an array of promises foreach ($buckets as $bucket) { $promises[] = $uploadFn($bucket); } // Aggregate the promises into a single "all" promise $aggregate = Promise\all($promises); // You can then() off of this promise or synchronously wait $aggregate->wait();