> pubQueue, String retryKey, int retryLimit,
+ final String invalidKey,
+ final ManagedDataStore dataStore, final Codec codec,
+ final Runnable disconnectCallback, final WorkerConfiguration workerConfiguration)
{
this.callback = Objects.requireNonNull(callback);
this.metrics = Objects.requireNonNull(metrics);
@@ -63,57 +106,247 @@ public WorkerQueueConsumerImpl(TaskCallback callback, RabbitMetricsReporter metr
this.publisherEventQueue = Objects.requireNonNull(pubQueue);
this.retryRoutingKey = Objects.requireNonNull(retryKey);
this.retryLimit = retryLimit;
+ this.invalidRoutingKey = Objects.requireNonNull(invalidKey);
+ this.dataStore = Objects.requireNonNull(dataStore);
+ this.codec = Objects.requireNonNull(codec);
+ this.disconnectCallback = Objects.requireNonNull(disconnectCallback);
+ this.offloadedPayloadsToDelete = Collections.synchronizedSortedMap(new TreeMap<>());
+ this.workerConfiguration = Objects.requireNonNull(workerConfiguration);
}
/**
* {@inheritDoc}
- *
+ *
* If an incoming message is marked as redelivered, hand it off to another method to deal with retry/rejection. Otherwise, hand it off
- * to worker-core, and potentially repbulish or reject it depending upon exceptions thrown.
+ * to worker-core, and potentially republish or reject it depending upon exceptions thrown.
*/
@Override
public void processDelivery(Delivery delivery)
{
- final int retries = delivery.getHeaders().containsKey(RabbitHeaders.RABBIT_HEADER_CAF_DELIVERY_COUNT) ?
- Integer.parseInt(String.valueOf(delivery.getHeaders()
- .getOrDefault(RabbitHeaders.RABBIT_HEADER_CAF_DELIVERY_COUNT, "0"))) :
- Integer.parseInt(String.valueOf(delivery.getHeaders()
- .getOrDefault(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY, "0")));
-
+ final long inboundMessageId = delivery.getEnvelope().getDeliveryTag();
+ final String routingKey = delivery.getEnvelope().getRoutingKey();
+ final Map deliveryHeaders = delivery.getHeaders();
+ final boolean isRedelivered = delivery.getEnvelope().isRedeliver();
+ final int retries = deliveryHeaders.containsKey(RabbitHeaders.RABBIT_HEADER_CAF_DELIVERY_COUNT)
+ ? Integer.parseInt(String.valueOf(deliveryHeaders.getOrDefault(RabbitHeaders.RABBIT_HEADER_CAF_DELIVERY_COUNT, "0")))
+ : Integer.parseInt(String.valueOf(deliveryHeaders.getOrDefault(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY, "0")));
+ final Optional taskMessageStorageRefOpt
+ = Optional.ofNullable(deliveryHeaders.get(RABBIT_HEADER_CAF_PAYLOAD_OFFLOADING_STORAGE_REF)).map(Object::toString);
metrics.incrementReceived();
- final boolean isPoison;
- if (delivery.getEnvelope().isRedeliver()) {
- if (!delivery.getHeaders().containsKey(RabbitHeaders.RABBIT_HEADER_CAF_DELIVERY_COUNT)) {
- //RABBIT_HEADER_CAF_DELIVERY_COUNT is not available, message was delivered from CLASSIC queue
+
+ final byte[] deliveryMessageData = delivery.getMessageData();
+ final TaskMessage taskMessage;
+ try {
+ try {
+ taskMessage = codec.deserialise(deliveryMessageData, TaskMessage.class, DecodeMethod.LENIENT);
+ } catch (final CodecException e) {
+ handleInvalidDelivery(inboundMessageId, Optional.empty(), deliveryMessageData, deliveryHeaders,
+ "Cannot deserialize delivery messageData to TaskMessage");
+ return;
+ }
+
+ try {
+ handleTaskDataInjection(taskMessage, inboundMessageId, taskMessageStorageRefOpt);
+ } catch (final InvalidDeliveryException ex) {
+ handleInvalidDelivery(inboundMessageId, Optional.of(taskMessage), deliveryMessageData, deliveryHeaders,
+ ex.getMessage());
+ return;
+ }
+
+ final PoisonMessageStatus poisonMessageStatus = getPoisonMessageStatus(
+ isRedelivered, deliveryHeaders, retries);
+
+ if (poisonMessageStatus == PoisonMessageStatus.CLASSIC_POSSIBLY_POISON) {
+ republishClassicRedelivery(
+ delivery.getEnvelope().getRoutingKey(),
+ inboundMessageId,
+ deliveryMessageData,
+ taskMessage.getTaskData(),
+ deliveryHeaders,
+ retries,
+ taskMessage.getTracking(),
+ taskMessageStorageRefOpt
+ );
+ return;
+ }
+
+ processDelivery(
+ inboundMessageId,
+ routingKey,
+ deliveryHeaders,
+ taskMessage,
+ deliveryMessageData,
+ poisonMessageStatus == PoisonMessageStatus.POISON
+ );
+ } catch (final TransientDeliveryException e) {
+ LOG.warn("Transient error processing message id {}, disconnecting.", inboundMessageId, e);
+ offloadedPayloadsToDelete.remove(inboundMessageId);
+ //Disconnect the channel to allow for a reconnect when the HealthCheck passes.
+ disconnectCallback.run();
+ }
+ }
+
+ /**
+ * Handles the logic for injecting taskData from the store into the TaskMessage if required.
+ * Throws InvalidDeliveryException if the TaskMessage has both or neither of taskData and a storage
+ * reference, or the referenced payload is missing; TransientDeliveryException on retrieval failure.
+ */
+ private void handleTaskDataInjection(final TaskMessage taskMessage, final long inboundMessageId,
+ final Optional taskMessageStorageRefOpt)
+ throws InvalidDeliveryException, TransientDeliveryException
+ {
+ final byte[] currentTaskData = taskMessage.getTaskData();
+ final boolean hasStorageRef = taskMessageStorageRefOpt.isPresent();
+ final boolean hasTaskData = currentTaskData != null;
+
+ if (hasTaskData && hasStorageRef) {
+ throw new InvalidDeliveryException(
+ "TaskMessage contains both taskData and a storage reference. This is invalid.", inboundMessageId);
+ }
+ if (!hasTaskData && !hasStorageRef) {
+ throw new InvalidDeliveryException(
+ "TaskMessage contains neither taskData nor a storage reference. This is invalid.", inboundMessageId);
+ }
+ if (hasStorageRef) {
+ final byte[] offloadedTaskData;
+ try {
+ offloadedTaskData = retrieveTaskDataFromStore(taskMessageStorageRefOpt.get(), inboundMessageId);
+ } catch (final ReferenceNotFoundException e) {
+ throw new InvalidDeliveryException(e.getMessage(), inboundMessageId);
+ }
+ taskMessage.setTaskData(offloadedTaskData);
+ }
+ // If hasTaskData and !hasStorageRef, nothing to do
+ }
+
+ private byte[] retrieveTaskDataFromStore(final String taskMessageStorageRef, final long inboundMessageId)
+ throws ReferenceNotFoundException, TransientDeliveryException
+ {
+ try (final var inputStream = dataStore.retrieve(taskMessageStorageRef)) {
+ final var taskData = inputStream.readAllBytes();
+ offloadedPayloadsToDelete.put(inboundMessageId, taskMessageStorageRef);
+ return taskData;
+ } catch (final IOException | DataStoreException ex) {
+ if (ex instanceof ReferenceNotFoundException) {
+ throw (ReferenceNotFoundException)ex;
+ }
+ throw new TransientDeliveryException(
+ "TaskMessage's TaskData could not be retrieved from DataStore", inboundMessageId, ex);
+ }
+ }
+
+ private void handleInvalidDelivery(
+ final long inboundMessageId,
+ final Optional deliveredTaskMessageOpt,
+ final byte[] deliveryMessageData,
+ final Map deliveryHeaders,
+ final String exceptionMessage
+ )
+ {
+ try {
+ final RabbitTaskInformation taskInformation = new RabbitTaskInformation(String.valueOf(inboundMessageId), true);
+ taskInformation.incrementResponseCount(true);
+ final var publishHeaders = new HashMap<>(deliveryHeaders);
+ publishHeaders.put(RABBIT_HEADER_CAF_WORKER_INVALID, exceptionMessage);
+
+ publisherEventQueue.add(new WorkerPublishQueueEvent(deliveryMessageData, invalidRoutingKey, taskInformation, publishHeaders));
+
+ if (deliveredTaskMessageOpt.isPresent()) {
+ final var taskMessage = deliveredTaskMessageOpt.get();
+ if(taskMessage.getTracking() != null) {
+ sendFailureTrackingReport(taskMessage, exceptionMessage, taskInformation);
+ }
+ }
+ } catch (CodecException e) {
+ LOG.error("Failed to serialise report update task data.");
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void sendFailureTrackingReport(
+ final TaskMessage taskMessage,
+ final String invalidDeliveryExceptionMessage,
+ final RabbitTaskInformation rabbitTaskInformation
+ ) throws CodecException {
+ final TrackingReportFailure failure = new TrackingReportFailure();
+ failure.failureId = TaskStatus.INVALID_TASK.toString();
+ failure.failureTime = new Date();
+ failure.failureSource = getWorkerName(taskMessage);
+ failure.failureMessage = invalidDeliveryExceptionMessage;
+
+ final List trackingReports = new ArrayList<>();
+
+ final TrackingReport trackingReport = new TrackingReport();
+ trackingReport.failure = failure;
+ trackingReport.status = TrackingReportStatus.Failed;
+
+ trackingReports.add(trackingReport);
+
+ final TrackingReportTask trackingReportTask = new TrackingReportTask();
+ trackingReportTask.trackingReports = trackingReports;
+
+ final byte[] trackingReportTaskTaskData = codec.serialise(trackingReportTask);
+
+ final TrackingInfo trackingInfo = taskMessage.getTracking();
+
+ final TaskMessage failureReportTaskMessage = new TaskMessage(
+ UUID.randomUUID().toString(), TrackingReportConstants.TRACKING_REPORT_TASK_NAME,
+ TrackingReportConstants.TRACKING_REPORT_TASK_API_VER, trackingReportTaskTaskData, TaskStatus.NEW_TASK,
+ Collections.emptyMap(), trackingInfo.getTrackingPipe(), null, null,
+ taskMessage.getCorrelationId());
+
+ publisherEventQueue.add(new WorkerPublishQueueEvent(codec.serialise(failureReportTaskMessage),
+ trackingInfo.getTrackingPipe(), rabbitTaskInformation, Collections.emptyMap()));
+ }
+
+ private PoisonMessageStatus getPoisonMessageStatus(
+ final boolean isRedelivered,
+ final Map deliveryHeaders,
+ final int retries
+ ) {
+ // If the message is being redelivered it is potentially a poison message.
+ if (isRedelivered) {
+ // If the headers do not contain the delivery count, then it is a classic queue.
+ if (!deliveryHeaders.containsKey(RabbitHeaders.RABBIT_HEADER_CAF_DELIVERY_COUNT)) {
+ // If the retries have not been exceeded, then republish the message
+ // with a header recording the retry count
if (retries < retryLimit) {
- //Republish the delivery with a header recording the incremented number of retries.
- //Classic queues do not record delivery count, so we republish the message with an incremented
- //retry count. This allows us to track the number of attempts to process the message.
- republishClassicRedelivery(delivery, retries);
- return;
+ return PoisonMessageStatus.CLASSIC_POSSIBLY_POISON;
}
- isPoison = true;
- } else {
- isPoison = retries > retryLimit;
}
- } else {
- isPoison = false;
+ return (retries >= retryLimit) // NOTE(review): pre-patch quorum-queue check was 'retries > retryLimit' — confirm the '>=' change is intentional
+ ? PoisonMessageStatus.POISON
+ : PoisonMessageStatus.NOT_POISON;
}
+ return PoisonMessageStatus.NOT_POISON;
+ }
- final RabbitTaskInformation taskInformation = new RabbitTaskInformation(String.valueOf(delivery.getEnvelope().getDeliveryTag()), isPoison);
+ private void processDelivery(
+ final long inboundMessageId,
+ final String routingKey,
+ final Map deliveryHeaders,
+ final TaskMessage taskMessage,
+ final byte[] taskMessageByteArray,
+ final boolean isPoison
+ ) {
+ final TrackingInfo trackingInfo = taskMessage.getTracking();
+ final String trackingJobTaskId = trackingInfo != null ? trackingInfo.getJobTaskId() : "untracked";
+ final RabbitTaskInformation taskInformation = new RabbitTaskInformation(
+ String.valueOf(inboundMessageId), isPoison, Optional.of(trackingJobTaskId)
+ );
try {
- LOG.debug("Registering new message {}", taskInformation.getInboundMessageId());
- callback.registerNewTask(taskInformation, delivery.getMessageData(), delivery.getHeaders());
- } catch (InvalidTaskException e) {
- LOG.error("Cannot register new message, rejecting {}", taskInformation.getInboundMessageId(), e);
+ LOG.debug("Registering new message {}", inboundMessageId);
+ callback.registerNewTask(taskInformation, taskMessage, deliveryHeaders);
+ } catch (final InvalidTaskException e) {
+ LOG.error("Cannot register new message, rejecting {}", inboundMessageId, e);
taskInformation.incrementResponseCount(true);
- publisherEventQueue.add(new WorkerPublishQueueEvent(delivery.getMessageData(), retryRoutingKey, taskInformation,
- Collections.singletonMap(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_REJECTED, REJECTED_REASON_TASKMESSAGE)));
- } catch (TaskRejectedException e) {
- LOG.warn("Message {} rejected as a task at this time, returning to queue", taskInformation.getInboundMessageId(), e);
+ final var publishHeaders = new HashMap<>();
+ publishHeaders.put(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_INVALID, e.getMessage());
+ publisherEventQueue.add(new WorkerPublishQueueEvent(taskMessageByteArray, invalidRoutingKey, taskInformation, publishHeaders));
+ } catch (final TaskRejectedException e) {
+ LOG.warn("Message {} rejected as a task at this time, returning to queue", inboundMessageId, e);
taskInformation.incrementResponseCount(true);
- publisherEventQueue.add(new WorkerPublishQueueEvent(delivery.getMessageData(), delivery.getEnvelope().getRoutingKey(),
- taskInformation));
+ publisherEventQueue.add(new WorkerPublishQueueEvent(taskMessageByteArray, routingKey, taskInformation, deliveryHeaders));
}
}
@@ -131,6 +364,17 @@ public void processAck(long tag)
LOG.warn("Couldn't ack message {}, will retry", tag, e);
metrics.incremementErrors();
consumerEventQueue.add(new ConsumerAckEvent(tag));
+ return;
+ }
+
+ final String datastorePayloadReference = offloadedPayloadsToDelete.remove(tag);
+ if (datastorePayloadReference != null) {
+ try {
+ dataStore.delete(datastorePayloadReference);
+ } catch (final DataStoreException e) {
+ LOG.warn("Couldn't delete offloaded payload '{}' for delivery tag '{}' from datastore.",
+ datastorePayloadReference, tag, e);
+ }
}
}
@@ -175,21 +419,59 @@ private void processReject(long id, boolean requeue)
}
}
- /**
- * Republish the delivery to the retry queue with the retry count stamped in the headers.
- *
- * @param delivery the redelivered message
- */
- private void republishClassicRedelivery(final Delivery delivery, final int retries) {
-
- final RabbitTaskInformation taskInformation =
- new RabbitTaskInformation(String.valueOf(delivery.getEnvelope().getDeliveryTag()));
+ private void republishClassicRedelivery(
+ final String deliveryQueue,
+ final long inboundMessageId,
+ final byte[] serializedTaskMessage,
+ final byte[] serializedTaskData,
+ final Map deliveryHeaders,
+ final int retries,
+ final TrackingInfo tracking,
+ final Optional taskMessageStorageRefOpt
+ )
+ {
+ final String trackingJobTaskId = tracking != null ? tracking.getJobTaskId() : "untracked";
+ final RabbitTaskInformation taskInformation = new RabbitTaskInformation(
+ String.valueOf(inboundMessageId), false, Optional.of(trackingJobTaskId));
LOG.debug("Received redelivered message with id {}, retry count {}, retry limit {}, republishing to retry queue",
- delivery.getEnvelope().getDeliveryTag(), retryLimit, retries + 1);
- final Map headers = new HashMap<>();
- headers.put(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY, String.valueOf(retries + 1));
+ inboundMessageId, retryLimit, retries + 1);
+ final Map publishHeaders = new HashMap<>(deliveryHeaders);
+ publishHeaders.put(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY, String.valueOf(retries + 1));
taskInformation.incrementResponseCount(true);
- publisherEventQueue.add(new WorkerPublishQueueEvent(delivery.getMessageData(), retryRoutingKey,
- taskInformation, headers));
+ if(taskMessageStorageRefOpt.isPresent()) {
+ if (!retryRoutingKey.equals(deliveryQueue)) {
+ try {
+ final String newStorageReference =
+ dataStore.store(serializedTaskData,
+ taskMessageStorageRefOpt.get().replace(deliveryQueue, retryRoutingKey));
+ publishHeaders.put(RABBIT_HEADER_CAF_PAYLOAD_OFFLOADING_STORAGE_REF, newStorageReference);
+ }
+ catch (final DataStoreException e) {
+ LOG.error("Failed to relocate offloaded payload for message id {} from {} to {}",
+ inboundMessageId, deliveryQueue, retryRoutingKey, e);
+ //Disconnect the channel to allow for a reconnect when the HealthCheck passes.
+ disconnectCallback.run();
+ }
+ }
+ else {
+ //Same routing key is reused, so the payload stays in place; remove it from the delete map so processAck does not delete it from the datastore.
+ offloadedPayloadsToDelete.remove(inboundMessageId);
+ }
+ }
+ publisherEventQueue.add(new WorkerPublishQueueEvent(serializedTaskMessage, retryRoutingKey, taskInformation, publishHeaders));
+ }
+
+ private String getWorkerName(final TaskMessage taskMessage)
+ {
+ final var taskClassifier = MoreObjects.firstNonNull(taskMessage.getTaskClassifier(), "");
+ if (workerConfiguration != null) {
+ final String workerName = workerConfiguration.getWorkerName();
+
+ if (workerName != null) {
+ return workerName;
+ }
+ }
+
+ return taskClassifier;
}
}
diff --git a/worker-queue-rabbit/src/test/java/com/github/workerframework/queues/rabbit/RabbitWorkerQueueConsumerTest.java b/worker-queue-rabbit/src/test/java/com/github/workerframework/queues/rabbit/RabbitWorkerQueueConsumerTest.java
index 13c39e1f9..fed7f93a9 100644
--- a/worker-queue-rabbit/src/test/java/com/github/workerframework/queues/rabbit/RabbitWorkerQueueConsumerTest.java
+++ b/worker-queue-rabbit/src/test/java/com/github/workerframework/queues/rabbit/RabbitWorkerQueueConsumerTest.java
@@ -15,11 +15,22 @@
*/
package com.github.workerframework.queues.rabbit;
+import com.github.cafapi.common.api.Codec;
+import com.github.cafapi.common.api.CodecException;
+import com.github.cafapi.common.codecs.json.JsonCodec;
+import com.github.workerframework.api.DataStoreException;
import com.github.workerframework.api.InvalidTaskException;
+import com.github.workerframework.api.ManagedDataStore;
import com.github.workerframework.api.TaskCallback;
import com.github.workerframework.api.TaskInformation;
+import com.github.workerframework.api.TaskMessage;
import com.github.workerframework.api.TaskRejectedException;
+import com.github.workerframework.api.TaskStatus;
+import com.github.workerframework.api.TrackingInfo;
+import com.github.workerframework.api.WorkerConfiguration;
import com.github.workerframework.api.WorkerException;
+import com.github.workerframework.datastores.fs.FileSystemDataStore;
+import com.github.workerframework.datastores.fs.FileSystemDataStoreConfiguration;
import com.github.workerframework.util.rabbitmq.ConsumerAckEvent;
import com.github.workerframework.util.rabbitmq.ConsumerDropEvent;
import com.github.workerframework.util.rabbitmq.ConsumerRejectEvent;
@@ -31,15 +42,19 @@
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
+import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -47,24 +62,63 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import static org.mockito.Mockito.mock;
+
public class RabbitWorkerQueueConsumerTest
{
private String testQueue = "testQueue";
- private RabbitTaskInformation taskInformation;
- private byte[] data = "test123".getBytes(StandardCharsets.UTF_8);
+ private RabbitTaskInformation taskInformation;
private Envelope newEnv;
private Envelope poisonEnv;
private Envelope redeliveredEnv;
private String retryKey = "retry";
+ private String invalidKey = "invalid";
private RabbitMetricsReporter metrics = new RabbitMetricsReporter();
- private TaskCallback mockCallback = Mockito.mock(TaskCallback.class);
+ private TaskCallback mockCallback = mock(TaskCallback.class);
+ private File tempDataStore;
+ private ManagedDataStore dataStore;
+ private static Codec codec;
+ private static byte[] data;
+
+ @BeforeClass
+ public static void beforeClass() throws CodecException {
+ codec = new JsonCodec();
+ data = getNewTaskMessage();
+ }
@BeforeMethod
- public void beforeMethod() {
+ public void beforeMethod() throws DataStoreException {
taskInformation = new RabbitTaskInformation("101");
newEnv = new Envelope(Long.valueOf(taskInformation.getInboundMessageId()), false, "", testQueue);
poisonEnv = new Envelope(Long.valueOf(taskInformation.getInboundMessageId()), true, "", testQueue);
redeliveredEnv = new Envelope(Long.valueOf(taskInformation.getInboundMessageId()), true, "", testQueue);
+ tempDataStore = new File("RabbitWorkerQueueConsumerTest");
+ dataStore = new FileSystemDataStore(createConfig());
+ }
+
+ @AfterMethod
+ public void tearDown()
+ {
+ deleteDir(tempDataStore);
+ }
+
+ private void deleteDir(File file)
+ {
+ File[] contents = file.listFiles();
+ if (contents != null) {
+ for (File f : contents) {
+ deleteDir(f);
+ }
+ }
+ file.delete();
+ }
+
+ private FileSystemDataStoreConfiguration createConfig()
+ {
+ final FileSystemDataStoreConfiguration conf = new FileSystemDataStoreConfiguration();
+ conf.setDataDir(tempDataStore.getAbsolutePath());
+ conf.setDataDirHealthcheckTimeoutSeconds(10);
+ return conf;
}
/**
@@ -76,19 +130,21 @@ public void testHandleDelivery()
{
BlockingQueue> consumerEvents = new LinkedBlockingQueue<>();
BlockingQueue> publisherEvents = new LinkedBlockingQueue<>();
- Channel channel = Mockito.mock(Channel.class);
+ Channel channel = mock(Channel.class);
CountDownLatch latch = new CountDownLatch(1);
- TaskCallback callback = Mockito.mock(TaskCallback.class);
+ TaskCallback callback = mock(TaskCallback.class);
Answer a = invocationOnMock -> {
latch.countDown();
return null;
};
Mockito.doAnswer(a).when(callback).registerNewTask(Mockito.any(), Mockito.any(), Mockito.anyMap());
- WorkerQueueConsumerImpl impl = new WorkerQueueConsumerImpl(callback, metrics, consumerEvents, channel, publisherEvents, retryKey, 1);
+ WorkerQueueConsumerImpl impl = new WorkerQueueConsumerImpl(
+ callback, metrics, consumerEvents, channel, publisherEvents, retryKey, 1, invalidKey,
+ dataStore, codec, () -> {}, mock(WorkerConfiguration.class));
DefaultRabbitConsumer consumer = new DefaultRabbitConsumer(consumerEvents, impl);
Thread t = new Thread(consumer);
t.start();
- AMQP.BasicProperties prop = Mockito.mock(AMQP.BasicProperties.class);
+ AMQP.BasicProperties prop = mock(AMQP.BasicProperties.class);
Mockito.when(prop.getHeaders()).thenReturn(Collections.emptyMap());
consumer.handleDelivery("consumer", newEnv, prop, data);
Assert.assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
@@ -105,19 +161,21 @@ public void testPoisonDelivery()
{
BlockingQueue> consumerEvents = new LinkedBlockingQueue<>();
BlockingQueue> publisherEvents = new LinkedBlockingQueue<>();
- Channel channel = Mockito.mock(Channel.class);
+ Channel channel = mock(Channel.class);
CountDownLatch latch = new CountDownLatch(1);
- TaskCallback callback = Mockito.mock(TaskCallback.class);
+ TaskCallback callback = mock(TaskCallback.class);
Answer a = invocationOnMock -> {
latch.countDown();
return null;
};
Mockito.doAnswer(a).when(callback).registerNewTask(Mockito.any(), Mockito.any(), Mockito.anyMap());
- WorkerQueueConsumerImpl impl = new WorkerQueueConsumerImpl(callback, metrics, consumerEvents, channel, publisherEvents, retryKey, 1);
+ WorkerQueueConsumerImpl impl = new WorkerQueueConsumerImpl(
+ callback, metrics, consumerEvents, channel, publisherEvents, retryKey, 1, invalidKey,
+ dataStore, codec, () -> {}, mock(WorkerConfiguration.class));
DefaultRabbitConsumer consumer = new DefaultRabbitConsumer(consumerEvents, impl);
Thread t = new Thread(consumer);
t.start();
- AMQP.BasicProperties prop = Mockito.mock(AMQP.BasicProperties.class);
+ AMQP.BasicProperties prop = mock(AMQP.BasicProperties.class);
Map headers = new HashMap<>();
headers.put(RabbitHeaders.RABBIT_HEADER_CAF_WORKER_RETRY, "1");
Mockito.when(prop.getHeaders()).thenReturn(headers);
@@ -131,8 +189,8 @@ public void testPoisonDelivery()
}
/**
- * Send in a new message and verify that if the task registration throws an InvalidTaskException that a new publish request to the
- * reject queue is sent.
+ * Send in a new message and verify that if the task registration throws an InvalidTaskException that a new publish
+ * request to the invalid queue is sent.
*/
@Test
public void testHandleDeliveryInvalid()
@@ -140,28 +198,30 @@ public void testHandleDeliveryInvalid()
{
BlockingQueue> consumerEvents = new LinkedBlockingQueue<>();
BlockingQueue> publisherEvents = new LinkedBlockingQueue<>();
- Channel channel = Mockito.mock(Channel.class);
- TaskCallback callback = Mockito.mock(TaskCallback.class);
+ Channel channel = mock(Channel.class);
+ TaskCallback callback = mock(TaskCallback.class);
Answer a = invocationOnMock -> {
throw new InvalidTaskException("blah");
};
Mockito.doAnswer(a).when(callback).registerNewTask(Mockito.any(), Mockito.any(), Mockito.anyMap());
- WorkerQueueConsumerImpl impl = new WorkerQueueConsumerImpl(callback, metrics, consumerEvents, channel, publisherEvents, retryKey, 1);
+ WorkerQueueConsumerImpl impl = new WorkerQueueConsumerImpl(
+ callback, metrics, consumerEvents, channel, publisherEvents, retryKey, 1, invalidKey,
+ dataStore, codec, () -> {}, mock(WorkerConfiguration.class));
DefaultRabbitConsumer consumer = new DefaultRabbitConsumer(consumerEvents, impl);
Thread t = new Thread(consumer);
t.start();
- AMQP.BasicProperties prop = Mockito.mock(AMQP.BasicProperties.class);
+ AMQP.BasicProperties prop = mock(AMQP.BasicProperties.class);
Mockito.when(prop.getHeaders()).thenReturn(Collections.emptyMap());
consumer.handleDelivery("consumer", newEnv, prop, data);
Event pubEvent = publisherEvents.poll(1, TimeUnit.SECONDS);
Assert.assertNotNull(pubEvent);
- WorkerPublisher publisher = Mockito.mock(WorkerPublisher.class);
+ WorkerPublisher publisher = mock(WorkerPublisher.class);
ArgumentCaptor