@@ -40,6 +40,7 @@ public void openConnection()
40
40
throws IOException , TimeoutException {
41
41
try {
42
42
connectionFactory .useSslProtocol ();
43
+ connectionFactory .useNio ();
43
44
} catch (Exception ex ) {
44
45
throw new IOException (ex .toString ());
45
46
}
@@ -55,15 +56,15 @@ public void openConnection()
55
56
}
56
57
}
57
58
if (connection == null ) {
58
- fail ("Couldn't open TLS connection after 3 attemps " );
59
+ fail ("Couldn't open TLS connection after 3 attempts " );
59
60
}
60
61
61
62
}
62
63
63
64
@ Test
64
65
public void connectionGetConsume () throws Exception {
65
66
CountDownLatch latch = new CountDownLatch (1 );
66
- connection = basicGetBasicConsume (connection , "tls.nio.queue" , latch );
67
+ connection = basicGetBasicConsume (connection , "tls.nio.queue" , latch , 100 * 1000 );
67
68
boolean messagesReceived = latch .await (5 , TimeUnit .SECONDS );
68
69
assertTrue ("Message has not been received" , messagesReceived );
69
70
}
@@ -94,20 +95,31 @@ public void configure(SSLEngine sslEngine) throws IOException {
94
95
}
95
96
}
96
97
97
- private Connection basicGetBasicConsume (Connection connection , String queue , final CountDownLatch latch )
98
+ @ Test public void messageSize () throws Exception {
99
+ int [] sizes = new int [] {100 , 1000 , 10 * 1000 , 1 * 1000 * 1000 , 5 * 1000 * 1000 };
100
+ for (int size : sizes ) {
101
+ CountDownLatch latch = new CountDownLatch (1 );
102
+ connection = basicGetBasicConsume (connection , "tls.nio.queue" , latch , size );
103
+ boolean messagesReceived = latch .await (5 , TimeUnit .SECONDS );
104
+ assertTrue ("Message has not been received" , messagesReceived );
105
+ }
106
+ }
107
+
108
+ private Connection basicGetBasicConsume (Connection connection , String queue , final CountDownLatch latch , int msgSize )
98
109
throws IOException , TimeoutException {
99
110
Channel channel = connection .createChannel ();
100
111
channel .queueDeclare (queue , false , false , false , null );
101
112
channel .queuePurge (queue );
102
113
103
- channel .basicPublish ("" , queue , null , new byte [100 * 1000 ]);
114
+ channel .basicPublish ("" , queue , null , new byte [msgSize ]);
104
115
105
116
channel .basicConsume (queue , false , new DefaultConsumer (channel ) {
106
117
107
118
@ Override
108
119
public void handleDelivery (String consumerTag , Envelope envelope , AMQP .BasicProperties properties , byte [] body ) throws IOException {
109
120
getChannel ().basicAck (envelope .getDeliveryTag (), false );
110
121
latch .countDown ();
122
+ getChannel ().basicCancel (consumerTag );
111
123
}
112
124
});
113
125
0 commit comments