Multi-threaded topology recovery #370

Conversation
This is pretty interesting and I like the way you avoid natural race conditions here. It would be interesting to see what kind of tricky scenarios we can add to the test suite before we put it through a few hundred runs ;)
@vikinghawk @acogoluegnes and I discussed this and we'd like to add a couple of things:
Also, can you share any background on this, or numbers around how much quicker parallel recovery finishes? Thank you!
// Support recovering entities in parallel for connections that have a lot of queues, bindings, & consumers
// A channel is single threaded, so group things by channel and recover 1 entity at a time per channel
// We still need to recover 1 type of entity at a time in case channel1 has a binding to a queue that is currently owned and being recovered by channel2 for example
final ExecutorService executor = Executors.newFixedThreadPool(recoveryThreads, delegate.getThreadFactory());
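For context, here is a minimal sketch of what a grouping helper like groupEntitiesByChannel could look like; the generic channelOf/recover parameters and the class name are illustrative assumptions, not the PR's actual signatures:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;

final class ChannelGrouping {

    // Group entities by their owning channel; each group becomes one Callable
    // that recovers its entities sequentially, so a single channel is never
    // touched by two threads at once while distinct channels run in parallel.
    static <E, K> List<Callable<Object>> groupEntitiesByChannel(
            Collection<E> entities,
            Function<E, K> channelOf,   // maps an entity to its channel (or channel number)
            Consumer<E> recover) {      // recovers a single entity
        Map<K, List<E>> byChannel = new LinkedHashMap<>();
        for (E entity : entities) {
            byChannel.computeIfAbsent(channelOf.apply(entity), k -> new ArrayList<>()).add(entity);
        }
        List<Callable<Object>> tasks = new ArrayList<>(byChannel.size());
        for (List<E> group : byChannel.values()) {
            tasks.add(() -> {
                for (E entity : group) {
                    recover.accept(entity); // one entity at a time per channel
                }
                return null;
            });
        }
        return tasks;
    }
}
```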
Creating a new pool of threads can be a concern in some environments, so maybe we should just add (yet) another executor service in the connection factory. Parallel topology recovery would occur only if this executor service is not null (so we could get rid of the recoveryThreads property).
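A small sketch of the dispatch this suggestion implies, where the factory-provided executor doubles as the opt-in flag; the method and class names here are hypothetical:

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;

final class RecoveryDispatch {

    // If no executor was configured on the connection factory, fall back to
    // the existing single-threaded path; otherwise fan out with invokeAll,
    // which blocks until every task has completed.
    static void recoverEntities(List<Callable<Object>> tasks, ExecutorService executor) throws Exception {
        if (executor == null) {
            for (Callable<Object> task : tasks) {
                task.call(); // single-threaded default, same as before
            }
        } else {
            executor.invokeAll(tasks); // parallel, blocks until all tasks finish
        }
    }
}
```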
done
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
this.getExceptionHandler().handleTopologyRecoveryException(delegate, x.getDelegateChannel(), e);
// invokeAll will block until all callables are completed
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedExchanges).values()));
Maybe we could get the Futures back, check there's the same number as the inputs and they are all done, and log a warning if not.
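One way that defensive check could look, as a sketch; the helper name and slf4j logger wiring are assumptions for illustration:

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class RecoveryVerification {

    private static final Logger LOGGER = LoggerFactory.getLogger(RecoveryVerification.class);

    // invokeAll is specified to return one completed Future per task, so any
    // mismatch or unfinished Future indicates something went wrong and is
    // worth a warning rather than a silent pass.
    static List<Future<Object>> invokeAllAndVerify(ExecutorService executor, List<Callable<Object>> tasks)
            throws InterruptedException {
        List<Future<Object>> futures = executor.invokeAll(tasks);
        if (futures.size() != tasks.size()) {
            LOGGER.warn("Expected {} topology recovery results but got {}", tasks.size(), futures.size());
        }
        for (Future<Object> future : futures) {
            if (!future.isDone()) {
                LOGGER.warn("A topology recovery task did not complete");
            }
        }
        return futures;
    }
}
```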
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedBindings)));
executor.invokeAll(groupEntitiesByChannel(Utility.copy(consumers).values()));
} finally {
    executor.shutdownNow();
Check it returns an empty list and log a warning if not?
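For illustration, a minimal sketch of that check (the wrapper class and slf4j logger are assumptions, not code from the PR):

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class RecoveryShutdown {

    private static final Logger LOGGER = LoggerFactory.getLogger(RecoveryShutdown.class);

    // shutdownNow returns the tasks that were submitted but never started;
    // after invokeAll has returned, that list should always be empty.
    static void shutDownAndVerify(ExecutorService executor) {
        List<Runnable> neverStarted = executor.shutdownNow();
        if (!neverStarted.isEmpty()) {
            LOGGER.warn("{} topology recovery task(s) never started", neverStarted.size());
        }
    }
}
```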
No longer calling shutdown, since the executor is passed in and needs to be reused on subsequent recoveries.
@vikinghawk Thanks for this contribution. I added a few comments that are worth addressing. The CI build also failed:
I haven't run it yet; we're kind of busy this week with a team meeting.
Changes made as requested. I believe I fixed the test as well, but I haven't had time to set up my new machine yet to run tests. My testing scenario: testing on a single connection with 190 channels, 1500 non-durable/auto-delete queues, and 200 durable/mirrored queues, with 1 topic exchange binding and 1 consumer per queue.
Also tested on a single connection with 190 channels and 1700 durable/mirrored queues, with 1 topic exchange binding and 1 consumer per queue. Note: this client was on a small VM with 2 vCPUs, so it's possible that more than 8 threads could be beneficial on a bigger machine.
I did try to recover Channels in parallel as well at one point, but it did not appear to be any faster, so I backed it out. I assume there is some synchronization point on the connection, either client- or server-side.
Thanks, those numbers suggest the difference is pretty significant (which is expected, but always nice to see some benchmark evidence). We will continue with QA.
@vikinghawk thank you for the (as always) quality contribution!
@vikinghawk Merged, thanks! This will be in 4.7.0; we'll produce a release candidate in the next few days.
Proposed Changes
Decrease recovery time for connections that have lots of queues and bindings by recovering entities (exchanges, queues, bindings, and consumers) in parallel using multiple threads. One of our applications has hundreds of channels and thousands of queues and bindings on a single connection. This PR makes multi-threaded recovery configurable and opt-in; the default is still single-threaded on the main connection thread. I can add javadoc and clean it up a bit if the team is interested in pulling this in.
In my testing with our workflow, I have found recovery time to improve by about 5x using 8 threads vs 1. More than 8 threads seems to have diminishing returns (which could be related to the VM I was testing from, which only had 2 cores). In one case recovery time went from 26 seconds to 5 seconds, and in another from 119 seconds to 25.
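As a sketch of how opting in looks with the executor-based configuration the review converged on (the setter name setTopologyRecoveryExecutor matches the API that shipped in later releases, but at the time of this PR the exact shape was still under discussion):

```java
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelRecoveryExample {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setAutomaticRecoveryEnabled(true);
        factory.setTopologyRecoveryEnabled(true);

        // Supplying an executor opts in to parallel topology recovery;
        // leaving it unset keeps the single-threaded default.
        ExecutorService recoveryExecutor = Executors.newFixedThreadPool(8);
        factory.setTopologyRecoveryExecutor(recoveryExecutor);

        Connection connection = factory.newConnection();
        // ... after a network failure, topology recovery will fan out
        // across the executor's threads.
    }
}
```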
Types of Changes
What types of changes does your code introduce to this project? Put an x in the boxes that apply.

Checklist

Put an x in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask on the mailing list. We're here to help! This is simply a reminder of what we are going to look for before merging your code.

- I have read the CONTRIBUTING.md document

Further Comments