-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmsmp_client.py
More file actions
412 lines (330 loc) · 15.9 KB
/
msmp_client.py
File metadata and controls
412 lines (330 loc) · 15.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
import json
import asyncio
import websockets
import logging
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import time
import threading
@dataclass
class PlayerListInfo:
    """Snapshot of the server's player list.

    All fields are optional at construction time, so both ``PlayerListInfo()``
    and ``PlayerListInfo(current_players=2, player_names=[...])`` work.

    Attributes:
        current_players: Number of players currently online.
        max_players: Maximum number of players (defaults to 20).
        player_names: Names of the online players; never ``None`` after
            construction.
    """

    current_players: int = 0
    max_players: int = 20
    # None is only a construction-time placeholder: a list literal here would
    # be a shared mutable default, so __post_init__ swaps it for a fresh list.
    player_names: Optional[List[str]] = None

    def __post_init__(self):
        """Give every instance its own list when no names were supplied."""
        if self.player_names is None:
            self.player_names = []

    def __str__(self):
        """Return a compact, human-readable summary of the player list."""
        names = ', '.join(self.player_names)
        return (
            f"PlayerListInfo{{current={self.current_players}, "
            f"max={self.max_players}, players={names}}}"
        )
class MSMPClient:
    """Client for the Minecraft Server Management Protocol (MSMP).

    Speaks JSON-RPC 2.0 over a WebSocket.  The coroutine API (``connect``,
    ``send_request``, ...) has to run on an asyncio event loop; the ``*_sync``
    wrappers submit those coroutines to ``self.loop``, which
    ``start_background_loop`` runs in a daemon thread.

    NOTE(review): ``extra_headers`` and ``websocket.closed`` belong to the
    legacy ``websockets`` client API -- confirm the pinned ``websockets``
    version still provides them.
    """

    # Notification method -> name of the listener callback that receives it.
    _LISTENER_CALLBACKS = {
        'minecraft:notification/server/started': 'on_server_started',
        'minecraft:notification/server/stopping': 'on_server_stopping',
        'minecraft:notification/players/joined': 'on_player_join',
        'minecraft:notification/players/left': 'on_player_leave',
    }

    # Notifications that are only logged (and only while a listener is set).
    _LOGGED_NOTIFICATIONS = {
        'minecraft:notification/server/saving': "服务器正在保存...",
        'minecraft:notification/server/saved': "服务器保存完成",
    }

    def __init__(self, host: str, port: int, auth_token: str, logger: logging.Logger, config_manager=None):
        """Store the connection settings; no network I/O happens here.

        Args:
            host: MSMP server host name or address.
            port: MSMP server port.
            auth_token: Bearer token sent in the WebSocket handshake.
            logger: Logger that receives all client diagnostics.
            config_manager: Optional object kept on the instance; this class
                does not use it itself.
        """
        self.host = host
        self.port = port
        self.auth_token = auth_token
        self.logger = logger
        self.config_manager = config_manager
        self.websocket = None
        self.connected = False
        self.authenticated = False
        self.request_id_counter = 1
        # request id -> asyncio future resolved by _handle_message
        self.pending_requests = {}
        self.event_listener = None
        # Heartbeat bookkeeping (seconds)
        self.last_pong_time = 0
        self.heartbeat_interval = 30
        self.heartbeat_timeout = 90
        self.heartbeat_task = None
        self.receive_task = None
        # Private loop used by the *_sync wrappers; see start_background_loop
        self.loop = asyncio.new_event_loop()
        self.thread = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _socket_is_open(self) -> bool:
        """Return True when a websocket exists and is not closed."""
        return bool(self.websocket) and not self.websocket.closed

    def _cancel_background_tasks(self):
        """Cancel the heartbeat and receive tasks if they are still running.

        The task that is currently executing is skipped so a caller running
        inside one of these loops does not cancel itself.
        """
        try:
            current = asyncio.current_task()
        except RuntimeError:
            # Called from synchronous code with no running event loop
            current = None
        for task in (self.heartbeat_task, self.receive_task):
            if task is not None and task is not current and not task.done():
                task.cancel()

    def _fail_pending_requests(self):
        """Fail every outstanding request future and forget them all."""
        for future in list(self.pending_requests.values()):
            if not future.done():
                future.set_exception(Exception("连接已关闭"))
        self.pending_requests.clear()

    @staticmethod
    def _describe_error(error) -> Any:
        """Extract a message from a JSON-RPC ``error`` member of any shape."""
        if isinstance(error, dict):
            return error.get('message', 'Unknown error')
        return str(error)

    def _run_sync(self, coro, timeout: float):
        """Run ``coro`` on ``self.loop`` and block for its result.

        Raises whatever the coroutine raises, or a timeout error when no
        result arrives within ``timeout`` seconds (the coroutine is left
        running in that case).
        """
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        return future.result(timeout=timeout)

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------
    async def connect(self):
        """Open the WebSocket and start the receive and heartbeat loops.

        Returns:
            True once connected (also when a live connection already exists).

        Raises:
            Exception: whatever the connection attempt raised; the connection
                flags are reset before it is re-raised.
        """
        try:
            # Nothing to do when a live connection already exists
            if self.connected and self._socket_is_open():
                self.logger.debug("MSMP已经连接,无需重复连接")
                return True

            # A previous connection may have left its loops running; stop them
            # so a reconnect never ends up with duplicate heartbeat/receive loops.
            self._cancel_background_tasks()

            headers = {"Authorization": f"Bearer {self.auth_token}"}
            self.websocket = await websockets.connect(
                f"ws://{self.host}:{self.port}",
                extra_headers=headers,
                # Library keepalive is disabled; _heartbeat_loop pings instead
                ping_interval=None,
                ping_timeout=None,
                close_timeout=10,
            )
            # The bearer token travels in the handshake, so an accepted
            # connection is treated as authenticated.
            self.connected = True
            self.authenticated = True
            self.last_pong_time = time.time()
            self.logger.info(f"已连接到MSMP服务器 {self.host}:{self.port}")

            # Start the message receive loop
            self.receive_task = asyncio.create_task(self._receive_loop())
            # Start the heartbeat two seconds later
            await asyncio.sleep(2)
            self.heartbeat_task = asyncio.create_task(self._heartbeat_loop())
            return True
        except Exception as e:
            self.logger.error(f"连接MSMP服务器失败: {e}")
            self.connected = False
            self.authenticated = False
            raise

    def set_shutdown_mode(self):
        """Enter shutdown mode: drop the flags, stop the loops, fail requests.

        Unlike ``close`` this is synchronous and does not close the websocket.

        NOTE(review): tasks and futures are touched directly; if this is
        called from a thread other than the one running the event loop,
        confirm that is acceptable for the caller.
        """
        self.connected = False
        self.authenticated = False
        self._cancel_background_tasks()
        self._fail_pending_requests()
        self.logger.debug("MSMP客户端已进入关闭模式")

    async def _heartbeat_loop(self):
        """Ping the server periodically and detect a dead connection.

        The loop ends when ``self.connected`` turns false, after three
        consecutive ping timeouts, when no pong has been seen for
        ``heartbeat_timeout`` seconds, or on an unexpected error.  When the
        loop itself detects the failure it marks the client as disconnected so
        ``is_connected`` stops reporting a link that is no longer monitored.
        """
        consecutive_failures = 0
        max_consecutive_failures = 3
        monitored_socket = self.websocket
        connection_lost = False

        while self.connected:
            try:
                if self._socket_is_open():
                    try:
                        pong_waiter = await self.websocket.ping()
                        await asyncio.wait_for(pong_waiter, timeout=10.0)
                    except asyncio.TimeoutError:
                        consecutive_failures += 1
                        self.logger.warning(f"心跳超时 (连续失败: {consecutive_failures}/{max_consecutive_failures})")
                        if consecutive_failures >= max_consecutive_failures:
                            self.logger.error("心跳连续失败,连接可能已断开")
                            connection_lost = True
                            break
                    else:
                        self.last_pong_time = time.time()
                        consecutive_failures = 0
                        if self.logger.isEnabledFor(logging.DEBUG):
                            self.logger.debug("心跳检测成功")

                # Give up when the last successful pong is too old
                time_since_last_success = time.time() - self.last_pong_time
                if time_since_last_success > self.heartbeat_timeout:
                    self.logger.warning(f"心跳超时 ({time_since_last_success:.1f}秒),连接可能已断开")
                    connection_lost = True
                    break

                await asyncio.sleep(self.heartbeat_interval)
            except Exception as e:
                self.logger.error(f"心跳循环异常: {e}")
                connection_lost = True
                break

        # Only flip the flags for the connection this loop was watching
        if connection_lost and self.websocket is monitored_socket:
            self.connected = False
            self.authenticated = False

    async def _receive_loop(self):
        """Read messages until the connection ends, then mark it closed.

        A failure while handling one message is logged and does not stop the
        loop.  The connection flags are reset on every exit except task
        cancellation, including the case where iteration simply finishes.
        """
        socket = self.websocket
        try:
            async for message in socket:
                try:
                    await self._handle_message(message)
                except Exception as e:
                    self.logger.error(f"处理消息时出错: {e}", exc_info=True)
        except websockets.exceptions.ConnectionClosed as e:
            self.logger.info(f"MSMP连接已关闭: {e}")
        except Exception as e:
            self.logger.error(f"接收循环异常: {e}", exc_info=True)
        else:
            # Iteration finished without raising: the stream ended cleanly
            self.logger.info("MSMP连接已正常关闭")

        # Do not clobber the state of a newer connection
        if self.websocket is socket:
            self.connected = False
            self.authenticated = False

    # ------------------------------------------------------------------
    # Message handling
    # ------------------------------------------------------------------
    async def _handle_message(self, message: str):
        """Decode one incoming message and route it.

        Messages with a non-null ``id`` resolve (or fail) the matching pending
        request; messages with a ``method`` are treated as notifications.
        Errors are logged, never raised.
        """
        try:
            if self.logger.isEnabledFor(logging.DEBUG):
                self.logger.debug(f"收到MSMP消息: {message[:200]}")

            data = json.loads(message)
            if not isinstance(data, dict):
                # Neither a response nor a notification object
                self.logger.warning(f"忽略非对象的MSMP消息: {type(data).__name__}")
                return

            request_id = data.get('id')
            if request_id is not None:
                # Response message
                future = self.pending_requests.pop(request_id, None)
                if future is not None and not future.done():
                    error = data.get('error')
                    if error is not None:
                        # The future is already popped, so it must be settled
                        # here whatever shape the error member has.
                        future.set_exception(Exception(self._describe_error(error)))
                    else:
                        future.set_result(data)
            elif 'method' in data:
                # Notification message
                await self._handle_notification(data)
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON解析失败: {e}")
        except Exception as e:
            self.logger.error(f"处理MSMP消息失败: {e}", exc_info=True)

    async def _handle_notification(self, notification: Dict[str, Any]):
        """Forward a notification to the event listener.

        ``params`` may be positional (the first element is used) or by-name
        (the object itself is used); anything else yields an empty dict.
        Nothing is dispatched or logged at info level without a listener.
        """
        method = notification.get('method', '')
        params = notification.get('params', [])

        if isinstance(params, dict):
            params_obj = params
        elif isinstance(params, (list, tuple)) and params:
            params_obj = params[0]
        else:
            params_obj = {}

        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug(f"处理通知 - 方法: {method}, 参数: {params_obj}")

        if not self.event_listener:
            return

        try:
            callback_name = self._LISTENER_CALLBACKS.get(method)
            if callback_name is not None:
                getattr(self.event_listener, callback_name)(params_obj)
            elif method in self._LOGGED_NOTIFICATIONS:
                self.logger.info(self._LOGGED_NOTIFICATIONS[method])
            elif self.logger.isEnabledFor(logging.DEBUG):
                self.logger.debug(f"未处理的通知: {method}")
        except Exception as e:
            self.logger.error(f"处理事件监听器回调时出错: {e}", exc_info=True)

    # ------------------------------------------------------------------
    # Requests
    # ------------------------------------------------------------------
    async def send_request(self, method: str, params: Optional[List[Any]] = None) -> Dict[str, Any]:
        """Send a JSON-RPC request and wait up to 30 seconds for its response.

        Args:
            method: Method name; ``minecraft:`` is prepended when missing.
            params: Positional parameters (defaults to an empty list).

        Returns:
            The complete decoded response object.

        Raises:
            Exception: when the connection is not ready, the request times
                out, or the server answers with an error.
        """
        if not self.connected or not self._socket_is_open():
            raise Exception("MSMP连接未就绪")

        request_id = self.request_id_counter
        self.request_id_counter += 1

        request = {
            "jsonrpc": "2.0",
            "id": request_id,
            "method": method if method.startswith("minecraft:") else f"minecraft:{method}",
            "params": params or [],
        }

        future = asyncio.get_running_loop().create_future()
        self.pending_requests[request_id] = future

        try:
            request_json = json.dumps(request)
            if self.logger.isEnabledFor(logging.DEBUG):
                self.logger.debug(f"发送MSMP请求: {request_json[:200]}")
            await self.websocket.send(request_json)
            return await asyncio.wait_for(future, timeout=30.0)
        except asyncio.TimeoutError as exc:
            raise Exception(f"请求 {method} 超时") from exc
        finally:
            # Covers timeout, send failure and cancellation alike; a no-op
            # when _handle_message already removed the entry.
            self.pending_requests.pop(request_id, None)

    async def get_server_status(self) -> Dict[str, Any]:
        """Return the ``result`` of ``server/status`` ({} when absent)."""
        response = await self.send_request("server/status")
        return response.get('result', {})

    async def get_player_list(self) -> "PlayerListInfo":
        """Return the online players and the maximum player count.

        The maximum falls back to 20 when it cannot be fetched.
        """
        response = await self.send_request("players")
        info = PlayerListInfo()

        # A missing or null result is treated as "nobody online"
        players = response.get('result') or []
        info.player_names = [player.get('name', '') for player in players if 'name' in player]
        info.current_players = len(info.player_names)

        try:
            max_players_response = await self.send_request("serversettings/max_players")
            info.max_players = max_players_response.get('result', 20)
        except Exception as e:
            self.logger.warning(f"获取最大玩家数失败: {e}")
            info.max_players = 20

        return info

    async def execute_command(self, command: str) -> Dict[str, Any]:
        """Run a server command and return the complete response.

        ``"server/stop"`` (any letter case) is sent as the ``server/stop``
        method; everything else goes through ``server/command``.
        """
        if command.lower() == "server/stop":
            return await self.send_request("server/stop", [])
        return await self.send_request("server/command", [{"command": command}])

    async def get_game_rules(self) -> Dict[str, Any]:
        """Return the complete response of the ``gamerules`` request."""
        return await self.send_request("gamerules")

    async def close(self):
        """Stop the loops, fail outstanding requests and close the socket."""
        self.connected = False
        self.authenticated = False
        self._cancel_background_tasks()
        self._fail_pending_requests()
        if self._socket_is_open():
            await self.websocket.close()

    # ------------------------------------------------------------------
    # State inspection
    # ------------------------------------------------------------------
    def is_authenticated(self) -> bool:
        """Return True when authenticated, connected and the socket is open."""
        result = bool(self.authenticated and self.connected and self._socket_is_open())

        if not result and self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug(
                f"认证检查失败: authenticated={self.authenticated}, "
                f"connected={self.connected}, "
                f"websocket={'exists' if self.websocket else 'None'}, "
                f"closed={self.websocket.closed if self.websocket else 'N/A'}"
            )

        return result

    def is_connected(self) -> bool:
        """Return True when connected and the socket is open."""
        return bool(self.connected and self._socket_is_open())

    def get_connection_status(self) -> str:
        """Return a short description of the current connection state."""
        if not self._socket_is_open():
            return "连接已关闭"
        if not self.connected:
            return "连接未就绪"
        if not self.authenticated:
            return "未认证"
        return "连接正常"

    def set_event_listener(self, listener):
        """Install the object whose ``on_*`` callbacks receive notifications."""
        self.event_listener = listener

    # ------------------------------------------------------------------
    # Background loop and synchronous wrappers
    # ------------------------------------------------------------------
    def start_background_loop(self):
        """Run ``self.loop`` forever in a daemon thread.

        Calling this again while the thread is alive is a no-op, because a
        second thread cannot run the same loop.
        """
        if self.thread is not None and self.thread.is_alive():
            return

        def run_loop():
            asyncio.set_event_loop(self.loop)
            self.loop.run_forever()

        self.thread = threading.Thread(target=run_loop, daemon=True)
        self.thread.start()

    def connect_sync(self):
        """Blocking ``connect`` (30 s limit)."""
        return self._run_sync(self.connect(), 30)

    def get_server_status_sync(self) -> Dict[str, Any]:
        """Blocking ``get_server_status`` (10 s limit)."""
        return self._run_sync(self.get_server_status(), 10)

    def get_player_list_sync(self) -> "PlayerListInfo":
        """Blocking ``get_player_list`` (10 s limit)."""
        return self._run_sync(self.get_player_list(), 10)

    def execute_command_sync(self, command: str) -> Dict[str, Any]:
        """Blocking ``execute_command`` (30 s limit)."""
        return self._run_sync(self.execute_command(command), 30)

    def close_sync(self):
        """Blocking ``close`` (5 s limit)."""
        return self._run_sync(self.close(), 5)
class ServerEventListener:
    """Base class for server event listeners.

    Every hook is a no-op that returns ``None``; subclasses override the ones
    they care about.  ``MSMPClient`` calls these hooks with the parameter
    object taken from the matching notification.
    """

    def on_server_started(self, params: Dict[str, Any]):
        """Hook for the server-started notification; does nothing by default."""
        return None

    def on_server_stopping(self, params: Dict[str, Any]):
        """Hook for the server-stopping notification; does nothing by default."""
        return None

    def on_player_join(self, params: Dict[str, Any]):
        """Hook for the player-joined notification; does nothing by default."""
        return None

    def on_player_leave(self, params: Dict[str, Any]):
        """Hook for the player-left notification; does nothing by default."""
        return None