-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbot.py
More file actions
339 lines (285 loc) · 12.1 KB
/
bot.py
File metadata and controls
339 lines (285 loc) · 12.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
import discord
from discord.ext import commands
import logging
import sys
import datetime
import traceback
from collections import deque
import contextlib
import os
import json
import functools
import creds
import click
import importlib
import asyncio
from cogs.utils.db import Table
from cogs.utils import context
import config
# Locations of the bot's persistent data, resolved against the working
# directory at import time.
_UTILS_DIR = os.path.join(os.getcwd(), 'cogs', 'utils')
# SQLite database file opened by on_guild_join.
db_path = os.path.join(_UTILS_DIR, 'database.db')
# JSON settings file read in MathsBot.__init__ and rewritten by save_to_json.
json_location = os.path.join(_UTILS_DIR, 'mathsbot.json')

# Webhook used for logging command errors and guild join/leave reports.
webhook = discord.Webhook.partial(
    id=creds.webhookid,
    token=creds.webhooktoken,
    adapter=discord.RequestsWebhookAdapter(),
)

# Cogs to load when the bot is constructed.
initial_extensions = [
    'cogs.jokes',
    'cogs.games',
    'cogs.stats',
    'cogs.hangman',
    'cogs.mod',
    'cogs.admin',
    'cogs.roles',
    'cogs.leaderboard',
]
def run_bot():
    """Create the PostgreSQL pool, attach it to a new MathsBot and run it.

    If the pool cannot be created, the failure is echoed to stderr and logged
    with its traceback, and the function returns without starting the bot.
    """
    event_loop = asyncio.get_event_loop()
    root_logger = logging.getLogger()
    try:
        pool = event_loop.run_until_complete(
            Table.create_pool(config.postgresql, command_timeout=60)
        )
    except Exception:
        failure = 'Could not set up PostgreSQL. Exiting.'
        click.echo(failure, file=sys.stderr)
        root_logger.exception(failure)
        return

    bot = MathsBot()
    bot.pool = pool
    bot.run()
class MathsBot(commands.Bot):
def __init__(self):
    """Set up the bot: prefix callable, settings cache and cogs.

    Reads the JSON file at ``json_location`` into ``self.loaded`` and then
    tries to load every extension named in ``initial_extensions``. An
    extension that fails to load is reported and skipped so the remaining
    ones are still attempted.
    """
    super().__init__(command_prefix=self.find_prefix, case_insensitive=True)
    # We have our own help formatter, so remove the built-in command.
    self.remove_command(name='help')
    self._prev_events = deque(maxlen=10)
    # Keep the error-log webhook on the bot so it is easy to call elsewhere.
    self.webhook = webhook
    self.extns = initial_extensions
    with open(json_location) as guildinfo:
        self.loaded = json.load(guildinfo)
    for extension in initial_extensions:
        try:
            self.load_extension(extension)
        except Exception as exc:
            # The loop variable and the exception need distinct names;
            # otherwise the message shows the error instead of the
            # extension that failed.
            print(f'Failed to load extension {extension}.', file=sys.stderr)
            print(exc)
def get_guild_prefix(self, guildid):
    """Return every prefix stored for the guild ``guildid``.

    The result is a list in stored order; it is empty when the guild has no
    entry under ``self.loaded['prefixes']``.
    """
    return [
        entry['prefix']
        for entry in self.loaded['prefixes']
        if entry['guildid'] == guildid
    ]
def get_ignored(self, guildid, cid='all'):
    """Return the ignored ids recorded for the guild ``guildid``.

    With the default ``cid='all'`` every id stored for the guild is
    returned; otherwise only entries whose id equals ``cid`` are returned.
    """
    want_every_id = cid == 'all'
    return [
        entry['id']
        for entry in self.loaded['ignored']
        if (want_every_id or entry['id'] == cid) and entry['guildid'] == guildid
    ]
def get_global_ignored(self, id):
    """Return globally ignored ids.

    Passing ``'all'`` returns every stored id; any other value returns only
    the stored ids equal to it (an empty list when there is none).
    """
    want_every_id = id == 'all'
    return [
        entry['id']
        for entry in self.loaded['global_ignored']
        if want_every_id or entry['id'] == id
    ]
