Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Konkurensi
AWS SDK for Rust Itu tidak memberikan kontrol konkurensi tetapi pengguna memiliki banyak opsi untuk mengimplementasikannya sendiri.
Ketentuan
Istilah yang terkait dengan subjek ini mudah membingungkan dan beberapa istilah telah menjadi sinonim meskipun awalnya mewakili konsep yang terpisah. Dalam panduan ini, kami akan mendefinisikan yang berikut:
-
Tugas: Beberapa “unit kerja” yang program Anda akan jalankan sampai selesai, atau mencoba untuk berjalan sampai selesai.
-
Sequential Computing: Ketika beberapa tugas dijalankan satu demi satu.
-
Komputasi Bersamaan: Ketika beberapa tugas dijalankan dalam periode waktu yang tumpang tindih.
-
Konkurensi: Kemampuan komputer untuk menyelesaikan banyak tugas dalam urutan yang sewenang-wenang.
-
Multitasking: Kemampuan komputer untuk menjalankan beberapa tugas secara bersamaan.
-
Kondisi Balapan: Ketika perilaku program Anda berubah berdasarkan kapan tugas dimulai atau berapa lama waktu yang dibutuhkan untuk memproses tugas.
-
Konflik: Konflik atas akses ke sumber daya bersama. Ketika dua atau lebih tugas ingin mengakses sumber daya pada saat yang sama, sumber daya itu “dalam pertengkaran”.
-
Deadlock: Keadaan di mana tidak ada lagi kemajuan yang dapat dibuat. Ini biasanya terjadi karena dua tugas ingin memperoleh sumber daya masing-masing tetapi tidak ada tugas yang akan melepaskan sumber daya mereka sampai sumber daya yang lain tersedia. Kebuntuan menyebabkan program menjadi sebagian atau seluruhnya tidak responsif.
Contoh sederhana
Contoh pertama kami adalah program sekuensial. Dalam contoh selanjutnya, kita akan mengubah kode ini menggunakan teknik konkurensi. Contoh selanjutnya menggunakan kembali build_client_and_list_objects_to_download()
metode yang sama dan membuat perubahan di dalamnyamain()
.
Contoh tugas berikut adalah mengunduh semua file di bucket Amazon Simple Storage Service:
-
Mulailah dengan mencantumkan semua file. Simpan kunci dalam daftar.
-
Ulangi daftar, unduh setiap file secara bergantian
const EXAMPLE_BUCKET: &str = "<an-example-bucket>"; // This initialization function won't be reproduced in // examples following this one, in order to save space. async fn build_client_and_list_objects_to_download() { let cfg = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; let client = Client::new(&cfg); let objects_to_download: Vec<_> = client .list_objects_v2() .bucket(EXAMPLE_BUCKET) .send() .await .expect("listing objects succeeds") .contents() .into_iter() .flat_map(aws_sdk_s3::types::Object::key) .map(ToString::to_string) .collect(); (client, objects_to_download) }
#[tokio::main] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; for object in objects_to_download { let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("reading body succeeds").into_bytes(); std::fs::write(object, body).expect("write succeeds"); } }
catatan
Dalam contoh ini, kami tidak akan menangani kesalahan, dan kami berasumsi bahwa bucket contoh tidak memiliki objek dengan kunci yang terlihat seperti jalur file. Dengan demikian, kita tidak akan membahas pembuatan direktori bersarang.
Karena arsitektur komputer modern, kita dapat menulis ulang program ini menjadi jauh lebih efisien. Kita akan melakukannya dalam contoh selanjutnya, tetapi pertama-tama, mari kita pelajari beberapa konsep lagi.
Kepemilikan dan mutabilitas
Setiap nilai di Rust memiliki satu pemilik. Ketika pemilik keluar dari ruang lingkup, semua nilai yang dimilikinya juga akan dihapus. Pemilik dapat memberikan satu atau lebih referensi yang tidak dapat diubah ke nilai atau referensi tunggal yang dapat berubah. Kompiler Rust bertanggung jawab untuk memastikan bahwa tidak ada referensi yang hidup lebih lama dari pemiliknya.
Perencanaan dan desain tambahan diperlukan ketika beberapa tugas perlu mengakses sumber daya yang sama secara berubah-ubah. Dalam komputasi sekuensial, setiap tugas dapat mengakses sumber daya yang sama tanpa pertengkaran karena mereka berjalan satu demi satu secara berurutan. Namun, dalam komputasi bersamaan, tugas dapat berjalan dalam urutan apa pun, dan pada saat yang sama. Oleh karena itu, kita harus berbuat lebih banyak untuk membuktikan kepada kompiler bahwa beberapa referensi yang dapat berubah tidak mungkin (atau setidaknya macet jika memang terjadi).
Pustaka standar Rust menyediakan banyak alat untuk membantu kami mencapai hal ini. Untuk informasi lebih lanjut tentang topik ini, lihat Variabel dan Mutabilitas dan
Lebih banyak istilah!
Berikut ini adalah daftar “objek sinkronisasi”. Secara keseluruhan, mereka adalah alat yang diperlukan untuk meyakinkan kompiler bahwa program bersamaan kami tidak akan melanggar aturan kepemilikan.
Objek sinkronisasi perpustakaan standar
-
Busur
: Penunjuk yang dimatikan R eferensi-C secara tomik. Ketika data dibungkus dalam sebuah Arc
, itu dapat dibagikan secara bebas, tanpa khawatir pemilik tertentu menjatuhkan nilai lebih awal. Dalam pengertian ini, kepemilikan nilai menjadi “dibagikan”. Nilai dalam anArc
tidak dapat berubah, tetapi mungkin memiliki mutabilitas interior. -
Barrier
: Memastikan beberapa thread akan menunggu satu sama lain untuk mencapai titik dalam program, sebelum melanjutkan eksekusi bersama-sama. -
Condvar
: a Cond ition Var iable menyediakan kemampuan untuk memblokir utas sambil menunggu peristiwa terjadi. -
Mutex
: mekanisme klusi Mut ual Ex yang memastikan bahwa paling banyak satu utas pada satu waktu dapat mengakses beberapa data. Secara umum, Mutex
kunci tidak boleh dipegang melintasi.await
titik dalam kode.
Objek sinkronisasi Tokio
Meskipun AWS SDKs dimaksudkan untuk menjadi async
-runtime-agnostik, kami merekomendasikan penggunaan objek tokio
sinkronisasi untuk kasus tertentu.
Menulis ulang contoh kita menjadi lebih efisien (konkurensi ulir tunggal)
Dalam contoh modifikasi berikut, kita gunakan futures_util::future::join_all
get_object
permintaan secara bersamaan.
#[tokio::main] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let get_object_futures = objects_to_download.into_iter().map(|object| { let req = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET); async { let res = req .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); // Note that we MUST use the async runtime's preferred way // of writing files. Otherwise, this call would block, // potentially causing a deadlock. tokio::fs::write(object, body).await.expect("write succeeds"); } }); futures_util::future::join_all(get_object_futures).await; }
Ini adalah cara paling sederhana untuk mendapatkan keuntungan dari konkurensi, tetapi juga memiliki beberapa masalah yang mungkin tidak jelas pada pandangan pertama:
-
Kami membuat semua input permintaan secara bersamaan. Jika kita tidak memiliki cukup memori untuk menyimpan semua input
get_object
permintaan maka kita akan mengalami kesalahan alokasi out-of-memory "". -
Kami membuat dan menunggu semua masa depan pada saat yang bersamaan. Amazon S3 membatasi permintaan jika kami mencoba mengunduh terlalu banyak sekaligus.
Untuk memperbaiki kedua masalah ini, kami harus membatasi jumlah permintaan yang kami kirim pada satu waktu. Kami akan melakukan ini dengan tokio
semaphore
const CONCURRENCY_LIMIT: usize = 50; #[tokio::main(flavor = "current_thread")] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let concurrency_semaphore = Arc::new(Semaphore::new(CONCURRENCY_LIMIT)); let get_object_futures = objects_to_download.into_iter().map(|object| { // Since each future needs to acquire a permit, we need to clone // the Arc'd semaphore before passing it in. let semaphore = concurrency_semaphore.clone(); // We also need to clone the client so each task has its own handle. let client = client.clone(); async move { let permit = semaphore .acquire() .await .expect("we'll get a permit if we wait long enough"); let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); tokio::fs::write(object, body).await.expect("write succeeds"); std::mem::drop(permit); } }); futures_util::future::join_all(get_object_futures).await; }
Kami telah memperbaiki masalah penggunaan memori potensial dengan memindahkan pembuatan permintaan ke dalam async
blok. Dengan cara ini, permintaan tidak akan dibuat sampai tiba waktunya untuk mengirimnya.
catatan
Jika Anda memiliki memori untuk itu, mungkin lebih efisien untuk membuat semua input permintaan Anda sekaligus dan menyimpannya di memori sampai siap dikirim. Untuk mencoba ini, pindahkan pembuatan input permintaan di luar async
blok.
Kami juga telah memperbaiki masalah pengiriman terlalu banyak permintaan sekaligus dengan membatasi permintaan dalam penerbanganCONCURRENCY_LIMIT
.
catatan
Nilai yang tepat CONCURRENCY_LIMIT
untuk setiap proyek berbeda. Saat membuat dan mengirim permintaan Anda sendiri, cobalah untuk mengaturnya setinggi mungkin tanpa mengalami kesalahan pelambatan. Meskipun dimungkinkan untuk memperbarui batas konkurensi Anda secara dinamis berdasarkan rasio respons yang berhasil dan dibatasi yang dikirim kembali oleh layanan, itu berada di luar cakupan panduan ini karena kompleksitasnya.
Menulis ulang contoh kita menjadi lebih efisien (konkurensi multi-utas)
Dalam dua contoh sebelumnya, kami melakukan permintaan kami secara bersamaan. Meskipun ini lebih efisien daripada menjalankannya secara sinkron, kita dapat membuat segalanya lebih efisien dengan menggunakan multi-threading. Untuk melakukan initokio
, kita harus menelurkannya sebagai tugas terpisah.
catatan
Contoh ini mengharuskan Anda menggunakan runtime multi-threadedtokio
. Runtime ini terjaga keamanannya di belakang rt-multi-thread
fitur. Dan, tentu saja, Anda harus menjalankan program Anda pada mesin multi-core.
// Set this based on the amount of cores your target machine has. const THREADS: usize = 8; #[tokio::main(flavor = "multi_thread")] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let concurrency_semaphore = Arc::new(Semaphore::new(THREADS)); let get_object_task_handles = objects_to_download.into_iter().map(|object| { // Since each future needs to acquire a permit, we need to clone // the Arc'd semaphore before passing it in. let semaphore = concurrency_semaphore.clone(); // We also need to clone the client so each task has its own handle. let client = client.clone(); // Note this difference! We're using `tokio::task::spawn` to // immediately begin running these requests. tokio::task::spawn(async move { let permit = semaphore .acquire() .await .expect("we'll get a permit if we wait long enough"); let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); tokio::fs::write(object, body).await.expect("write succeeds"); std::mem::drop(permit); }) }); futures_util::future::join_all(get_object_task_handles).await; }
Membagi pekerjaan menjadi tugas bisa jadi rumit. Melakukan I/O (input/output) biasanya memblokir. Runtime mungkin berjuang untuk menyeimbangkan kebutuhan tugas yang berjalan lama dengan tugas-tugas jangka pendek. Apa pun runtime yang Anda pilih, pastikan untuk membaca rekomendasi mereka untuk cara paling efisien untuk membagi pekerjaan Anda menjadi tugas. Untuk rekomendasi tokio
runtime, lihat Modul tokio::task
Mendebug aplikasi multi-utas
Tugas yang berjalan secara bersamaan dapat dijalankan dalam urutan apa pun. Dengan demikian, log program bersamaan bisa sangat sulit dibaca. Di SDK untuk Rust, kami sarankan menggunakan sistem tracing
logging. Itu dapat mengelompokkan log dengan tugas khusus mereka, tidak peduli kapan mereka berjalan. Untuk panduan, lihat Aktifkan pencatatan AWS SDK for Rust kode.
Alat yang sangat berguna untuk mengidentifikasi tugas yang telah terkunci adalah tokio-console
tokio-console
aplikasi, Anda dapat melihat tampilan langsung dari tugas yang dijalankan program Anda. Tampilan ini mencakup informasi bermanfaat seperti jumlah waktu yang dihabiskan tugas untuk menunggu untuk memperoleh sumber daya bersama atau berapa kali tugas tersebut telah disurvei.