-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRpcClient.cs
More file actions
97 lines (85 loc) · 3.2 KB
/
RpcClient.cs
File metadata and controls
97 lines (85 loc) · 3.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.Serialization.Formatters.Binary;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace RabbitMQ_Test
{
/// <summary>
/// RPC-style client over RabbitMQ. Each call publishes a serialized request to the
/// "rpc_queue" routing key on the default exchange and completes the returned task
/// when a message carrying the same correlation id arrives on this client's
/// server-named reply queue.
/// </summary>
public class ClientRPCRabbitMQWrapper : IClientRPCRabbitMQWrapper
{
    /// <summary>Routing key that every request is published to.</summary>
    private const string RequestQueueName = "rpc_queue";

    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string replyQueueName;
    private readonly EventingBasicConsumer consumer;

    /// <summary>Calls that are still waiting for a reply, keyed by correlation id.</summary>
    private readonly ConcurrentDictionary<string, TaskCompletionSource<object>> callbackMapper =
        new ConcurrentDictionary<string, TaskCompletionSource<object>>();

    /// <summary>
    /// Guards publishing on <see cref="channel"/>. CallAsync may be invoked from several
    /// threads, and the RabbitMQ client documentation advises against using one channel
    /// from multiple threads at the same time.
    /// </summary>
    private readonly object publishLock = new object();

    /// <summary>
    /// Connects to the broker at <paramref name="hostname"/>, declares a server-named
    /// reply queue and starts consuming replies from it with automatic acknowledgement.
    /// </summary>
    /// <param name="hostname">Host name of the RabbitMQ broker.</param>
    public ClientRPCRabbitMQWrapper(string hostname)
    {
        var factory = new ConnectionFactory() { HostName = hostname };
        connection = factory.CreateConnection();
        try
        {
            channel = connection.CreateModel();
            replyQueueName = channel.QueueDeclare(queue: "").QueueName;
            consumer = new EventingBasicConsumer(channel);
            consumer.Received += OnReplyReceived;
            channel.BasicConsume(
                consumer: consumer,
                queue: replyQueueName,
                autoAck: true);
        }
        catch
        {
            // Setup failed half-way: release what was already opened so the
            // connection does not leak, then surface the original error.
            try
            {
                channel?.Dispose();
            }
            finally
            {
                connection.Dispose();
            }

            throw;
        }
    }

    /// <summary>
    /// Publishes <paramref name="request"/> and returns a task that completes with the
    /// deserialized reply.
    /// </summary>
    /// <typeparam name="T">Not used by this implementation; kept for interface compatibility.</typeparam>
    /// <param name="request">Request payload; serialized with <see cref="BinaryFormatter"/>.</param>
    /// <param name="cancellationToken">
    /// Cancels the wait for a reply. The returned task transitions to the canceled state;
    /// a request that was already published is not recalled.
    /// </param>
    /// <returns>
    /// A task yielding the deserialized reply object. It is faulted if the reply cannot be
    /// deserialized and canceled if the token fires or <see cref="Close"/> is called first.
    /// </returns>
    public Task<object> CallAsync<T>(RPCRequest request, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            // Nothing has been sent yet, so do not publish at all.
            return Task.FromCanceled<object>(cancellationToken);
        }

        var correlationId = Guid.NewGuid().ToString();
        var messageBytes = ToByteArray(request);

        // RunContinuationsAsynchronously keeps the caller's continuations off the
        // consumer dispatch thread that completes this source in OnReplyReceived.
        var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
        callbackMapper.TryAdd(correlationId, tcs);

        try
        {
            lock (publishLock)
            {
                var props = channel.CreateBasicProperties();
                props.CorrelationId = correlationId;
                props.ReplyTo = replyQueueName;
                channel.BasicPublish(
                    exchange: "",
                    routingKey: RequestQueueName,
                    basicProperties: props,
                    body: messageBytes);
            }
        }
        catch
        {
            // The request never went out; do not leave a dead entry behind.
            callbackMapper.TryRemove(correlationId, out _);
            throw;
        }

        if (cancellationToken.CanBeCanceled)
        {
            var registration = cancellationToken.Register(() =>
            {
                callbackMapper.TryRemove(correlationId, out _);
                // Complete the task as well; removing the entry alone would leave
                // the caller awaiting forever.
                tcs.TrySetCanceled(cancellationToken);
            });

            // Release the registration once the call is finished, whatever the outcome,
            // so long-lived tokens do not accumulate callbacks.
            tcs.Task.ContinueWith(_ => registration.Dispose(), TaskScheduler.Default);
        }

        return tcs.Task;
    }

    /// <summary>
    /// Closes the channel and the connection, then cancels every call that is still
    /// waiting for a reply. The connection is closed even if closing the channel throws.
    /// </summary>
    public void Close()
    {
        try
        {
            channel.Close();
        }
        finally
        {
            try
            {
                connection.Close();
            }
            finally
            {
                CancelPendingCalls();
            }
        }
    }

    /// <summary>
    /// Deserializes <paramref name="data"/> with <see cref="BinaryFormatter"/> and casts
    /// the result to <typeparamref name="N"/>.
    /// </summary>
    /// <param name="data">Serialized payload; may be null or empty.</param>
    /// <returns>
    /// The deserialized object, or <c>default</c> when <paramref name="data"/> is null or
    /// empty (an empty payload is what a null object serializes to in this class).
    /// </returns>
    public N FromByteArray<N>(byte[] data)
    {
        if (data == null || data.Length == 0)
        {
            return default;
        }

        // NOTE(security): BinaryFormatter is unsafe on untrusted input and is obsolete
        // in current .NET. It is left in place because replacing it changes the wire
        // format shared with the RPC server; migrate both sides together.
        BinaryFormatter bf = new BinaryFormatter();
        using (MemoryStream ms = new MemoryStream(data))
        {
            object obj = bf.Deserialize(ms);
            return (N)obj;
        }
    }

    /// <summary>
    /// Handles a message from the reply queue: looks up the pending call by correlation
    /// id and completes it with the deserialized body. Messages without a correlation id,
    /// or with one that is not pending, are ignored.
    /// </summary>
    private void OnReplyReceived(object sender, BasicDeliverEventArgs ea)
    {
        var correlationId = ea.BasicProperties?.CorrelationId;
        if (correlationId == null)
        {
            // ConcurrentDictionary throws on a null key.
            return;
        }

        if (!callbackMapper.TryRemove(correlationId, out TaskCompletionSource<object> tcs))
        {
            return;
        }

        try
        {
            var body = ea.Body.ToArray();
            tcs.TrySetResult(FromByteArray<object>(body));
        }
        catch (Exception ex)
        {
            // The entry is already removed, so report the failure to the caller
            // instead of leaving the task pending.
            tcs.TrySetException(ex);
        }
    }

    /// <summary>Removes every pending call and moves its task to the canceled state.</summary>
    private void CancelPendingCalls()
    {
        foreach (var correlationId in callbackMapper.Keys)
        {
            if (callbackMapper.TryRemove(correlationId, out TaskCompletionSource<object> pending))
            {
                pending.TrySetCanceled();
            }
        }
    }

    /// <summary>
    /// Serializes <paramref name="obj"/> with <see cref="BinaryFormatter"/>.
    /// </summary>
    /// <returns>The serialized bytes, or <c>null</c> when <paramref name="obj"/> is null.</returns>
    private byte[] ToByteArray<N>(N obj)
    {
        if (obj == null)
        {
            return default;
        }

        // NOTE(security): see FromByteArray regarding BinaryFormatter.
        BinaryFormatter bf = new BinaryFormatter();
        using (MemoryStream ms = new MemoryStream())
        {
            bf.Serialize(ms, obj);
            return ms.ToArray();
        }
    }
}
}