Skip to content

Commit 4aaf178

Browse files
committed
Make leader election idempotent
1 parent ef514cb commit 4aaf178

File tree

2 files changed

+53
-26
lines changed

2 files changed

+53
-26
lines changed

ruby/lib/ci/queue/redis/base.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ class Base
88
TEN_MINUTES = 60 * 10
99
CONNECTION_ERRORS = [
1010
::Redis::BaseConnectionError,
11+
RedisClient::ConnectionError,
1112
::SocketError, # https://github.com/redis/redis-rb/pull/631
1213
].freeze
1314

ruby/lib/ci/queue/redis/worker.rb

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -201,43 +201,69 @@ def try_to_reserve_lost_test
201201
def push(tests)
202202
@total = tests.size
203203

204-
if @master = redis.setnx(key('master-status'), 'setup')
205-
puts "Worker electected as leader, pushing #{@total} tests to the queue."
206-
puts
207-
208-
attempts = 0
209-
duration = measure do
210-
with_redis_timeout(5) do
211-
redis.without_reconnect do
212-
redis.multi do |transaction|
213-
transaction.lpush(key('queue'), tests) unless tests.empty?
214-
transaction.set(key('total'), @total)
215-
transaction.set(key('master-status'), 'ready')
216-
217-
transaction.expire(key('queue'), config.redis_ttl)
218-
transaction.expire(key('total'), config.redis_ttl)
219-
transaction.expire(key('master-status'), config.redis_ttl)
204+
with_redis_timeout(5) do
205+
redis.without_reconnect do
206+
@master = leader_election do
207+
puts "Worker electected as leader, pushing #{@total} tests to the queue."
208+
puts
209+
210+
attempts = 0
211+
duration = measure do
212+
begin
213+
redis.multi do |transaction|
214+
transaction.lpush(key('queue'), tests) unless tests.empty?
215+
transaction.set(key('total'), @total)
216+
transaction.set(key('master-status'), 'ready')
217+
218+
transaction.expire(key('queue'), config.redis_ttl)
219+
transaction.expire(key('total'), config.redis_ttl)
220+
transaction.expire(key('master-status'), config.redis_ttl)
221+
end
222+
rescue ::Redis::BaseError, RedisClient::Error => error
223+
if !queue_initialized? && attempts < 3
224+
puts "Retrying pushing #{@total} tests to the queue... (#{error})"
225+
attempts += 1
226+
retry
227+
end
228+
229+
raise if !queue_initialized?
220230
end
221231
end
222-
rescue ::Redis::BaseError => error
223-
if !queue_initialized? && attempts < 3
224-
puts "Retrying pushing #{@total} tests to the queue... (#{error})"
225-
attempts += 1
226-
retry
227-
end
228-
229-
raise if !queue_initialized?
232+
puts "Finished pushing #{@total} tests to the queue in #{duration.round(2)}s."
230233
end
231234
end
232-
233-
puts "Finished pushing #{@total} tests to the queue in #{duration.round(2)}s."
234235
end
235236
register
236237
redis.expire(key('workers'), config.redis_ttl)
237238
rescue *CONNECTION_ERRORS
238239
raise if @master
239240
end
240241

242+
def leader_election
243+
attempts = 0
244+
value = key('setup')
245+
246+
begin
247+
if master = redis.setnx(key('master-status'), value)
248+
yield
249+
end
250+
rescue ::Redis::BaseError, RedisClient::Error => error
251+
puts "Error during leader election: #{error}"
252+
if redis.get(key('master-status')) == value
253+
master = true
254+
yield
255+
elsif attempts < 3
256+
puts "Retrying leader election... (#{error})"
257+
attempts += 1
258+
retry
259+
else
260+
raise error
261+
end
262+
end
263+
264+
master
265+
end
266+
241267
def register
242268
redis.sadd(key('workers'), [worker_id])
243269
end

0 commit comments

Comments
 (0)