def get_blacklisted(self, id):
    """Return True if the guild ``id`` appears in the blacklist.

    Every entry of ``self.loaded['blacklisted_guilds']`` is checked, not only
    the first one, and an empty blacklist yields ``False``.
    """
    return any(
        entry['id'] == id for entry in self.loaded['blacklisted_guilds']
    )
def get_colours(self, guildid, colour):
    """Return the colour-role entries of a guild matching ``colour``.

    ``colour='all'`` selects every entry of the guild. Both checks run for
    every entry, so an entry whose stored colour is itself ``'all'`` is
    listed twice when ``'all'`` is requested.
    """
    selected = []
    want_every_colour = colour == 'all'
    for entry in self.loaded['colour_roles']:
        exact_match = entry['colour'] == colour and entry['guildid'] == guildid
        if exact_match:
            selected.append(entry)
        if want_every_colour and entry['guildid'] == guildid:
            selected.append(entry)
    return selected
def get_config(self, guildid, config, *, userid=None):
    """Return one stored setting of a guild.

    Looks for the first entry in ``self.loaded['config']`` whose guild id is
    ``guildid`` and returns the value stored under the key ``config``.
    Returns ``False`` when the guild has no entry.

    ``userid`` is not used by the lookup; it is accepted (and now optional)
    only so existing callers that pass it keep working.

    Raises:
        KeyError: if the guild's entry has no ``config`` key.
    """
    for entry in self.loaded['config']:
        if entry['guildid'] == guildid:
            return entry[config]
    return False
def get_admin(self, guildid, userid):
    """Return True if ``userid`` is stored as an admin of ``guildid``."""
    return any(
        entry['guildid'] == guildid and entry['userid'] == userid
        for entry in self.loaded['admin']
    )
def get_mod(self, guildid, userid):
    """Return True if ``userid`` is stored as a moderator of ``guildid``."""
    return any(
        entry['guildid'] == guildid and entry['userid'] == userid
        for entry in self.loaded['mod']
    )
def find_prefix(self, bot, msg):
    """Callable prefix: list the prefixes accepted for ``msg``.

    A mention of the bot (both mention spellings) always works. In direct
    messages ``!`` is added. In a guild the stored prefixes are added, or
    ``!`` when none are stored; if the lookup raises ``KeyError`` both ``!``
    and ``?`` are accepted.
    """
    own_id = bot.user.id
    candidates = [f'<@{own_id}> ', f'<@!{own_id}> ']
    if msg.guild is not None:
        try:
            stored = self.get_guild_prefix(msg.guild.id)
        except KeyError:
            candidates += ['!', '?']
        else:
            candidates += stored if stored else ['!']
    else:
        # The prefix is ! in DMs.
        candidates.append('!')
    return candidates
def save_to_json(self):
    """
    Save ``self.loaded`` to the JSON settings file.

    The data is serialised to a string before the file is opened. Opening
    with ``'w'`` truncates the file, so serialising first means a value that
    cannot be encoded raises without wiping the settings already on disk.

    Raises:
        TypeError: if ``self.loaded`` contains a value JSON cannot encode.
    """
    payload = json.dumps(self.loaded)
    with open(json_location, 'w') as outfile:
        outfile.write(payload)
async def save_json(self):
    """Run ``save_to_json`` in the loop's default executor and wait for it.

    The write is blocking file I/O, so it is kept off the event loop.
    """
    await self.loop.run_in_executor(None, self.save_to_json)
async def on_message(self, message):
    """Pass every message not written by a bot on to command processing."""
    # NOTE: filtering by ignored guilds/channels/users (get_ignored and
    # get_global_ignored) is currently disabled here.
    if not message.author.bot:
        # Look for a prefix, command etc. in everything else.
        await self.process_commands(message)
async def process_commands(self, message):
    """Build the custom context for ``message`` and invoke its command.

    Messages that do not resolve to a command are ignored. The command is
    invoked inside the ``ctx.acquire()`` async context manager.
    """
    ctx = await self.get_context(message, cls=context.Context)
    if ctx.command is not None:
        async with ctx.acquire():
            await self.invoke(ctx)
