Skip to content

Commit 117f77b

Browse files
Merge pull request #272 from rabbitmq/rabbitmq-java-client-271
Let reply properties be updated in RpcServer
2 parents 842cb42 + f146c8f commit 117f77b

File tree

3 files changed

+135
-3
lines changed

3 files changed

+135
-3
lines changed

src/main/java/com/rabbitmq/client/RpcServer.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,11 @@ public void processRequest(QueueingConsumer.Delivery request)
144144
String replyTo = requestProperties.getReplyTo();
145145
if (correlationId != null && replyTo != null)
146146
{
147-
AMQP.BasicProperties replyProperties
148-
= new AMQP.BasicProperties.Builder().correlationId(correlationId).build();
147+
AMQP.BasicProperties.Builder replyPropertiesBuilder
148+
= new AMQP.BasicProperties.Builder().correlationId(correlationId);
149+
AMQP.BasicProperties replyProperties = preprocessReplyProperties(request, replyPropertiesBuilder);
149150
byte[] replyBody = handleCall(request, replyProperties);
151+
replyProperties = postprocessReplyProperties(request, replyProperties.builder());
150152
_channel.basicPublish("", replyTo, replyProperties, replyBody);
151153
} else {
152154
handleCast(request);
@@ -187,6 +189,27 @@ public byte[] handleCall(byte[] requestBody,
187189
return new byte[0];
188190
}
189191

192+
/**
193+
* Gives a chance to set/modify reply properties before handling call.
194+
* Note the correlationId property is already set.
195+
* @param request the inbound message
196+
* @param builder the reply properties builder
197+
* @return the properties to pass in to the handling call
198+
*/
199+
protected AMQP.BasicProperties preprocessReplyProperties(QueueingConsumer.Delivery request, AMQP.BasicProperties.Builder builder) {
200+
return builder.build();
201+
}
202+
203+
/**
204+
* Gives a chance to set/modify reply properties after the handling call
205+
* @param request the inbound message
206+
* @param builder the reply properties builder
207+
* @return the properties to pass in to the response message
208+
*/
209+
protected AMQP.BasicProperties postprocessReplyProperties(QueueingConsumer.Delivery request, AMQP.BasicProperties.Builder builder) {
210+
return builder.build();
211+
}
212+
190213
/**
191214
* Lowest-level handler method. Calls
192215
* handleCast(AMQP.BasicProperties,byte[]).

src/test/java/com/rabbitmq/client/test/ClientTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@
4747
DnsSrvRecordAddressResolverTest.class,
4848
JavaNioTest.class,
4949
ConnectionFactoryTest.class,
50-
RecoveryAwareAMQConnectionFactoryTest.class
50+
RecoveryAwareAMQConnectionFactoryTest.class,
51+
RpcTest.class
5152
})
5253
public class ClientTests {
5354

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
17+
package com.rabbitmq.client.test;
18+
19+
import com.rabbitmq.client.*;
20+
import org.junit.After;
21+
import org.junit.Before;
22+
import org.junit.Test;
23+
24+
import java.io.IOException;
25+
import java.util.Collections;
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
29+
import static org.junit.Assert.assertEquals;
30+
31+
public class RpcTest {
32+
33+
Connection clientConnection, serverConnection;
34+
Channel clientChannel, serverChannel;
35+
String queue = "rpc.queue";
36+
RpcServer rpcServer;
37+
38+
39+
@Before public void init() throws Exception {
40+
clientConnection = TestUtils.connectionFactory().newConnection();
41+
clientChannel = clientConnection.createChannel();
42+
serverConnection = TestUtils.connectionFactory().newConnection();
43+
serverChannel = serverConnection.createChannel();
44+
serverChannel.queueDeclare(queue, false, false, false, null);
45+
}
46+
47+
@After public void tearDown() throws Exception {
48+
if(rpcServer != null) {
49+
rpcServer.terminateMainloop();
50+
}
51+
if(serverChannel != null) {
52+
serverChannel.queueDelete(queue);
53+
}
54+
clientConnection.close();
55+
serverConnection.close();
56+
}
57+
58+
@Test
59+
public void rpc() throws Exception {
60+
rpcServer = new TestRpcServer(serverChannel, queue);
61+
new Thread(new Runnable() {
62+
@Override
63+
public void run() {
64+
try {
65+
rpcServer.mainloop();
66+
} catch (Exception e) {
67+
// safe to ignore when loops ends/server is canceled
68+
}
69+
}
70+
}).start();
71+
72+
RpcClient client = new RpcClient(clientChannel, "", queue, 1000);
73+
RpcClient.Response response = client.doCall(null, "hello".getBytes());
74+
assertEquals("*** hello ***", new String(response.getBody()));
75+
assertEquals("pre-hello", response.getProperties().getHeaders().get("pre").toString());
76+
assertEquals("post-hello", response.getProperties().getHeaders().get("post").toString());
77+
client.close();
78+
}
79+
80+
private static class TestRpcServer extends RpcServer {
81+
82+
public TestRpcServer(Channel channel, String queueName) throws IOException {
83+
super(channel, queueName);
84+
}
85+
86+
@Override
87+
protected AMQP.BasicProperties preprocessReplyProperties(QueueingConsumer.Delivery request, AMQP.BasicProperties.Builder builder) {
88+
Map<String, Object> headers = new HashMap<String, Object>();
89+
headers.put("pre", "pre-" + new String(request.getBody()));
90+
builder.headers(headers);
91+
return builder.build();
92+
}
93+
94+
@Override
95+
public byte[] handleCall(QueueingConsumer.Delivery request, AMQP.BasicProperties replyProperties) {
96+
String input = new String(request.getBody());
97+
return ("*** " + input + " ***").getBytes();
98+
}
99+
100+
@Override
101+
protected AMQP.BasicProperties postprocessReplyProperties(QueueingConsumer.Delivery request, AMQP.BasicProperties.Builder builder) {
102+
Map<String, Object> headers = new HashMap<String, Object>(builder.build().getHeaders());
103+
headers.put("post", "post-" + new String(request.getBody()));
104+
builder.headers(headers);
105+
return builder.build();
106+
}
107+
}
108+
}

0 commit comments

Comments
 (0)