diff --git a/src/main/java/com/rabbitmq/client/impl/ConsumerWorkService.java b/src/main/java/com/rabbitmq/client/impl/ConsumerWorkService.java index b3810aae2f..f607e816e3 100644 --- a/src/main/java/com/rabbitmq/client/impl/ConsumerWorkService.java +++ b/src/main/java/com/rabbitmq/client/impl/ConsumerWorkService.java @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -22,10 +22,13 @@ import java.util.concurrent.ThreadFactory; import com.rabbitmq.client.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; final public class ConsumerWorkService { + private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerWorkService.class); private static final int MAX_RUNNABLE_BLOCK_SIZE = 16; - private static final int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2; + private static final int DEFAULT_NUM_THREADS = Math.max(1, Utils.availableProcessors()); private final ExecutorService executor; private final boolean privateExecutor; private final WorkPool workPool; @@ -33,8 +36,12 @@ final public class ConsumerWorkService { public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int queueingTimeout, int shutdownTimeout) { this.privateExecutor = (executor == null); - this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory) - : executor; + if (executor == null) { + LOGGER.debug("Creating executor service with {} thread(s) for consumer work service", DEFAULT_NUM_THREADS); + this.executor = Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory); + } else { + this.executor = executor; + } this.workPool = new WorkPool<>(queueingTimeout); this.shutdownTimeout = shutdownTimeout; } diff --git a/src/main/java/com/rabbitmq/client/impl/Utils.java b/src/main/java/com/rabbitmq/client/impl/Utils.java new file mode 100644 index 0000000000..d3e3412ee4 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/Utils.java @@ -0,0 +1,31 @@ +// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl; + +final class Utils { + + private static final int AVAILABLE_PROCESSORS = + Integer.parseInt( + System.getProperty( + "rabbitmq.amqp.client.availableProcessors", + String.valueOf(Runtime.getRuntime().availableProcessors()))); + + static int availableProcessors() { + return AVAILABLE_PROCESSORS; + } + + private Utils() {} +}