async def on_command(self, ctx):
    """Trigger the typing indicator in the channel of the invoking message."""
    channel = ctx.message.channel
    await channel.trigger_typing()
async def on_command_error(self, ctx, error):
    """Report an unexpected command error to the error-log webhook.

    The traceback of every error is printed to the console. Errors caused by
    the user or by missing permissions (see ``ignored`` below) are not
    reported any further; everything else is sent to the webhook as an embed
    with the command name, the author, the location and the traceback.

    Args:
        ctx: context of the command that failed.
        error: the raised error; if it wraps another exception in
            ``.original``, that exception is the one inspected and reported.
    """
    print(''.join(traceback.format_exception(type(error), error, error.__traceback__, chain=False)))
    # We don't want logs for this stuff, which isn't our problem.
    ignored = (commands.NoPrivateMessage, commands.DisabledCommand, commands.CheckFailure,
               commands.CommandNotFound, commands.UserInputError, discord.Forbidden)
    error = getattr(error, 'original', error)
    if isinstance(error, ignored):
        return

    e = discord.Embed(title='Command Error', colour=0xcc3366)
    e.add_field(name='Name', value=ctx.command.qualified_name)
    e.add_field(name='Author', value=f'{ctx.author} (ID: {ctx.author.id})')
    fmt = f'Channel: {ctx.channel} (ID: {ctx.channel.id})'
    if ctx.guild:
        fmt = f'{fmt}\nGuild: {ctx.guild} (ID: {ctx.guild.id})'
    e.add_field(name='Location', value=fmt, inline=False)

    # Format a legible traceback.
    exc = ''.join(traceback.format_exception(type(error), error, error.__traceback__, chain=False))
    # Discord rejects embeds whose description is too long, which would make
    # this handler fail on exactly the errors with the deepest tracebacks.
    # Keep the tail: the raising frame and the message are at the end.
    max_traceback_chars = 2000
    if len(exc) > max_traceback_chars:
        exc = '...\n' + exc[-max_traceback_chars:]
    e.description = f'```py\n{exc}\n```'
    e.timestamp = datetime.datetime.utcnow()

    # The webhook uses the blocking requests adapter, so send it from the
    # executor instead of stalling the event loop.
    await self.loop.run_in_executor(None, functools.partial(self.webhook.send, embed=e))
def send_guild_stats(self, e, guild):
    """Fill the embed ``e`` with facts about ``guild`` and send it.

    Adds name, id, owner, member count and the number (with share) of bots
    and online members, plus the guild icon and the bot's join time when
    they are available, then posts the embed through ``self.webhook``.

    Args:
        e: embed to complete; title and colour are set by the caller.
        guild: the guild being reported.
    """
    e.add_field(name='Name', value=guild.name)
    e.add_field(name='ID', value=guild.id)
    # guild.owner is None when the owner is not cached, so take the id from
    # the guild itself rather than dereferencing the member object.
    e.add_field(name='Owner', value=f'{guild.owner} (ID: {guild.owner_id})')

    # Stats about how many/what percentage of members are bots or online.
    bots = sum(m.bot for m in guild.members)
    online = sum(m.status is discord.Status.online for m in guild.members)
    total = guild.member_count
    e.add_field(name='Members', value=str(total))
    if total:
        e.add_field(name='Bots', value=f'{bots} ({bots/total:.2%})')
        e.add_field(name='Online', value=f'{online} ({online/total:.2%})')
    else:
        # No usable member count (0 or None): a percentage cannot be
        # computed, so report the raw counts only.
        e.add_field(name='Bots', value=str(bots))
        e.add_field(name='Online', value=str(online))

    if guild.icon:
        e.set_thumbnail(url=guild.icon_url)
    if guild.me:
        e.timestamp = guild.me.joined_at

    # Send to the log channel with the webhook assigned to the bot.
    self.webhook.send(embed=e)
