Log purged messages, Fix macro usage in DMs & Fix reading empty files (#88)
* Log purged messages * Fix CommandInvokeError for macros in DMs * Fix decoding empty files and simplify read json logic * Apply black formatting
This commit is contained in:
parent
f981d9837d
commit
7c4bf15c93
13 changed files with 95 additions and 125 deletions
|
@ -6,12 +6,12 @@ from discord.ext import commands
|
||||||
from discord.ext.commands import Cog
|
from discord.ext.commands import Cog
|
||||||
|
|
||||||
from robocop_ng.helpers.checks import check_if_collaborator
|
from robocop_ng.helpers.checks import check_if_collaborator
|
||||||
|
from robocop_ng.helpers.invites import add_invite
|
||||||
|
|
||||||
|
|
||||||
class Invites(Cog):
|
class Invites(Cog):
|
||||||
def __init__(self, bot):
|
def __init__(self, bot):
|
||||||
self.bot = bot
|
self.bot = bot
|
||||||
self.invites_json_path = os.path.join(self.bot.state_dir, "data/invites.json")
|
|
||||||
|
|
||||||
@commands.command()
|
@commands.command()
|
||||||
@commands.guild_only()
|
@commands.guild_only()
|
||||||
|
@ -24,18 +24,7 @@ class Invites(Cog):
|
||||||
max_age=0, max_uses=1, temporary=True, unique=True, reason=reason
|
max_age=0, max_uses=1, temporary=True, unique=True, reason=reason
|
||||||
)
|
)
|
||||||
|
|
||||||
with open(self.invites_json_path, "r") as f:
|
add_invite(self.bot, invite.id, invite.url, 1, invite.code)
|
||||||
invites = json.load(f)
|
|
||||||
|
|
||||||
invites[invite.id] = {
|
|
||||||
"uses": 0,
|
|
||||||
"url": invite.url,
|
|
||||||
"max_uses": 1,
|
|
||||||
"code": invite.code,
|
|
||||||
}
|
|
||||||
|
|
||||||
with open(self.invites_json_path, "w") as f:
|
|
||||||
f.write(json.dumps(invites))
|
|
||||||
|
|
||||||
await ctx.message.add_reaction("🆗")
|
await ctx.message.add_reaction("🆗")
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -6,6 +6,7 @@ import discord
|
||||||
from discord.ext.commands import Cog
|
from discord.ext.commands import Cog
|
||||||
|
|
||||||
from robocop_ng.helpers.checks import check_if_staff
|
from robocop_ng.helpers.checks import check_if_staff
|
||||||
|
from robocop_ng.helpers.invites import get_invites, set_invites
|
||||||
from robocop_ng.helpers.restrictions import get_user_restrictions
|
from robocop_ng.helpers.restrictions import get_user_restrictions
|
||||||
from robocop_ng.helpers.userlogs import get_userlog
|
from robocop_ng.helpers.userlogs import get_userlog
|
||||||
|
|
||||||
|
@ -17,7 +18,6 @@ class Logs(Cog):
|
||||||
|
|
||||||
def __init__(self, bot):
|
def __init__(self, bot):
|
||||||
self.bot = bot
|
self.bot = bot
|
||||||
self.invites_json_path = os.path.join(self.bot.state_dir, "data/invites.json")
|
|
||||||
self.invite_re = re.compile(
|
self.invite_re = re.compile(
|
||||||
r"((discord\.gg|discordapp\.com/" r"+invite)/+[a-zA-Z0-9-]+)", re.IGNORECASE
|
r"((discord\.gg|discordapp\.com/" r"+invite)/+[a-zA-Z0-9-]+)", re.IGNORECASE
|
||||||
)
|
)
|
||||||
|
@ -41,8 +41,7 @@ class Logs(Cog):
|
||||||
escaped_name = self.bot.escape_message(member)
|
escaped_name = self.bot.escape_message(member)
|
||||||
|
|
||||||
# Attempt to correlate the user joining with an invite
|
# Attempt to correlate the user joining with an invite
|
||||||
with open(self.invites_json_path, "r") as f:
|
invites = get_invites(self.bot)
|
||||||
invites = json.load(f)
|
|
||||||
|
|
||||||
real_invites = await member.guild.invites()
|
real_invites = await member.guild.invites()
|
||||||
|
|
||||||
|
@ -76,8 +75,7 @@ class Logs(Cog):
|
||||||
del invites[id]
|
del invites[id]
|
||||||
|
|
||||||
# Save invites data.
|
# Save invites data.
|
||||||
with open(self.invites_json_path, "w") as f:
|
set_invites(self.bot, invites)
|
||||||
f.write(json.dumps(invites))
|
|
||||||
|
|
||||||
# Prepare the invite correlation message
|
# Prepare the invite correlation message
|
||||||
if len(probable_invites_used) == 1:
|
if len(probable_invites_used) == 1:
|
||||||
|
|
|
@ -22,7 +22,8 @@ class Macro(Cog):
|
||||||
@commands.cooldown(3, 30, BucketType.user)
|
@commands.cooldown(3, 30, BucketType.user)
|
||||||
@commands.command(aliases=["m"])
|
@commands.command(aliases=["m"])
|
||||||
async def macro(self, ctx: Context, key: str, targets: Greedy[discord.User] = None):
|
async def macro(self, ctx: Context, key: str, targets: Greedy[discord.User] = None):
|
||||||
await ctx.message.delete()
|
if ctx.guild:
|
||||||
|
await ctx.message.delete()
|
||||||
if len(key) > 0:
|
if len(key) > 0:
|
||||||
text = get_macro(self.bot, key)
|
text = get_macro(self.bot, key)
|
||||||
if text is not None:
|
if text is not None:
|
||||||
|
|
|
@ -612,15 +612,30 @@ class Mod(Cog):
|
||||||
@commands.command(aliases=["clear"])
|
@commands.command(aliases=["clear"])
|
||||||
async def purge(self, ctx, limit: int, channel: discord.TextChannel = None):
|
async def purge(self, ctx, limit: int, channel: discord.TextChannel = None):
|
||||||
"""Clears a given number of messages, staff only."""
|
"""Clears a given number of messages, staff only."""
|
||||||
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
|
modlog_channel = self.bot.get_channel(self.bot.config.modlog_channel)
|
||||||
|
log_channel = self.bot.get_channel(self.bot.config.log_channel)
|
||||||
if not channel:
|
if not channel:
|
||||||
channel = ctx.channel
|
channel = ctx.channel
|
||||||
await channel.purge(limit=limit)
|
|
||||||
|
purged_log_jump_url = ""
|
||||||
|
for deleted_message in await channel.purge(limit=limit):
|
||||||
|
msg = (
|
||||||
|
"🗑️ **Message purged**: \n"
|
||||||
|
f"from {self.bot.escape_message(deleted_message.author.name)} "
|
||||||
|
f"({deleted_message.author.id}), in {deleted_message.channel.mention}:\n"
|
||||||
|
f"`{deleted_message.clean_content}`"
|
||||||
|
)
|
||||||
|
if len(purged_log_jump_url) == 0:
|
||||||
|
purged_log_jump_url = (await log_channel.send(msg)).jump_url
|
||||||
|
else:
|
||||||
|
await log_channel.send(msg)
|
||||||
|
|
||||||
msg = (
|
msg = (
|
||||||
f"🗑 **Purged**: {str(ctx.author)} purged {limit} "
|
f"🗑 **Purged**: {str(ctx.author)} purged {limit} "
|
||||||
f"messages in {channel.mention}."
|
f"messages in {channel.mention}."
|
||||||
|
f"\n🔗 __Jump__: <{purged_log_jump_url}>"
|
||||||
)
|
)
|
||||||
await log_channel.send(msg)
|
await modlog_channel.send(msg)
|
||||||
|
|
||||||
@commands.guild_only()
|
@commands.guild_only()
|
||||||
@commands.check(check_if_staff)
|
@commands.check(check_if_staff)
|
||||||
|
|
21
robocop_ng/helpers/data_loader.py
Normal file
21
robocop_ng/helpers/data_loader.py
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
|
||||||
|
from robocop_ng.helpers.notifications import report_critical_error
|
||||||
|
|
||||||
|
|
||||||
|
def read_json(bot, filepath: str) -> dict:
|
||||||
|
if os.path.isfile(filepath) and os.path.getsize(filepath) > 0:
|
||||||
|
with open(filepath, "r") as f:
|
||||||
|
try:
|
||||||
|
return json.load(f)
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
content = f.read()
|
||||||
|
report_critical_error(
|
||||||
|
bot,
|
||||||
|
e,
|
||||||
|
additional_info={
|
||||||
|
"file": {"length": len(content), "content": content}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return {}
|
|
@ -2,7 +2,7 @@ import json
|
||||||
import os
|
import os
|
||||||
from typing import Union
|
from typing import Union
|
||||||
|
|
||||||
from robocop_ng.helpers.notifications import report_critical_error
|
from robocop_ng.helpers.data_loader import read_json
|
||||||
|
|
||||||
|
|
||||||
def get_disabled_ids_path(bot) -> str:
|
def get_disabled_ids_path(bot) -> str:
|
||||||
|
@ -22,21 +22,8 @@ def is_ro_section_valid(ro_section: dict[str, str]) -> bool:
|
||||||
|
|
||||||
|
|
||||||
def get_disabled_ids(bot) -> dict[str, dict[str, Union[str, dict[str, str]]]]:
|
def get_disabled_ids(bot) -> dict[str, dict[str, Union[str, dict[str, str]]]]:
|
||||||
if os.path.isfile(get_disabled_ids_path(bot)):
|
disabled_ids = read_json(bot, get_disabled_ids_path(bot))
|
||||||
with open(get_disabled_ids_path(bot), "r") as f:
|
if len(disabled_ids) > 0:
|
||||||
try:
|
|
||||||
disabled_ids = json.load(f)
|
|
||||||
except json.JSONDecodeError as e:
|
|
||||||
content = f.read()
|
|
||||||
report_critical_error(
|
|
||||||
bot,
|
|
||||||
e,
|
|
||||||
additional_info={
|
|
||||||
"file": {"length": len(content), "content": content}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
return {}
|
|
||||||
|
|
||||||
# Migration code
|
# Migration code
|
||||||
if "app_id" in disabled_ids.keys():
|
if "app_id" in disabled_ids.keys():
|
||||||
old_disabled_ids = disabled_ids.copy()
|
old_disabled_ids = disabled_ids.copy()
|
||||||
|
@ -54,9 +41,7 @@ def get_disabled_ids(bot) -> dict[str, dict[str, Union[str, dict[str, str]]]]:
|
||||||
disabled_ids[key.lower()]["ro_section"] = value
|
disabled_ids[key.lower()]["ro_section"] = value
|
||||||
set_disabled_ids(bot, disabled_ids)
|
set_disabled_ids(bot, disabled_ids)
|
||||||
|
|
||||||
return disabled_ids
|
return disabled_ids
|
||||||
|
|
||||||
return {}
|
|
||||||
|
|
||||||
|
|
||||||
def set_disabled_ids(bot, contents: dict[str, dict[str, Union[str, dict[str, str]]]]):
|
def set_disabled_ids(bot, contents: dict[str, dict[str, Union[str, dict[str, str]]]]):
|
||||||
|
|
29
robocop_ng/helpers/invites.py
Normal file
29
robocop_ng/helpers/invites.py
Normal file
|
@ -0,0 +1,29 @@
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from typing import Union
|
||||||
|
|
||||||
|
from robocop_ng.helpers.data_loader import read_json
|
||||||
|
|
||||||
|
|
||||||
|
def get_invites_path(bot):
|
||||||
|
return os.path.join(bot.state_dir, "data/invites.json")
|
||||||
|
|
||||||
|
|
||||||
|
def get_invites(bot) -> dict[str, dict[str, Union[str, int]]]:
|
||||||
|
return read_json(bot, get_invites_path(bot))
|
||||||
|
|
||||||
|
|
||||||
|
def add_invite(bot, invite_id: str, url: str, max_uses: int, code: str):
|
||||||
|
invites = get_invites(bot)
|
||||||
|
invites[invite_id] = {
|
||||||
|
"uses": 0,
|
||||||
|
"url": url,
|
||||||
|
"max_uses": max_uses,
|
||||||
|
code: code,
|
||||||
|
}
|
||||||
|
set_invites(bot, invites)
|
||||||
|
|
||||||
|
|
||||||
|
def set_invites(bot, contents: dict[str, dict[str, Union[str, int]]]):
|
||||||
|
with open(get_invites_path(bot), "w") as f:
|
||||||
|
json.dump(contents, f)
|
|
@ -2,7 +2,7 @@ import json
|
||||||
import os
|
import os
|
||||||
from typing import Optional, Union
|
from typing import Optional, Union
|
||||||
|
|
||||||
from robocop_ng.helpers.notifications import report_critical_error
|
from robocop_ng.helpers.data_loader import read_json
|
||||||
|
|
||||||
|
|
||||||
def get_macros_path(bot):
|
def get_macros_path(bot):
|
||||||
|
@ -10,21 +10,8 @@ def get_macros_path(bot):
|
||||||
|
|
||||||
|
|
||||||
def get_macros_dict(bot) -> dict[str, dict[str, Union[list[str], str]]]:
|
def get_macros_dict(bot) -> dict[str, dict[str, Union[list[str], str]]]:
|
||||||
if os.path.isfile(get_macros_path(bot)):
|
macros = read_json(bot, get_macros_path(bot))
|
||||||
with open(get_macros_path(bot), "r") as f:
|
if len(macros) > 0:
|
||||||
try:
|
|
||||||
macros = json.load(f)
|
|
||||||
except json.JSONDecodeError as e:
|
|
||||||
content = f.read()
|
|
||||||
report_critical_error(
|
|
||||||
bot,
|
|
||||||
e,
|
|
||||||
additional_info={
|
|
||||||
"file": {"length": len(content), "content": content}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
return {}
|
|
||||||
|
|
||||||
# Migration code
|
# Migration code
|
||||||
if "aliases" not in macros.keys():
|
if "aliases" not in macros.keys():
|
||||||
new_macros = {"macros": macros, "aliases": {}}
|
new_macros = {"macros": macros, "aliases": {}}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from robocop_ng.helpers.notifications import report_critical_error
|
from robocop_ng.helpers.data_loader import read_json
|
||||||
|
|
||||||
|
|
||||||
def get_restrictions_path(bot):
|
def get_restrictions_path(bot):
|
||||||
|
@ -9,20 +9,7 @@ def get_restrictions_path(bot):
|
||||||
|
|
||||||
|
|
||||||
def get_restrictions(bot):
|
def get_restrictions(bot):
|
||||||
if os.path.isfile(get_restrictions_path(bot)):
|
return read_json(bot, get_restrictions_path(bot))
|
||||||
with open(get_restrictions_path(bot), "r") as f:
|
|
||||||
try:
|
|
||||||
return json.load(f)
|
|
||||||
except json.JSONDecodeError as e:
|
|
||||||
content = f.read()
|
|
||||||
report_critical_error(
|
|
||||||
bot,
|
|
||||||
e,
|
|
||||||
additional_info={
|
|
||||||
"file": {"length": len(content), "content": content}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
return {}
|
|
||||||
|
|
||||||
|
|
||||||
def set_restrictions(bot, contents):
|
def set_restrictions(bot, contents):
|
||||||
|
|
|
@ -2,7 +2,7 @@ import json
|
||||||
import math
|
import math
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from robocop_ng.helpers.notifications import report_critical_error
|
from robocop_ng.helpers.data_loader import read_json
|
||||||
|
|
||||||
|
|
||||||
def get_crontab_path(bot):
|
def get_crontab_path(bot):
|
||||||
|
@ -10,20 +10,7 @@ def get_crontab_path(bot):
|
||||||
|
|
||||||
|
|
||||||
def get_crontab(bot):
|
def get_crontab(bot):
|
||||||
if os.path.isfile(get_crontab_path(bot)):
|
return read_json(bot, get_crontab_path(bot))
|
||||||
with open(get_crontab_path(bot), "r") as f:
|
|
||||||
try:
|
|
||||||
return json.load(f)
|
|
||||||
except json.JSONDecodeError as e:
|
|
||||||
content = f.read()
|
|
||||||
report_critical_error(
|
|
||||||
bot,
|
|
||||||
e,
|
|
||||||
additional_info={
|
|
||||||
"file": {"length": len(content), "content": content}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
return {}
|
|
||||||
|
|
||||||
|
|
||||||
def set_crontab(bot, contents):
|
def set_crontab(bot, contents):
|
||||||
|
|
|
@ -2,7 +2,7 @@ import json
|
||||||
import os.path
|
import os.path
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from robocop_ng.helpers.notifications import report_critical_error
|
from robocop_ng.helpers.data_loader import read_json
|
||||||
|
|
||||||
|
|
||||||
def get_persistent_roles_path(bot):
|
def get_persistent_roles_path(bot):
|
||||||
|
@ -10,20 +10,7 @@ def get_persistent_roles_path(bot):
|
||||||
|
|
||||||
|
|
||||||
def get_persistent_roles(bot) -> dict[str, list[str]]:
|
def get_persistent_roles(bot) -> dict[str, list[str]]:
|
||||||
if os.path.isfile(get_persistent_roles_path(bot)):
|
return read_json(bot, get_persistent_roles_path(bot))
|
||||||
with open(get_persistent_roles_path(bot), "r") as f:
|
|
||||||
try:
|
|
||||||
return json.load(f)
|
|
||||||
except json.JSONDecodeError as e:
|
|
||||||
content = f.read()
|
|
||||||
report_critical_error(
|
|
||||||
bot,
|
|
||||||
e,
|
|
||||||
additional_info={
|
|
||||||
"file": {"length": len(content), "content": content}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
return {}
|
|
||||||
|
|
||||||
|
|
||||||
def set_persistent_roles(bot, contents: dict[str, list[str]]):
|
def set_persistent_roles(bot, contents: dict[str, list[str]]):
|
||||||
|
@ -42,8 +29,5 @@ def add_user_roles(bot, uid: int, roles: list[int]):
|
||||||
|
|
||||||
def get_user_roles(bot, uid: int) -> list[str]:
|
def get_user_roles(bot, uid: int) -> list[str]:
|
||||||
uid = str(uid)
|
uid = str(uid)
|
||||||
with open(get_persistent_roles_path(bot), "r") as f:
|
persistent_roles = get_persistent_roles(bot)
|
||||||
roles = json.load(f)
|
return persistent_roles[uid] if uid in persistent_roles else []
|
||||||
if uid in roles:
|
|
||||||
return roles[uid]
|
|
||||||
return []
|
|
||||||
|
|
|
@ -230,9 +230,9 @@ class LogAnalyser:
|
||||||
ram_total, dest_unit
|
ram_total, dest_unit
|
||||||
)
|
)
|
||||||
|
|
||||||
self._hardware_info[
|
self._hardware_info[setting] = (
|
||||||
setting
|
f"{ram_available:.0f}/{ram_total:.0f} {dest_unit.name}"
|
||||||
] = f"{ram_available:.0f}/{ram_total:.0f} {dest_unit.name}"
|
)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
# ram_match.group(1) or ram_match.group(3) couldn't be parsed as a float.
|
# ram_match.group(1) or ram_match.group(3) couldn't be parsed as a float.
|
||||||
self._hardware_info[setting] = "Error"
|
self._hardware_info[setting] = "Error"
|
||||||
|
|
|
@ -2,7 +2,7 @@ import json
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from robocop_ng.helpers.notifications import report_critical_error
|
from robocop_ng.helpers.data_loader import read_json
|
||||||
|
|
||||||
userlog_event_types = {
|
userlog_event_types = {
|
||||||
"warns": "Warn",
|
"warns": "Warn",
|
||||||
|
@ -18,20 +18,7 @@ def get_userlog_path(bot):
|
||||||
|
|
||||||
|
|
||||||
def get_userlog(bot):
|
def get_userlog(bot):
|
||||||
if os.path.isfile(get_userlog_path(bot)):
|
return read_json(bot, get_userlog_path(bot))
|
||||||
with open(get_userlog_path(bot), "r") as f:
|
|
||||||
try:
|
|
||||||
return json.load(f)
|
|
||||||
except json.JSONDecodeError as e:
|
|
||||||
content = f.read()
|
|
||||||
report_critical_error(
|
|
||||||
bot,
|
|
||||||
e,
|
|
||||||
additional_info={
|
|
||||||
"file": {"length": len(content), "content": content}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
return {}
|
|
||||||
|
|
||||||
|
|
||||||
def set_userlog(bot, contents):
|
def set_userlog(bot, contents):
|
||||||
|
|
Loading…
Reference in a new issue