-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwebsocket.cpp
More file actions
155 lines (121 loc) · 3.34 KB
/
websocket.cpp
File metadata and controls
155 lines (121 loc) · 3.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#include "extension.h"
#include "websocket.h"
#include "rapidjson.h"
#include "native.h"
// Async times per tick
#define ASYNCTIMES 50
#define WS_URI "ncs-backend.ncs"
// WebSocket
WebSocket* g_websocket;
// Send Queue
std::queue<std::string> g_MsgSendQueue;
// Rev Queue
std::queue<std::string> g_MsgRevQueue;
// Connected forward
bool g_ConnectedCall = false;
// Disconnected forward
bool g_DisconnectedCall = false;
// Forward
IForward* g_OnReceivedMsg = nullptr;
IForward* g_OnConnected = nullptr;
IForward* g_OnDisconnected = nullptr;
void WS_Open()
{
g_pSM->LogMessage(myself, "WebSocket server connected!");
g_ConnectedCall = true;
}
void WS_Fail(const char* error)
{
g_pSM->LogError(myself, "WebSocket connection faild! Error: %s", error);
// Reconnect after 5s
timersys->CreateTimer(new WebSocketReconnect(), 5.0, nullptr, 0);
}
void WS_Close(const char* reason)
{
g_pSM->LogMessage(myself, "WebSocket connection closed! Reason: %s", reason);
// Reconnect after 5s
timersys->CreateTimer(new WebSocketReconnect(), 5.0, nullptr, 0);
g_DisconnectedCall = true;
}
void WS_Msg(const char* msg)
{
g_MsgRevQueue.push(msg);
}
void WS_Error(const char* error)
{
g_pSM->LogError(myself, "Websocket Errro: %s", error);
}
void ConnectWS()
{
// Create WebSocket
g_websocket = new WebSocket(WS_Error, WS_Open, WS_Fail, WS_Close, WS_Msg);
const char* uri = smutils->GetCoreConfigValue("NCS_Server");
g_websocket->connect(uri == nullptr ? WS_URI : uri);
}
void StartWebSocket()
{
ConnectWS();
// Add Forward
g_OnReceivedMsg = forwards->CreateForward("NCS_OnReceivedMsg", ET_Ignore, 2, NULL, Param_String, Param_Cell);
g_OnConnected = forwards->CreateForward("NCS_OnConnected", ET_Ignore, 0, NULL);
g_OnDisconnected = forwards->CreateForward("NCS_OnDisconnected", ET_Ignore, 0, NULL);
}
void StopWebSocket()
{
delete& g_websocket; //delete websocket connection
forwards->ReleaseForward(g_OnReceivedMsg);
forwards->ReleaseForward(g_OnConnected);
forwards->ReleaseForward(g_OnDisconnected);
}
void WebSocketAsyncCheck()
{
std::string msg;
if (g_ConnectedCall)
{
g_ConnectedCall = false;
g_OnConnected->Execute(nullptr);
}
if (g_DisconnectedCall)
{
g_DisconnectedCall = false;
g_OnDisconnected->Execute(nullptr);
}
for (int i = 0; i < ASYNCTIMES && !g_MsgRevQueue.empty(); i++)
{
msg = g_MsgRevQueue.front();
if (msg.empty())
continue;
rapidjson::Document document;
if (document.Parse(msg.c_str()).HasParseError() || !document.HasMember("Router") || !document.HasMember("Msg"))
continue;
if (!document["Router"].IsString())
continue;
Handle_t hdl = GetMsgHandle(new MsgRevJson(document["Msg"]));
// Call Forward with json handle
g_OnReceivedMsg->PushString(document["Router"].GetString());
g_OnReceivedMsg->PushCell(static_cast<cell_t>(hdl));
g_OnReceivedMsg->Execute(nullptr);
g_MsgRevQueue.pop();
FreeHandle(hdl); // Free handle when called forward
}
for (int i = 0; i < ASYNCTIMES && !g_MsgSendQueue.empty() && g_websocket->IsConnected(); i++)
{
g_websocket->send(g_MsgSendQueue.front());
g_MsgSendQueue.pop();
}
}
bool SendQuery(std::string str)
{
if (g_websocket->IsConnected())
{
g_MsgSendQueue.push(str);
return true;
}
return false;
}
ResultType WebSocketReconnect::OnTimer(ITimer* pTimer, void* pData)
{
g_pSM->LogMessage(myself, "Reconnect WebSocket server ...");
ConnectWS();
return Pl_Stop;
}