async def on_guild_join(self, guild):
    """Handle the bot being added to ``guild``.

    Blacklisted guilds get an explanation in the first text channel the bot
    may write to and are then left. For any other guild the join is reported
    through the log webhook, a greeting is posted (system channel first,
    otherwise the first writable text channel), and default rows are created
    in the SQLite database if the guild has none yet.
    """
    if self.get_blacklisted(guild.id):
        notice = ("It appears this server has been blacklisted :thinking_face: "
                  "This could be because"
                  " you have too many bots or members spamming messages that will lag my client. "
                  "If you believe this is incorrect, please join my support server: xxyyzz")
        for channel in guild.text_channels:
            try:
                # send() is a coroutine; without await nothing is sent.
                await channel.send(notice)
            except discord.Forbidden:
                continue
            break
        return await guild.leave()

    # Tell the log channel that a new guild was joined.
    e = discord.Embed(colour=0x53dda4, title='New Guild')  # green colour
    self.send_guild_stats(e, guild)

    greeted = False
    sys_channel = guild.system_channel
    if sys_channel:
        # The 'default' channel, where Discord's own welcome message pops up.
        try:
            await sys_channel.send('Hello, thanks for adding me! \N{UPSIDE-DOWN FACE} '
                                   'You can find my help with `!help` and change my prefix with '
                                   '`!changeprefix [new prefix]`')
            greeted = True
        except discord.Forbidden:
            # Not allowed to write there; fall back to the other channels.
            pass
    if not greeted:
        for channel in guild.text_channels:
            try:
                await channel.send("Hello, thanks for adding me! \N{UPSIDE-DOWN FACE}. I couldn't find "
                                   "a default channel so came here. "
                                   "You can find my help with `!help` and change my prefix with "
                                   "`!changeprefix [new prefix]`")
            except discord.Forbidden:
                # No permission to send messages in this channel; try the next.
                continue
            # Stop after the first success. Only the loop ends here: the
            # database setup below must still run.
            break

    # This module references aiosqlite without a top-level import, so bring
    # it into scope here, right before it is needed.
    import aiosqlite

    async with aiosqlite.connect(db_path) as db:
        c = await db.execute("SELECT * FROM action_log_config WHERE guildid = :id",
                             {'id': guild.id})
        dump = await c.fetchall()
        if not dump:
            # First time in this guild: create its default rows.
            await db.execute("INSERT INTO action_log_config VALUES ('0000000000000000000000',"
                             " :id, '', '', '', '', '')",
                             {'id': guild.id})
            await db.execute("INSERT INTO guildinfo VALUES (:id, '!', 0)",
                             {'id': guild.id})
            await db.commit()
async def on_guild_remove(self, guild):
    """Report to the log webhook that the bot has left ``guild``."""
    left_embed = discord.Embed(colour=0xdd5f53, title='Left Guild')  # red colour
    self.send_guild_stats(left_embed, guild)
def find_command(self, bot, command):
    """Find a command (be it parent or sub command) based on the string given.

    ``command`` is split on whitespace and resolved word by word: the first
    word through ``bot.get_command``, each following word through the
    ``get_command`` of the command found so far.

    Returns:
        The command the whole string names, or ``None`` if the string is
        empty, any word does not resolve, or a word is looked up on a command
        that has no sub commands. The lookup stops at the first failure, so a
        later word is never resolved from the bot root again.
    """
    node = bot
    for part in command.split():
        try:
            node = node.get_command(part)
        except AttributeError:
            # The command found so far has no get_command (not a group).
            return None
        if node is None:
            return None
    # An empty string never left the root: there is no command to return.
    return None if node is bot else node
def run(self):
    """Run the bot with the token from ``creds``, or say why it won't work.

    Any exception raised while running is caught here and written to stderr
    with its type and traceback instead of propagating to the caller.
    """
    try:
        super().run(creds.discordtoken)
    except Exception as exc:
        # print(exc) alone drops the exception type and the traceback.
        report = ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__))
        print(report, file=sys.stderr)
@property
def config(self):
    """The module imported under the name ``config``."""
    config_module = __import__('config')
    return config_module
if __name__ == '__main__':
    # MathsBot.run() takes no token argument (it reads the token from creds),
    # so calling bot.run(config.token) raises TypeError. run_bot() creates
    # the PostgreSQL pool, attaches it to the bot and then starts it.
    run_bot()