-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAMQPTesterChannels.java
More file actions
240 lines (186 loc) · 8.69 KB
/
AMQPTesterChannels.java
File metadata and controls
240 lines (186 loc) · 8.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
//This is a specific tester which checks whether an AMQP library honors the channel limit
//specified in the initial handshake
import java.util.*;
/*
* This test checks that a client does not open more than no_channels channels to the broker
* The client under test is sent a maximum channel limit of no_channels during connection and
* the client under test should attempt to open as many channels as possible in order to see
* if the limit is honored
*/
/**
 * Tester that checks whether an AMQP client honors the channel limit announced
 * by the server during the connection handshake.
 *
 * The tester plays the server side: it queues Connection.Start on construction,
 * answers Start-OK with a Connection.Tune carrying channel-max = {@link #no_channels},
 * and afterwards answers Connection.open/close and Channel.open/close while counting
 * how many channels the client currently has open. A warning is printed when the
 * client requests more channels than were announced.
 *
 * Frames are handed in through {@link #deliverFrame(AMQPFrame)} and replies are
 * fetched through {@link #getFrame()}.
 */
public class AMQPTesterChannels extends AMQPTester {
    //The server will send this channel limit to the client
    public static final int no_channels = 10;

    //AMQP class ids handled by this tester
    private static final int CLASS_CONNECTION = 10;
    private static final int CLASS_CHANNEL = 20;

    //Method ids within the connection class
    private static final int CONNECTION_START = 10;
    private static final int CONNECTION_START_OK = 11;
    private static final int CONNECTION_TUNE = 30;
    private static final int CONNECTION_TUNE_OK = 31;
    private static final int CONNECTION_OPEN = 40;
    private static final int CONNECTION_OPEN_OK = 41;
    private static final int CONNECTION_CLOSE = 50;
    private static final int CONNECTION_CLOSE_OK = 51;

    //Method ids within the channel class
    private static final int CHANNEL_OPEN = 10;
    private static final int CHANNEL_OPEN_OK = 11;
    private static final int CHANNEL_CLOSE = 40;
    private static final int CHANNEL_CLOSE_OK = 41;

    //The current count of open channels (never drops below zero)
    public int open_channels = 0;

    //Queue of incoming frames
    LinkedList<AMQPFrame> queue_incoming = new LinkedList<AMQPFrame>();

    //Queue of outgoing frames
    LinkedList<AMQPFrame> queue_outgoing = new LinkedList<AMQPFrame>();

    //Tester state enumeration
    public enum State {
        INITIALIZING,       //Connection.Start, Connection.Tune
        HANDSHAKE_COMPLETE, //Connection.Start and Tune complete
    }

    //Associated AMQPConnection
    AMQPConnection amqpConnection;

    //Current state this tester is in
    public State state = State.INITIALIZING;

    /**
     * Creates the tester and queues the Connection.Start frame, which is the
     * first thing the client expects after the protocol header exchange.
     *
     * @param amqpConnection the connection this tester is driving
     */
    AMQPTesterChannels(AMQPConnection amqpConnection) {
        //Store reference to the AMQPConnection we are working with
        this.amqpConnection = amqpConnection;

        //Contents of the server-properties field table
        LinkedHashMap<AShortString, AMQPNativeType> server_props = new LinkedHashMap<AShortString, AMQPNativeType>();
        server_props.put(new AShortString("copyright"), new ALongString("Hello World Inc."));

        //Arguments of Connection.Start, in wire order
        LinkedHashMap<AShortString, AMQPNativeType> start_arg = new LinkedHashMap<AShortString, AMQPNativeType>();
        start_arg.put(new AShortString("version-major"), new AOctet(0x00));
        start_arg.put(new AShortString("version-minor"), new AOctet(0x09));
        start_arg.put(new AShortString("server-properties"), new AFieldTable(server_props));
        start_arg.put(new AShortString("mechanisms"), new ALongString("PLAIN AMQPPLAIN"));
        start_arg.put(new AShortString("locales"), new ALongString("en-US"));

        //Build the complete frame and queue it up to be sent to the client
        AMQPFrame start_frame = AMQPMethodFrame.build(CLASS_CONNECTION, CONNECTION_START, start_arg);
        queue_outgoing.add(start_frame);
        System.out.println("Sending Connection.Start");
    }

