Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ruby/lib/ci/queue/redis/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ class Base
CONNECTION_ERRORS = [
::Redis::BaseConnectionError,
::SocketError, # https://github.com/redis/redis-rb/pull/631
].freeze
]

# https://github.com/redis/redis-rb/pull/1312
CONNECTION_ERRORS << RedisClient::ConnectionError if defined?(RedisClient::ConnectionError)
CONNECTION_ERRORS.freeze

module RedisInstrumentation
def call(command, redis_config)
Expand Down
76 changes: 50 additions & 26 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -201,43 +201,67 @@ def try_to_reserve_lost_test
def push(tests)
@total = tests.size

if @master = redis.setnx(key('master-status'), 'setup')
puts "Worker electected as leader, pushing #{@total} tests to the queue."
puts

attempts = 0
duration = measure do
with_redis_timeout(5) do
redis.without_reconnect do
redis.multi do |transaction|
transaction.lpush(key('queue'), tests) unless tests.empty?
transaction.set(key('total'), @total)
transaction.set(key('master-status'), 'ready')

transaction.expire(key('queue'), config.redis_ttl)
transaction.expire(key('total'), config.redis_ttl)
transaction.expire(key('master-status'), config.redis_ttl)
with_redis_timeout(5) do
redis.without_reconnect do
@master = leader_election do
puts "Worker elected as leader, pushing #{@total} tests to the queue."
puts

attempts = 0
duration = measure do
begin
redis.multi do |transaction|
transaction.lpush(key('queue'), tests) unless tests.empty?
transaction.set(key('total'), @total, ex: config.redis_ttl)
transaction.set(key('master-status'), 'ready', ex: config.redis_ttl)

transaction.expire(key('queue'), config.redis_ttl)
end
rescue ::Redis::BaseError, RedisClient::Error => error
if !queue_initialized? && attempts < 3
puts "Retrying pushing #{@total} tests to the queue... (#{error})"
attempts += 1
retry
end

raise if !queue_initialized?
end
end
rescue ::Redis::BaseError => error
if !queue_initialized? && attempts < 3
puts "Retrying pushing #{@total} tests to the queue... (#{error})"
attempts += 1
retry
end

raise if !queue_initialized?
puts "Finished pushing #{@total} tests to the queue in #{duration.round(2)}s."
end
end

puts "Finished pushing #{@total} tests to the queue in #{duration.round(2)}s."
end
register
redis.expire(key('workers'), config.redis_ttl)
rescue *CONNECTION_ERRORS
raise if @master
end

def leader_election
attempts = 0
value = key('setup', worker_id)

begin
if master = redis.setnx(key('master-status'), value)
yield
end
rescue ::Redis::BaseError, RedisClient::Error => error
puts "Error during leader election: #{error}"
if redis.get(key('master-status')) == value
master = true
yield
elsif attempts < 3
puts "Retrying leader election... (#{error})"
attempts += 1
retry
else
raise
end
end

master
end

def register
redis.sadd(key('workers'), [worker_id])
end
Expand Down
10 changes: 5 additions & 5 deletions ruby/test/integration/rspec_redis_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_redis_runner

assert_empty err
expected_output = strip_heredoc <<-EOS
Worker electected as leader, pushing 3 tests to the queue.
Worker elected as leader, pushing 3 tests to the queue.

Finished pushing 3 tests to the queue in X.XXs.

Expand Down Expand Up @@ -91,7 +91,7 @@ def test_redis_runner_retry

assert_empty err
expected_output = strip_heredoc <<-EOS
Worker electected as leader, pushing 3 tests to the queue.
Worker elected as leader, pushing 3 tests to the queue.

Finished pushing 3 tests to the queue in X.XXs.

Expand Down Expand Up @@ -273,7 +273,7 @@ def test_before_suite_errors

assert_empty err
expected_output = strip_heredoc <<-EOS
Worker electected as leader, pushing 2 tests to the queue.
Worker elected as leader, pushing 2 tests to the queue.

Finished pushing 2 tests to the queue in X.XXs.

Expand Down Expand Up @@ -317,7 +317,7 @@ def test_report

assert_empty err
expected_output = strip_heredoc <<-EOS
Worker electected as leader, pushing 3 tests to the queue.
Worker elected as leader, pushing 3 tests to the queue.

Finished pushing 3 tests to the queue in X.XXs.

Expand Down Expand Up @@ -418,7 +418,7 @@ def test_world_wants_to_quit

assert_empty err
expected_output = strip_heredoc <<-EOS
Worker electected as leader, pushing 1 tests to the queue.
Worker elected as leader, pushing 1 tests to the queue.

Finished pushing 1 tests to the queue in X.XXs.

Expand Down
Loading