S3 Multipart upload/download update #3275

Open
wants to merge 34 commits into
base: version-3
34 commits
859f13c
Clean up existing multipart file uploader
jterapin Jul 21, 2025
bbbe1ec
Add upload part validation
jterapin Jul 21, 2025
9a8da62
Clean up existing file downloader
jterapin Jul 22, 2025
f77e63d
Remove unused methods
jterapin Jul 22, 2025
4cb24b4
Fix failing specs
jterapin Jul 22, 2025
c0009e6
Validate total number of parts in file downloader
jterapin Jul 23, 2025
3171acd
Validate content range when ranged get is used
jterapin Jul 24, 2025
df56fba
Update failing spec
jterapin Jul 24, 2025
128118d
Remove comment
jterapin Jul 24, 2025
53140d8
Delete scratch files
jterapin Jul 24, 2025
2c807ed
Improve existing tests
jterapin Jul 25, 2025
9579598
Remove temp file usage
jterapin Jul 25, 2025
96b9a9b
Remove file if any error surfaces
jterapin Jul 25, 2025
1ebb926
Delete file if it exists
jterapin Jul 25, 2025
00b7de8
Add mpu_object_size param
jterapin Jul 25, 2025
20b68f0
Minor updates
jterapin Jul 25, 2025
91a3749
Add comment
jterapin Jul 25, 2025
e233e2e
Fix test
jterapin Jul 25, 2025
08b150d
Update error naming
jterapin Jul 28, 2025
79a2712
Address download opts
jterapin Jul 28, 2025
da3e9d6
Remove mutex from file downloader
jterapin Jul 28, 2025
7965081
Update existing errors
jterapin Jul 28, 2025
1996de0
Extract validating ranges into its own method
jterapin Jul 28, 2025
4db69b0
Improve raises
jterapin Jul 28, 2025
c30e8fc
Reformat downloader specs
jterapin Jul 28, 2025
dd7ebaf
Update raises
jterapin Jul 28, 2025
cec12f7
Update raise message
jterapin Jul 28, 2025
7366296
Add new validations tests to downloader
jterapin Jul 28, 2025
7500ff2
Format upload file spec
jterapin Jul 28, 2025
bcc2954
Fix flaky spec
jterapin Jul 28, 2025
d445c1a
Improve flakey spec
jterapin Jul 28, 2025
e29fa6b
Add remaining specs
jterapin Jul 28, 2025
b585abc
Add changelog entry
jterapin Jul 28, 2025
f460aa8
merge from version-3
jterapin Jul 28, 2025
4 changes: 4 additions & 0 deletions gems/aws-sdk-s3/CHANGELOG.md
@@ -1,6 +1,10 @@
Unreleased Changes
------------------

* Issue - Add integrity validation to multipart upload/download to ensure all parts are successfully processed.

* Issue - Remove partially downloaded files when multipart `download_file` fails.
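
The two changelog entries above describe a part-count check and cleanup of the destination file on failure. A minimal, standalone sketch of that combined behavior follows. It does not call the SDK: `SketchDownloadError` and `download_parts` are hypothetical names, parts are simulated as `[offset, bytes]` pairs, and a standard-library `Queue` stands in for the PR's `PartList`.

```ruby
require 'tmpdir'

# Hypothetical stand-in for the PR's Aws::S3::MultipartDownloadError.
class SketchDownloadError < StandardError; end

# Writes simulated parts ([offset, bytes] pairs) to +path+ from several
# threads. A nil payload simulates a part that was never received.
# If the number of written parts differs from the number expected, or any
# other error surfaces, the partially written file is removed.
def download_parts(path, parts, thread_count: 2)
  queue = Queue.new
  parts.each { |part| queue << part }
  queue.close # once closed and drained, Queue#pop returns nil

  completed = 0
  lock = Mutex.new

  workers = Array.new(thread_count) do
    Thread.new do
      while (part = queue.pop)
        offset, bytes = part
        next if bytes.nil?

        File.write(path, bytes, offset) # an offset means no truncation
        lock.synchronize { completed += 1 }
      end
    end
  end
  workers.each(&:value) # joins each thread and re-raises its error, if any

  return path if completed == parts.size

  raise SketchDownloadError,
        "multipart download failed: expected #{parts.size} parts but received #{completed}"
rescue StandardError
  File.delete(path) if File.exist?(path)
  raise
end

Dir.mktmpdir do |dir|
  target = File.join(dir, 'object.bin')
  begin
    download_parts(target, [[0, 'abc'], [3, nil]])
  rescue SketchDownloadError => e
    puts "#{e.message} (file left behind: #{File.exist?(target)})"
  end
end
```

The counter is guarded by a `Mutex` here as a conservative choice for the sketch; the diff itself increments `total_requests` directly inside each thread.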

1.194.0 (2025-07-21)
------------------

1 change: 1 addition & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb
@@ -10,6 +10,7 @@ module S3
autoload :FileUploader, 'aws-sdk-s3/file_uploader'
autoload :FileDownloader, 'aws-sdk-s3/file_downloader'
autoload :LegacySigner, 'aws-sdk-s3/legacy_signer'
autoload :MultipartDownloadError, 'aws-sdk-s3/file_downloader'
autoload :MultipartFileUploader, 'aws-sdk-s3/multipart_file_uploader'
autoload :MultipartStreamUploader, 'aws-sdk-s3/multipart_stream_uploader'
autoload :MultipartUploadError, 'aws-sdk-s3/multipart_upload_error'
121 changes: 56 additions & 65 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb
@@ -1,18 +1,18 @@
# frozen_string_literal: true

require 'pathname'
require 'thread'
require 'set'
require 'tmpdir'

module Aws
module S3
# Raised when an error is encountered during multipart download
class MultipartDownloadError < StandardError; end

# @api private
class FileDownloader

MIN_CHUNK_SIZE = 5 * 1024 * 1024
MAX_PARTS = 10_000
THREAD_COUNT = 10

def initialize(options = {})
@client = options[:client] || Client.new
@@ -24,58 +24,57 @@ def initialize(options = {})
def download(destination, options = {})
@path = destination
@mode = options[:mode] || 'auto'
@thread_count = options[:thread_count] || THREAD_COUNT
@thread_count = options[:thread_count] || 10
@chunk_size = options[:chunk_size]
@params = {
bucket: options[:bucket],
key: options[:key]
}
@params[:version_id] = options[:version_id] if options[:version_id]
@params = param_opts(options)
@on_checksum_validated = options[:on_checksum_validated]
@progress_callback = options[:progress_callback]

validate!

Aws::Plugins::UserAgent.metric('S3_TRANSFER') do
case @mode
when 'auto' then multipart_download
when 'single_request' then single_request
when 'get_range'
if @chunk_size
resp = @client.head_object(@params)
multithreaded_get_by_ranges(resp.content_length, resp.etag)
else
msg = 'In :get_range mode, :chunk_size must be provided'
raise ArgumentError, msg
end
raise ArgumentError, 'In :get_range mode, :chunk_size must be provided' unless @chunk_size

resp = @client.head_object(@params)
multithreaded_get_by_ranges(resp.content_length, resp.etag)
else
msg = "Invalid mode #{@mode} provided, "\
'mode should be :single_request, :get_range or :auto'
raise ArgumentError, msg
raise ArgumentError, "Invalid mode #{@mode} provided, mode should be :single_request, :get_range or :auto"
end
end
rescue StandardError => e
File.delete(@path) if File.exist?(@path)
raise e
end

private

def param_opts(options)
download_opts = %i[mode chunk_size thread_count on_checksum_validated progress_callback]
options.reject { |k, _v| download_opts.include?(k) }
end

def validate!
if @on_checksum_validated && !@on_checksum_validated.respond_to?(:call)
raise ArgumentError, 'on_checksum_validated must be callable'
end
return unless @on_checksum_validated && !@on_checksum_validated.respond_to?(:call)

raise ArgumentError, 'on_checksum_validated must be callable'
end

def multipart_download
resp = @client.head_object(@params.merge(part_number: 1))
count = resp.parts_count

if count.nil? || count <= 1
if resp.content_length <= MIN_CHUNK_SIZE
single_request
else
multithreaded_get_by_ranges(resp.content_length, resp.etag)
end
else
# partNumber is an option
resp = @client.head_object(@params)
# covers cases when given object is not uploaded via UploadPart API
resp = @client.head_object(@params) # partNumber is an option
if resp.content_length <= MIN_CHUNK_SIZE
single_request
else
@@ -86,40 +85,18 @@ def multipart_download

def compute_mode(file_size, count, etag)
chunk_size = compute_chunk(file_size)
part_size = (file_size.to_f / count.to_f).ceil
part_size = (file_size.to_f / count).ceil
if chunk_size < part_size
multithreaded_get_by_ranges(file_size, etag)
else
multithreaded_get_by_parts(count, file_size, etag)
end
end

def construct_chunks(file_size)
offset = 0
default_chunk_size = compute_chunk(file_size)
chunks = []
while offset < file_size
progress = offset + default_chunk_size
progress = file_size if progress > file_size
chunks << "bytes=#{offset}-#{progress - 1}"
offset = progress
end
chunks
end

def compute_chunk(file_size)
if @chunk_size && @chunk_size > file_size
raise ArgumentError, ":chunk_size shouldn't exceed total file size."
else
@chunk_size || [
(file_size.to_f / MAX_PARTS).ceil, MIN_CHUNK_SIZE
].max.to_i
end
end
raise ArgumentError, ":chunk_size shouldn't exceed total file size." if @chunk_size && @chunk_size > file_size

def batches(chunks, mode)
chunks = (1..chunks) if mode.eql? 'part_number'
chunks.each_slice(@thread_count).to_a
@chunk_size || [(file_size.to_f / MAX_PARTS).ceil, MIN_CHUNK_SIZE].max.to_i
end

def multithreaded_get_by_ranges(file_size, etag)
@@ -130,12 +107,8 @@ def multithreaded_get_by_ranges(file_size, etag)
while offset < file_size
progress = offset + default_chunk_size
progress = file_size if progress > file_size
range = "bytes=#{offset}-#{progress - 1}"
chunks << Part.new(
part_number: part_number,
size: (progress-offset),
params: @params.merge(range: range, if_match: etag)
)
params = @params.merge(range: "bytes=#{offset}-#{progress - 1}", if_match: etag)
chunks << Part.new(part_number: part_number, size: (progress - offset), params: params)
part_number += 1
offset = progress
end
@@ -151,50 +124,63 @@ def multithreaded_get_by_parts(n_parts, total_size, etag)

def download_in_threads(pending, total_size)
threads = []
max_requests = pending.count
total_requests = 0
progress = MultipartProgress.new(pending, total_size, @progress_callback) if @progress_callback
@thread_count.times do
thread = Thread.new do
begin
while part = pending.shift
while (part = pending.shift)
if progress
part.params[:on_chunk_received] =
proc do |_chunk, bytes, total|
progress.call(part.part_number, bytes, total)
end
end
resp = @client.get_object(part.params)
validate_range(resp, part) if part.params[:range]
write(resp)
if @on_checksum_validated && resp.checksum_validated
@on_checksum_validated.call(resp.checksum_validated, resp)
end
total_requests += 1
end
nil
rescue => error
# keep other threads from downloading other parts
pending.clear!
raise error
rescue StandardError => e
pending.clear! # keep other threads from downloading other parts
raise e
end
end
threads << thread
end
threads.map(&:value).compact
return if max_requests == total_requests

msg = "multipart download failed: expected #{max_requests} parts but received #{total_requests}"
raise MultipartDownloadError, msg
end

def validate_range(resp, part)
range = resp.content_range.split(' ').last.split('/').first
expected_range = part.params[:range].split('=').last
return if expected_range == range

raise MultipartDownloadError, 'multipart download failed: file integrity check failed'
Contributor Author

I could be more descriptive - like "failed to validate content range of a part" or something.
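
One way the more descriptive message suggested in this comment could look is sketched below. It is only an illustration under stated assumptions: `RangeMismatchError` and `validate_range!` are hypothetical names, and the method takes plain strings instead of the SDK response and `Part` objects used in the diff.

```ruby
# Hypothetical stand-in for the PR's Aws::S3::MultipartDownloadError.
class RangeMismatchError < StandardError; end

# Compares the byte range reported in a Content-Range header
# (for example "bytes 0-4/10") with the range that was requested
# (for example "bytes=0-4"). On mismatch, the error names the part
# and both ranges so the failure can be traced to a specific request.
def validate_range!(content_range, requested_range, part_number)
  actual = content_range.split(' ').last.split('/').first
  expected = requested_range.split('=').last
  return if actual == expected

  raise RangeMismatchError,
        "multipart download failed: part #{part_number} expected " \
        "range #{expected} but received #{actual}"
end

begin
  validate_range!('bytes 0-3/10', 'bytes=0-4', 1)
rescue RangeMismatchError => e
  puts e.message
end
```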

end

def write(resp)
range, _ = resp.content_range.split(' ').last.split('/')
head, _ = range.split('-').map {|s| s.to_i}
range = resp.content_range.split(' ').last.split('/').first
head = range.split('-').map(&:to_i).first
File.write(@path, resp.body.read, head)
end

def single_request
params = @params.merge(response_target: @path)
params[:on_chunk_received] = single_part_progress if @progress_callback
resp = @client.get_object(params)

return resp unless @on_checksum_validated

@on_checksum_validated.call(resp.checksum_validated, resp) if resp.checksum_validated

resp
end

@@ -204,6 +190,7 @@ def single_part_progress
end
end

# @api private
class Part < Struct.new(:part_number, :size, :params)
include Aws::Structure
end
@@ -216,6 +203,10 @@ def initialize(parts = [])
@mutex = Mutex.new
end

def count
@mutex.synchronize { @parts.count }
end

def shift
@mutex.synchronize { @parts.shift }
end
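
The chunk sizing in `compute_chunk` and the range loop in `multithreaded_get_by_ranges` can be read in isolation as the sketch below. It mirrors the arithmetic shown in the diff but is not the SDK code: the `SKETCH_`-prefixed constants and the `default_chunk_size` and `byte_ranges` helpers are hypothetical names, and the `ArgumentError` check on an oversized `:chunk_size` is left out.

```ruby
SKETCH_MIN_CHUNK_SIZE = 5 * 1024 * 1024
SKETCH_MAX_PARTS = 10_000

# Chunk size when the caller gives none: large enough that the object
# needs at most SKETCH_MAX_PARTS requests, never below the minimum.
def default_chunk_size(file_size)
  [(file_size.to_f / SKETCH_MAX_PARTS).ceil, SKETCH_MIN_CHUNK_SIZE].max
end

# Splits +file_size+ bytes into inclusive "bytes=first-last" ranges,
# with the final range shortened to end at the last byte.
def byte_ranges(file_size, chunk_size = default_chunk_size(file_size))
  ranges = []
  offset = 0
  while offset < file_size
    stop = [offset + chunk_size, file_size].min
    ranges << "bytes=#{offset}-#{stop - 1}"
    offset = stop
  end
  ranges
end

puts byte_ranges(10, 4).inspect
# => ["bytes=0-3", "bytes=4-7", "bytes=8-9"]
```

Because each range is inclusive and the next one starts at `stop`, the ranges cover every byte exactly once, which is what lets the downloader compare each response's `Content-Range` against the range it requested.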