-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathmain.py
More file actions
executable file
·447 lines (375 loc) · 15.3 KB
/
main.py
File metadata and controls
executable file
·447 lines (375 loc) · 15.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
#!/usr/bin/env python
import asyncio
import logging
import os.path
import re
import shutil
import socket
from typing import Dict, List, Tuple, Optional
import configargparse
import discord
from discord import app_commands
from discord.ext import commands
from slack_bolt.adapter.socket_mode.aiohttp import AsyncSocketModeHandler
from slack_bolt.async_app import AsyncApp
import emoji_config
from custom_emoji_cache import CustomEmojiCache
from emoji_config import EmojiMapping
from markov import Markov
from time import time
logging.basicConfig(level=logging.INFO)
parser = configargparse.ArgParser(description="CodeBro: A triumph of machine over man")
parser.add_argument(
"-c", "--config", is_config_file=True, help="Path to config file in yaml format"
)
parser.add_argument(
"-d",
"--discord_token",
env_var="CB_DISCORD_TOKEN",
help="This bot's discord bot token",
)
parser.add_argument(
"--slack_bot_token",
env_var="CB_SLACK_BOT_TOKEN",
help="This bot's slack bot token (the one prefixed with \"xoxb-\" in \"OAuth Tokens for Your Workspace\" under \"OAuth & Permissions\")",
)
parser.add_argument(
"--slack_app_token",
env_var="CB_SLACK_APP_TOKEN",
help="This bot's slack app token (the one prefixed with \"xapp-\" in \"App-Level Tokens\" under \"Basic Information\")",
)
parser.add_argument(
"--local_server_port",
type=int,
help="Set a local listen port to enable a local server",
)
parser.add_argument(
"-b",
"--brain",
env_var="CB_BRAIN",
required=True,
help="This bot's input brain as a YAML or newline-delimited text file, also used as the base name for rotated brains",
)
parser.add_argument(
"-o",
"--output",
env_var="CB_OUTPUT",
required=True,
help="File for writing the real-time updated corpus",
)
parser.add_argument(
"-n",
"--name",
env_var="CB_NAME",
required=True,
help="The name this bot will respond to in chats",
)
parser.add_argument(
"-r",
"--rotate",
env_var="CB_ROTATE",
required=False,
action="store_true",
help="Backup the brain and copy the output to the brain on SIGTERM",
)
parser.add_argument(
"-u",
"--user_map",
env_var="USER_MAP",
required=False,
help="Discord-to-Slack user id map",
)
parser.add_argument(
"-e",
"--emoji_map_file",
env_var="EMOJI_MAP",
required=False,
help="Yaml file of mappings of words which will result in an emoji react",
)
parser.add_argument(
"-g",
"--guild_id",
env_var="GUILD_ID",
type=int,
required=True,
help="Guild ID to register commands to",
)
parser.add_argument(
"--extra_guild_ids",
type=int,
nargs='+',
required=False,
help="Extra Guild IDs to register commands to",
)
args = parser.parse_args()
discord_token = args.discord_token
slack_bot_token = args.slack_bot_token
slack_app_token = args.slack_app_token
user_map = args.user_map
bot_name = args.name
emoji_map_file = args.emoji_map_file
emoji_map_active = emoji_map_file is not None and emoji_map_file != ''
main_guild_id:int = args.guild_id
extra_guild_ids:List[int] = args.extra_guild_ids if args.extra_guild_ids is not None else list()
all_guild_objects:List[discord.Object] = [discord.Object(id=i) for i in ([main_guild_id] + extra_guild_ids)]
brain = Markov(args.brain, args.output, args.user_map, [bot_name])
intents = discord.Intents(guild_messages=True, message_content=True)
discord_client: discord.Client = None
def rotate_brain(the_brain: str, output: str):
brain_backup = "{}.{}".format(the_brain, time())
shutil.move(the_brain, brain_backup)
shutil.move(output, the_brain)
def sanitize_and_tokenize(msg: str) -> list[str]:
msg_tokens = msg.split()
for i in range(0, len(msg_tokens)):
msg_tokens[i] = msg_tokens[i].strip("'\"!@#$%^&*().,/\\+=<>?:;").upper()
return msg_tokens
my_emoji_config: emoji_config.EmojiConfig = emoji_config.read_emoji_config(emoji_map_file)
custom_emoji_cache: CustomEmojiCache = CustomEmojiCache()
def get_ten(is_slack) -> str:
response = ""
for i in range(0, 9):
response += brain.create_response(slack=is_slack)
response += "\n"
return response
#**********************< SLACK & DISCORD STUFF>**************************#
if discord_token:
discord_client = commands.Bot(command_prefix='?', intents=intents)
tree = discord_client.tree
else:
discord_client = None
if discord_client:
@discord_client.event
async def on_ready():
for g in discord_client.guilds:
print(f'Starting sync for guild {g.id}...')
await tree.sync(guild=g)
print(f'Synced for guild {g.id}')
print("Logged in as {0.user}".format(discord_client))
@discord_client.event
async def on_message(message:discord.Message):
if message.author == discord_client.user:
return
bot_display_names = [discord_client.user.display_name]
mentioned = False
for mention in message.mentions:
if mention.id == discord_client.user.id:
mentioned = True
break
await try_append_emoji_to_message(message)
# print(f"Discord message from {message.author}: {message.content}")
response = create_raw_response(message.content, False, force_mention=mentioned, other_bot_names=bot_display_names)
if response and response.strip() != "":
await message.channel.send(response)
@tree.command(
name="add_react",
description="Add a reaction emoji config",
guilds=all_guild_objects
)
@app_commands.describe(regex_str='The regex against which each token will be checked (case insensitive)',
emoji_str="The emoji string (either a single unicode character or an emoji's name) to react with")
@app_commands.checks.cooldown(3, 20, key=lambda i: (i.guild_id, i.user.id))
async def add_react(ctx: discord.Interaction, regex_str: str, emoji_str: str):
if not await get_user_has_role_for_interaction(ctx, 'admin'):
await ctx.response.send_message('Missing permissions for this command')
return
def is_valid_str_arg(arg: str) -> bool:
if arg is None or arg == '':
return False
return True
if not is_valid_str_arg(regex_str):
await ctx.response.send_message(f'Failed to add emoji since regex \"{regex_str}\" is invalid')
return
if not is_valid_str_arg(emoji_str):
await ctx.response.send_message(f'Failed to add emoji since emoji \"{emoji_str}\" is invalid')
return
existing_emoji_mapping = my_emoji_config.find_mapping_via_regex_str(regex_str, ctx.guild_id)
if existing_emoji_mapping is not None:
await ctx.response.send_message(f'Failed to add emoji since regex \"{regex_str}\" already in config')
return
def try_get_regex_pattern(regex_str:str) -> Optional[re.Pattern]:
try:
pattern = re.compile(regex_str)
return pattern
except:
return None
new_pattern = try_get_regex_pattern(regex_str)
if new_pattern is None:
await ctx.response.send_message(f'Failed to add emoji since regex \"{regex_str}\" is invalid')
return
if len(emoji_str) >= 2:
custom_emoji = await custom_emoji_cache.find_custom_emoji_with_name(ctx.guild, emoji_str, force_refresh_emoji=True)
if custom_emoji is None:
msg = f"Failed to add emoji. Couldn't find emoji \"{emoji_str}\""
print(msg)
await ctx.response.send_message(msg)
return
my_emoji_config.add_mapping(EmojiMapping(regex_str, emoji_str, ctx.guild_id))
if emoji_map_active:
emoji_config.write_emoji_config(emoji_map_file, my_emoji_config)
await ctx.response.send_message('Added emoji response')
@tree.command(
name="remove_react",
description="Remove a reaction emoji config",
guilds=all_guild_objects
)
@app_commands.describe(regex_str='The regex against which each token will be checked (case insensitive)')
@app_commands.checks.cooldown(3, 20, key=lambda i: (i.guild_id, i.user.id))
async def remove_react(ctx:discord.Interaction, regex_str:str):
if not await get_user_has_role_for_interaction(ctx, 'admin'):
await ctx.response.send_message('Missing permissions for this command')
return
def is_valid_str_arg(arg:str)->bool:
if arg is None or arg == '':
return False
return True
if not is_valid_str_arg(regex_str):
await ctx.response.send_message(f'Failed to remove emoji since regex \"{regex_str}\" is invalid')
return
removed = my_emoji_config.remove_mappings_for_regex(regex_str, ctx.guild_id)
if removed > 0:
if emoji_map_active:
emoji_config.write_emoji_config(emoji_map_file, my_emoji_config)
await ctx.response.send_message(f'Removed emoji react for regex \"{regex_str}\"')
else:
await ctx.response.send_message(f'Nothing to remove for regex \"{regex_str}\"')
@tree.command(
name="list_react",
description="List emoji reactions",
guilds=all_guild_objects
)
@app_commands.checks.cooldown(3, 20, key=lambda i: (i.guild_id, i.user.id))
async def list_react(ctx: discord.Interaction):
if not await get_user_has_role_for_interaction(ctx, 'admin'):
await ctx.response.send_message('Missing permissions for this command')
return
reply_content = 'Emoji:\n'
for emoji_mapping in my_emoji_config.emoji_config_list:
reply_content += f'{emoji_mapping.regex_str} => {emoji_mapping.emoji_str}\n'
await ctx.response.send_message(reply_content)
@tree.command(
name="force_sync",
description="Force Sync the bot",
guilds=all_guild_objects
)
@app_commands.checks.cooldown(3, 20, key=lambda i: (i.guild_id, i.user.id))
async def force_sync(ctx: discord.Interaction):
if not await get_user_has_role_for_interaction(ctx, 'admin'):
await ctx.response.send_message('Missing permissions for this command')
return
await ctx.response.defer()
print(f'Starting sync for guild {ctx.guild.id}...')
await tree.sync(guild=ctx.guild)
print(f'Synced for guild {ctx.guild.id}')
await ctx.followup.send('Forcibly synced')
async def get_user_has_role_for_interaction(ctx: discord.Interaction, role_name: str) -> bool:
"""For some reason I couldn't get app_commands.checks.has_role working. Something is missing in the discordpy
cache. Since we're using this for really low frequency operations, I just refetch everything here to do
checks solidly."""
guild = await discord_client.fetch_guild(ctx.guild_id)
if guild is None:
return False
user: discord.Member = await guild.fetch_member(ctx.user.id)
if user is None:
return False
guild_roles = await guild.fetch_roles()
found_role = None
for r in guild_roles:
if r.name.lower() == role_name.lower():
found_role = r
break
if found_role is None:
return False
# For some reason, ctx.user.get_role doesn't work here because ctx.guild has an empty roles dictionary?
user_role = user.get_role(found_role.id)
return user_role is not None
async def try_append_emoji_to_message(message:discord.Message):
msg_tokens = sanitize_and_tokenize(message.content)
reaction_count = 0
max_reaction_count = 5
for token in msg_tokens:
configured_reaction_emoji = my_emoji_config.find_emoji_for_message_token(token, message.guild.id)
if configured_reaction_emoji is None:
continue
custom_emoji = await custom_emoji_cache.find_custom_emoji_with_name(message.guild, configured_reaction_emoji.emoji_str)
if custom_emoji is not None:
await message.add_reaction(custom_emoji)
reaction_count += 1
else:
try:
await message.add_reaction(configured_reaction_emoji.emoji_str)
reaction_count += 1
except:
print(f'Failed to add emoji for {configured_reaction_emoji}')
if reaction_count >= max_reaction_count:
break
if slack_bot_token:
app = AsyncApp(token=slack_bot_token)
else:
app = None
if app:
@app.event("message")
async def handle_slack_message(payload):
response = create_raw_response(payload["text"], True)
if response and response.strip() != "":
await app.client.chat_postMessage(channel=payload["channel"], text=response)
slack_socket_client = AsyncSocketModeHandler(app, slack_app_token)
async def run_slack_app():
await slack_socket_client.connect_async()
#**********************</SLACK & DISCORD STUFF>**************************#
def create_raw_response(
incoming_message: str,
is_slack: bool,
force_mention: bool = False,
other_bot_names: Optional[List[str]] = None
):
msg_tokens = sanitize_and_tokenize(incoming_message)
mentioned = force_mention or any([n.upper() in msg_tokens for n in other_bot_names + [bot_name]])
if mentioned or "TOWN" in msg_tokens: # it's not _not_ a bug
if "GETGET10" in msg_tokens:
return get_ten(is_slack)
else:
return brain.create_response(incoming_message, learn=True, slack=is_slack)
# TODO: the local server should probably be a class and should probably be
# multi-threaded to handle simultaneous connections ... but this is expedient
# for quick local testing without Slack/Discord integration
#
# this will listen on a local server, if a port is specified.
# try connecting with netcat or something, like nc localhost <your port>
def run_local_server(port_num):
host = "localhost"
port = port_num
prompt = "\nSay something: "
print("Listening on port: " + str(port))
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((host, port))
s.listen(1)
conn, addr = s.accept()
with conn:
print("Connection recieved from", addr)
while True:
conn.sendall(str.encode(prompt))
data = conn.recv(1024)
if not data:
break
decoded_data = data.decode("utf-8")
response = create_raw_response(decoded_data, False)
if response:
response = bot_name + " " + "said: " + response
conn.sendall(str.encode(response))
# MAIN ----
basic_loop = asyncio.get_event_loop()
try:
if args.local_server_port:
run_local_server(port_num=args.local_server_port)
if app:
basic_loop.create_task(run_slack_app())
if discord_client:
basic_loop.create_task(discord_client.start(discord_token)),
basic_loop.run_forever()
except KeyboardInterrupt:
if args.rotate:
rotate_brain(args.brain, args.output)
finally:
basic_loop.close()