    /**
     * Tells whether a method frame carries the given class id / method id pair.
     */
    private static boolean isMethod(AMQPMethodFrame inner, int classId, int methodId) {
        return inner.amqpClass.toInt() == classId && inner.amqpMethod.toInt() == methodId;
    }

    /**
     * Handles one queued frame while the handshake is still in progress.
     *
     * Start-OK is answered with Connection.Tune; Tune-OK moves the tester to
     * {@link State#HANDSHAKE_COMPLETE}. Any non-method frame marks the connection
     * for disconnect. Other method frames are consumed without a reply.
     */
    public void updateInitializing() {
        //Make sure we have at least one frame
        if (queue_incoming.isEmpty()) {
            return;
        }

        //Get the received frame
        AMQPFrame frame = queue_incoming.pop();

        //We are not expecting any non-method frames during the handshake
        if (frame.amqpFrameType != AMQPFrame.AMQPFrameType.METHOD) {
            //Invalid frame, disconnect the client
            amqpConnection.status = AMQPConnection.AMQPConnectionState.DISCONNECT;
            System.out.println("AMQPTesterChannels: Received bad frame during initialization");
            return;
        }

        //Get the inner frame, which is an AMQPMethodFrame in this case
        AMQPMethodFrame inner = (AMQPMethodFrame) frame.innerFrame;
        System.out.println("Received: " + inner.toString());

        if (isMethod(inner, CLASS_CONNECTION, CONNECTION_START_OK)) {
            //Start-OK received: announce our limits through Connection.Tune
            LinkedHashMap<AShortString, AMQPNativeType> arguments = new LinkedHashMap<AShortString, AMQPNativeType>();
            arguments.put(new AShortString("channel-max"), new AShortUInt(no_channels));
            arguments.put(new AShortString("frame-max"), new ALongUInt(4096));
            arguments.put(new AShortString("heartbeat"), new AShortUInt(0)); //Ignore heartbeats for now

            queue_outgoing.add(AMQPMethodFrame.build(CLASS_CONNECTION, CONNECTION_TUNE, arguments));
            System.out.println("Sending Connection.Tune");
        } else if (isMethod(inner, CLASS_CONNECTION, CONNECTION_TUNE_OK)) {
            //Tune-OK received: the handshake is done
            state = State.HANDSHAKE_COMPLETE;
            System.out.println("Handshake phase complete");
        }
    }

    //Periodical update, currently a no-op
    public void periodical() {
    }

    /**
     * Processes at most one frame from the incoming queue.
     *
     * Currently triggered upon modifying the incoming frame queue; may be
     * periodically triggered in the future. While the handshake is running the
     * work is delegated to {@link #updateInitializing()}.
     */
    public void updateState() {
        //The handshake is handled separately to keep this method readable
        if (state == State.INITIALIZING) {
            updateInitializing();
            return;
        }

        //Make sure that we have incoming data
        if (queue_incoming.isEmpty()) {
            return;
        }

        AMQPFrame frame = queue_incoming.pop();

        if (frame.amqpFrameType == AMQPFrame.AMQPFrameType.METHOD) {
            handleMethodFrame(frame);
        } else if (frame.amqpFrameType == AMQPFrame.AMQPFrameType.HEADER) {
            System.out.println("Received header frame in TesterChannels, not interested...");
            System.out.println(frame.innerFrame.toString());
        } else if (frame.amqpFrameType == AMQPFrame.AMQPFrameType.BODY) {
            System.out.println("Received body frame (full size: " + frame.toWire().length() + ") in TesterChannels, data:");
            System.out.println(frame.innerFrame.toString());
        }
    }

