Class: Aws::S3::TransferManager
Inherits: Object
Defined in: gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb
Overview
A high-level S3 transfer utility that provides enhanced upload and download capabilities with automatic multipart handling, progress tracking, and support for large files. The following features are supported:
- upload a file with multipart upload
- upload a stream with multipart upload
- download an S3 object with multipart download
- track transfer progress using a progress listener
Executor Management
TransferManager uses executors to handle concurrent operations during multipart transfers. You can control concurrency behavior by providing a custom executor or relying on the default executor management.
Default Behavior
When no :executor is provided, TransferManager creates a new DefaultExecutor for each individual operation (download_file, upload_file, etc.) and automatically shuts it down when that operation completes. Each operation gets its own isolated thread pool with the specified :thread_count (default 10 threads).
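For example (bucket and key names here are placeholders), each call below gets its own pool, sized by :thread_count, that is shut down when the call returns:
tm = TransferManager.new
# a fresh DefaultExecutor with 4 threads is created for this call
# and shut down automatically when the download completes
tm.download_file('/path/to/file', bucket: 'bucket', key: 'key', thread_count: 4)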
Custom Executor
You can provide your own executor (e.g., Concurrent::ThreadPoolExecutor) for fine-grained control over thread pools and resource management. When using a custom executor, you are responsible for shutting it down when finished. The executor may be reused across multiple TransferManager operations. Custom executors must implement the same interface as DefaultExecutor.
Required methods:
- post(*args, &block) - Execute a task with the given arguments and block
- kill - Immediately terminate all running tasks
Optional methods:
- shutdown(timeout = nil) - Gracefully shut down the executor with an optional timeout
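For illustration, a minimal executor satisfying this interface might look like the following. SimpleExecutor is a hypothetical class, not part of the SDK; it runs each task on its own thread, where a production implementation would typically wrap a bounded pool such as Concurrent::ThreadPoolExecutor.
class SimpleExecutor
  def initialize
    @threads = []
  end

  # Required: execute a task with the given arguments and block
  def post(*args, &block)
    @threads << Thread.new(*args, &block)
  end

  # Required: immediately terminate all running tasks
  def kill
    @threads.each(&:kill)
  end

  # Optional: wait for in-flight tasks, with an optional per-thread timeout
  def shutdown(timeout = nil)
    @threads.each { |t| t.join(timeout) }
  end
end

executor = SimpleExecutor.new
tm = TransferManager.new(executor: executor)
tm.upload_file('/path/to/large_file', bucket: 'bucket', key: 'key')
executor.shutdown # custom executors must be shut down by the caller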
Instance Attribute Summary
- #client ⇒ S3::Client (readonly)
- #executor ⇒ Object (readonly)
Instance Method Summary
- #download_file(destination, bucket:, key:, **options) ⇒ Boolean
  Downloads a file in S3 to a path on disk.
- #initialize(options = {}) ⇒ TransferManager (constructor)
  A new instance of TransferManager.
- #upload_file(source, bucket:, key:, **options) {|response| ... } ⇒ Boolean
  Uploads a file from disk to S3.
- #upload_stream(bucket:, key:, **options, &block) ⇒ Boolean
  Uploads a stream in a streaming fashion to S3.
Constructor Details
#initialize(options = {}) ⇒ TransferManager
Returns a new instance of TransferManager.
# File 'gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb', line 62

def initialize(options = {})
  @client = options[:client] || Client.new
  @executor = options[:executor]
end
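For illustration, construction with an explicitly configured client (the region and names are placeholders):
client = Aws::S3::Client.new(region: 'us-west-2')
tm = TransferManager.new(client: client)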
Instance Attribute Details
#client ⇒ S3::Client (readonly)
# File 'gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb', line 68

def client
  @client
end
#executor ⇒ Object (readonly)
# File 'gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb', line 71

def executor
  @executor
end
Instance Method Details
#download_file(destination, bucket:, key:, **options) ⇒ Boolean
Downloads a file in S3 to a path on disk.
# small files (< 5MB) are downloaded in a single API call
tm = TransferManager.new
tm.download_file('/path/to/file', bucket: 'bucket', key: 'key')
Files larger than 5MB are downloaded using the multipart method:
# large files are split into parts and the parts are downloaded in parallel
tm.download_file('/path/to/large_file', bucket: 'bucket', key: 'key')
You can provide a callback to monitor progress of the download:
# bytes and part_sizes are each an array with 1 entry per part
# part_sizes may not be known until the first bytes are retrieved
progress = proc do |bytes, part_sizes, file_size|
  bytes.each_with_index do |b, i|
    puts "Part #{i + 1}: #{b} / #{part_sizes[i]} Total: #{100.0 * bytes.sum / file_size}%"
  end
end
tm.download_file('/path/to/file', bucket: 'bucket', key: 'key', progress_callback: progress)
# File 'gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb', line 148

def download_file(destination, bucket:, key:, **options)
  download_opts = options.merge(bucket: bucket, key: key)
  executor = @executor || DefaultExecutor.new(max_threads: download_opts.delete(:thread_count))
  downloader = FileDownloader.new(client: @client, executor: executor)
  downloader.download(destination, download_opts)
  executor.shutdown unless @executor
  true
end
#upload_file(source, bucket:, key:, **options) {|response| ... } ⇒ Boolean
Uploads a file from disk to S3.
# small files are uploaded with the PutObject API
tm = TransferManager.new
tm.upload_file('/path/to/small_file', bucket: 'bucket', key: 'key')
Files larger than or equal to :multipart_threshold are uploaded using the multipart upload APIs.
# large files are automatically split into parts and the parts are uploaded in parallel
tm.upload_file('/path/to/large_file', bucket: 'bucket', key: 'key')
The response of the S3 upload API is yielded if a block is given.
# the API response will have the etag value of the file
tm.upload_file('/path/to/file', bucket: 'bucket', key: 'key') do |response|
etag = response.etag
end
You can provide a callback to monitor progress of the upload:
# bytes and totals are each an array with 1 entry per part
progress = proc do |bytes, totals|
  bytes.each_with_index do |b, i|
    puts "Part #{i + 1}: #{b} / #{totals[i]} Total: #{100.0 * bytes.sum / totals.sum}%"
  end
end
tm.upload_file('/path/to/file', bucket: 'bucket', key: 'key', progress_callback: progress)
# File 'gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb', line 223

def upload_file(source, bucket:, key:, **options)
  upload_opts = options.merge(bucket: bucket, key: key)
  executor = @executor || DefaultExecutor.new(max_threads: upload_opts.delete(:thread_count))
  uploader = FileUploader.new(
    multipart_threshold: upload_opts.delete(:multipart_threshold),
    client: @client,
    executor: executor
  )
  response = uploader.upload(source, upload_opts)
  yield response if block_given?
  executor.shutdown unless @executor
  true
end
#upload_stream(bucket:, key:, **options, &block) ⇒ Boolean
Uploads a stream in a streaming fashion to S3.
Passed chunks are automatically split into multipart upload parts, and the parts are uploaded in parallel. This allows for streaming uploads that never touch the disk.
Note: There are known issues in JRuby until jruby-9.1.15.0, so avoid using this with older JRuby versions.
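A usage sketch, assuming the block is yielded a writable stream as with Aws::S3::Object#upload_stream (the chunk contents here are placeholders):
tm = TransferManager.new
tm.upload_stream(bucket: 'bucket', key: 'key') do |write_stream|
  10.times { write_stream << "streamed chunk\n" }
end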
# File 'gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb', line 289

def upload_stream(bucket:, key:, **options, &block)
  upload_opts = options.merge(bucket: bucket, key: key)
  executor = @executor || DefaultExecutor.new(max_threads: upload_opts.delete(:thread_count))
  uploader = MultipartStreamUploader.new(
    client: @client,
    executor: executor,
    tempfile: upload_opts.delete(:tempfile),
    part_size: upload_opts.delete(:part_size)
  )
  uploader.upload(upload_opts, &block)
  executor.shutdown unless @executor
  true
end