Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Simultanéité
Il Kit AWS SDK pour Rust ne fournit pas de contrôle de simultanéité, mais les utilisateurs ont de nombreuses options pour implémenter le leur.
Conditions
Les termes liés à ce sujet sont faciles à confondre et certains termes sont devenus synonymes même s'ils représentaient à l'origine des concepts distincts. Dans ce guide, nous allons définir les éléments suivants :
-
Tâche : Une « unité de travail » que votre programme exécutera ou tentera d'exécuter jusqu'à la fin.
-
Calcul séquentiel : lorsque plusieurs tâches sont exécutées les unes après les autres.
-
Calcul simultané : lorsque plusieurs tâches sont exécutées pendant des périodes qui se chevauchent.
-
Concurrence : capacité d'un ordinateur à effectuer plusieurs tâches dans un ordre arbitraire.
-
Multitâche : capacité d'un ordinateur à exécuter plusieurs tâches simultanément.
-
Condition de course : lorsque le comportement de votre programme change en fonction du moment où une tâche est démarrée ou du temps nécessaire pour traiter une tâche.
-
Conflit : conflit concernant l'accès à une ressource partagée. Lorsque deux tâches ou plus souhaitent accéder à une ressource en même temps, cette ressource est « en conflit ».
-
Impasse : état dans lequel il n'est plus possible de progresser. Cela se produit généralement parce que deux tâches veulent acquérir les ressources de l'autre, mais aucune des tâches ne libérera ses ressources tant que les ressources de l'autre ne seront pas disponibles. Les blocages font en sorte qu'un programme ne répond pas totalement ou partiellement.
Un exemple simple
Notre premier exemple est un programme séquentiel. Dans les exemples suivants, nous modifierons ce code à l'aide de techniques de simultanéité. Les exemples suivants réutilisent la même build_client_and_list_objects_to_download()
méthode et y apportent des modificationsmain()
.
L'exemple de tâche suivant consiste à télécharger tous les fichiers d'un bucket Amazon Simple Storage Service :
-
Commencez par répertorier tous les fichiers. Enregistrez les clés dans une liste.
-
Parcourez la liste en téléchargeant chaque fichier à tour de rôle
const EXAMPLE_BUCKET: &str = "<an-example-bucket>"; // This initialization function won't be reproduced in // examples following this one, in order to save space. async fn build_client_and_list_objects_to_download() { let cfg = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; let client = Client::new(&cfg); let objects_to_download: Vec<_> = client .list_objects_v2() .bucket(EXAMPLE_BUCKET) .send() .await .expect("listing objects succeeds") .contents() .into_iter() .flat_map(aws_sdk_s3::types::Object::key) .map(ToString::to_string) .collect(); (client, objects_to_download) }
#[tokio::main] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; for object in objects_to_download { let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("reading body succeeds").into_bytes(); std::fs::write(object, body).expect("write succeeds"); } }
Note
Dans ces exemples, nous ne gérerons pas les erreurs et nous partons du principe que le bucket d'exemple ne contient aucun objet dont les clés ressemblent à des chemins de fichiers. Nous n'aborderons donc pas la création de répertoires imbriqués.
Grâce à l'architecture des ordinateurs modernes, nous pouvons réécrire ce programme pour qu'il soit beaucoup plus efficace. Nous le ferons dans un exemple ultérieur, mais commençons par apprendre quelques concepts supplémentaires.
Propriété et mutabilité
Chaque valeur dans Rust a un seul propriétaire. Lorsqu'un propriétaire sort du champ d'application, toutes les valeurs qu'il possède sont également supprimées. Le propriétaire peut fournir une ou plusieurs références immuables à une valeur ou une seule référence mutable. Le compilateur Rust est chargé de s'assurer qu'aucune référence ne survit à son propriétaire.
Une planification et une conception supplémentaires sont nécessaires lorsque plusieurs tâches doivent accéder de manière mutable à la même ressource. Dans le calcul séquentiel, chaque tâche peut accéder de manière mutable à la même ressource sans conflit, car elles s'exécutent les unes après les autres dans une séquence. Toutefois, dans le cadre du calcul simultané, les tâches peuvent être exécutées dans n'importe quel ordre et en même temps. Par conséquent, nous devons faire davantage pour prouver au compilateur que de multiples références mutables sont impossibles (ou du moins qu'elles peuvent se bloquer si elles se produisent).
La bibliothèque standard Rust fournit de nombreux outils pour nous aider à y parvenir. Pour plus d'informations sur ces sujets, consultez le livre Variables and Mutability
Plus de termes !
Vous trouverez ci-dessous des listes d' « objets de synchronisation ». Ensemble, ce sont les outils nécessaires pour convaincre le compilateur que notre programme concurrent n'enfreindra pas les règles de propriété.
Objets de synchronisation de bibliothèques standard
-
Arc
: un pointeur compté par une référence R atomique. Lorsque les données sont encapsulées dans un Arc
, elles peuvent être partagées librement, sans craindre qu'un propriétaire en particulier n'en perde la valeur plus tôt. En ce sens, la propriété de la valeur devient « partagée ». Les valeurs comprises dans unArc
ne peuvent pas être mutables, mais peuvent présenter une mutabilité interne. -
Barrière
: garantit que plusieurs threads attendront l'un de l'autre pour atteindre un point du programme avant de poursuivre l'exécution en même temps. -
Condvar
: variable de condition permettant de bloquer un thread en attendant qu'un événement se produise. -
Mutex
: un mécanisme d'exclusion mutuelle qui garantit qu'au plus un thread à la fois est en mesure d'accéder à certaines données. D'une manière générale, un Mutex
cadenas ne doit jamais être maintenu en travers d'un.await
point du code.
Objets de synchronisation Tokio
Bien qu' AWS SDKs ils soient conçus pour être indépendants de l'async
exécution, nous recommandons l'utilisation d'objets de tokio
synchronisation dans des cas spécifiques.
Réécriture de notre exemple pour le rendre plus efficace (simultanéité à un seul thread)
Dans l'exemple modifié suivant, nous avons l'habitude futures_util::future::join_all
get_object
demandes simultanément.
#[tokio::main] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let get_object_futures = objects_to_download.into_iter().map(|object| { let req = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET); async { let res = req .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); // Note that we MUST use the async runtime's preferred way // of writing files. Otherwise, this call would block, // potentially causing a deadlock. tokio::fs::write(object, body).await.expect("write succeeds"); } }); futures_util::future::join_all(get_object_futures).await; }
C'est le moyen le plus simple de tirer parti de la simultanéité, mais il présente également quelques problèmes qui peuvent ne pas être évidents à première vue :
-
Nous créons toutes les entrées de demande en même temps. Si nous n'avons pas assez de mémoire pour contenir toutes les entrées de
get_object
demande, nous allons rencontrer une erreur d'allocation out-of-memory « ». -
Nous créons et attendons tous les avenirs en même temps. Amazon S3 limite les demandes si nous essayons d'en télécharger trop à la fois.
Pour résoudre ces deux problèmes, nous devons limiter le nombre de demandes que nous envoyons à la fois. Nous allons le faire avec un tokio
sémaphore
const CONCURRENCY_LIMIT: usize = 50; #[tokio::main(flavor = "current_thread")] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let concurrency_semaphore = Arc::new(Semaphore::new(CONCURRENCY_LIMIT)); let get_object_futures = objects_to_download.into_iter().map(|object| { // Since each future needs to acquire a permit, we need to clone // the Arc'd semaphore before passing it in. let semaphore = concurrency_semaphore.clone(); // We also need to clone the client so each task has its own handle. let client = client.clone(); async move { let permit = semaphore .acquire() .await .expect("we'll get a permit if we wait long enough"); let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); tokio::fs::write(object, body).await.expect("write succeeds"); std::mem::drop(permit); } }); futures_util::future::join_all(get_object_futures).await; }
Nous avons résolu le problème potentiel d'utilisation de la mémoire en déplaçant la création de la demande dans le async
bloc. De cette façon, les demandes ne seront créées qu'au moment de les envoyer.
Note
Si vous avez la mémoire nécessaire, il peut être plus efficace de créer toutes vos entrées de demande en une seule fois et de les conserver en mémoire jusqu'à ce qu'elles soient prêtes à être envoyées. Pour essayer cela, déplacez la création d'entrées de demande en dehors du async
bloc.
Nous avons également résolu le problème lié à l'envoi d'un trop grand nombre de demandes à la fois en limitant le nombre de demandes en cours àCONCURRENCY_LIMIT
.
Note
La bonne valeur pour CONCURRENCY_LIMIT
est différente pour chaque projet. Lorsque vous créez et envoyez vos propres demandes, essayez de les définir le plus haut possible sans rencontrer d'erreurs de régulation. Bien qu'il soit possible de mettre à jour dynamiquement votre limite de simultanéité en fonction du ratio de réponses réussies par rapport aux réponses limitées renvoyées par un service, cela n'entre pas dans le cadre de ce guide en raison de sa complexité.
Réécriture de notre exemple pour le rendre plus efficace (simultanéité multithread)
Dans les deux exemples précédents, nous avons effectué nos demandes simultanément. Bien que cela soit plus efficace que de les exécuter de manière synchrone, nous pouvons rendre les choses encore plus efficaces en utilisant le multithreading. Pour ce fairetokio
, nous devrons les créer sous forme de tâches distinctes.
Note
Cet exemple nécessite que vous utilisiez le runtime multithread. tokio
Ce runtime est bloqué par la rt-multi-thread
fonctionnalité. Et, bien entendu, vous devrez exécuter votre programme sur une machine multicœur.
// Set this based on the amount of cores your target machine has. const THREADS: usize = 8; #[tokio::main(flavor = "multi_thread")] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let concurrency_semaphore = Arc::new(Semaphore::new(THREADS)); let get_object_task_handles = objects_to_download.into_iter().map(|object| { // Since each future needs to acquire a permit, we need to clone // the Arc'd semaphore before passing it in. let semaphore = concurrency_semaphore.clone(); // We also need to clone the client so each task has its own handle. let client = client.clone(); // Note this difference! We're using `tokio::task::spawn` to // immediately begin running these requests. tokio::task::spawn(async move { let permit = semaphore .acquire() .await .expect("we'll get a permit if we wait long enough"); let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); tokio::fs::write(object, body).await.expect("write succeeds"); std::mem::drop(permit); }) }); futures_util::future::join_all(get_object_task_handles).await; }
Diviser le travail en tâches peut s'avérer complexe. Les opérations d'E/S (entrée/sortie) sont généralement bloquantes. Les environnements d'exécution peuvent avoir du mal à trouver un équilibre entre les besoins des tâches de longue durée et ceux des tâches de courte durée. Quel que soit l'environnement d'exécution que vous choisissez, assurez-vous de lire leurs recommandations pour savoir comment diviser votre travail en tâches le plus efficacement possible. Pour les recommandations tokio
d'exécution, voir Module tokio::task
Débogage d'applications multithread
Les tâches exécutées simultanément peuvent être exécutées dans n'importe quel ordre. Ainsi, les journaux des programmes concurrents peuvent être très difficiles à lire. Dans le SDK pour Rust, nous recommandons d'utiliser le système de tracing
journalisation. Il peut regrouper les journaux avec leurs tâches spécifiques, quel que soit le moment où ils sont exécutés. Pour de plus amples informations, consultez Activer la journalisation du Kit AWS SDK pour Rust code.
Un outil très utile pour identifier les tâches bloquées est tokio-console
tokio-console
application, vous pouvez voir en direct les tâches exécutées par votre programme. Cette vue inclut des informations utiles telles que le temps passé par une tâche à attendre avant d'acquérir des ressources partagées ou le nombre de fois où elle a été interrogée.