    //Dispatches a post-handshake method frame to the matching handler
    private void handleMethodFrame(AMQPFrame frame) {
        //Get the inner frame that contains all important frame data
        AMQPMethodFrame inner = (AMQPMethodFrame) frame.innerFrame;
        System.out.println("Frame received (full size: " + frame.size() + "): " + inner.toString());

        if (isMethod(inner, CLASS_CONNECTION, CONNECTION_OPEN)) {
            //Maybe check the path in the future if needed?
            //The supplied octet is the reserved field of Connection.Open-OK
            queue_outgoing.add(AMQPMethodFrame.build(CLASS_CONNECTION, CONNECTION_OPEN_OK, new AOctet(0x00)));
            System.out.println("Sending Connection.Open-OK");
        } else if (isMethod(inner, CLASS_CONNECTION, CONNECTION_CLOSE)) {
            //Connection.Close-OK carries no arguments
            queue_outgoing.add(AMQPMethodFrame.build(CLASS_CONNECTION, CONNECTION_CLOSE_OK));
            System.out.println("Sending Connection.Close-OK");
        } else if (isMethod(inner, CLASS_CHANNEL, CHANNEL_OPEN)) {
            handleChannelOpen(frame);
        } else if (isMethod(inner, CLASS_CHANNEL, CHANNEL_CLOSE)) {
            handleChannelClose(frame);
        }
    }

    //Answers Channel.open with Channel.Open-OK and counts the newly opened channel
    private void handleChannelOpen(AMQPFrame frame) {
        //Channels must be opened on a channel number other than zero
        if (frame.channel.toInt() == 0) {
            System.out.println("*** WARNING: Received channel-open on channel 0");
        }

        //Check if we are over the channel limit
        if (open_channels >= no_channels) {
            System.out.println("*** WARNING: Over channel limit, but still requesting more channels");
            //NOTE: the channel is still granted below. Dropping the client here
            //(setting amqpConnection.status to DISCONNECT and returning) is what
            //the original author considered spec-conformant, but it is left
            //disabled so the tester keeps observing the client.
        }

        //Build the channel.open-ok frame; the argument is the reserved field
        AMQPFrame outgoing = AMQPMethodFrame.build(CLASS_CHANNEL, CHANNEL_OPEN_OK, new ALongUInt(0));

        //Reply on same channel as we got the message on
        outgoing.channel = frame.channel;
        queue_outgoing.add(outgoing);

        //One more channel is open now
        open_channels += 1;

        //Debugging
        System.out.println("Sending Channel.Open-OK (Currently open: " + open_channels + ", last opened: " + frame.channel.toString() + ")");
    }

    //Answers Channel.close with Channel.Close-OK and releases one counted channel
    private void handleChannelClose(AMQPFrame frame) {
        //Channel.Close-OK has no fields in the AMQP specification, so it is
        //built without arguments (same as Connection.Close-OK)
        AMQPFrame outgoing = AMQPMethodFrame.build(CLASS_CHANNEL, CHANNEL_CLOSE_OK);

        //Reply on same channel as we got the message on
        outgoing.channel = frame.channel;
        queue_outgoing.add(outgoing);

        //Debugging
        System.out.println("Sending Channel.Close-OK");

        //Decrease the channel count, but never below zero: a stray or duplicate
        //close must not give the client extra head-room under the limit
        if (open_channels > 0) {
            open_channels -= 1;
        } else {
            System.out.println("*** WARNING: Received channel-close but no channels are open");
        }
    }

    /**
     * Called when a frame has been received and decoded over the wire.
     * The frame is queued and a state update is triggered immediately.
     *
     * @param amqpFrame the decoded frame
     */
    public void deliverFrame(AMQPFrame amqpFrame) {
        //Add frame to queue
        queue_incoming.add(amqpFrame);

        //Trigger state update
        updateState();
    }

    /**
     * Gets the next frame to send to the client.
     *
     * @return the oldest queued outgoing frame, or null if no frames are available
     */
    public AMQPFrame getFrame() {
        //poll() removes the head like pop() but yields null on an empty queue
        return queue_outgoing.poll();
    }
}