Add commands to block log analysis of specific TIDs (#42)

* Small styling changes

* Add disallowed_roles for logfilereader

* macros: Fix naming and missing bot parameter

* Add disabled_tids helper

* Add pirate role to named role examples

* logfilereader: Add commands to block specific tids

* Add black formatting

* Add command to manually analyse logs

And some minor cleanup
This commit is contained in:
TSRBerry 2023-04-24 08:21:04 +02:00 committed by GitHub
parent 2f4990d64f
commit 994438d3fa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 275 additions and 92 deletions

View file

@ -55,6 +55,7 @@ wanted_jsons = [
"data/invites.json", "data/invites.json",
"data/macros.json", "data/macros.json",
"data/persistent_roles.json", "data/persistent_roles.json",
"data/disabled_tids.json",
] ]
for wanted_json_idx in range(len(wanted_jsons)): for wanted_json_idx in range(len(wanted_jsons)):
@ -77,10 +78,10 @@ bot.state_dir = state_dir
bot.wanted_jsons = wanted_jsons bot.wanted_jsons = wanted_jsons
async def get_channel_safe(self, id): async def get_channel_safe(self, channel_id: int):
res = self.get_channel(id) res = self.get_channel(channel_id)
if res is None: if res is None:
res = await self.fetch_channel(id) res = await self.fetch_channel(channel_id)
return res return res

View file

@ -2,8 +2,18 @@ import logging
import re import re
import aiohttp import aiohttp
from discord import Colour, Embed from discord import Colour, Embed, Message, Attachment
from discord.ext.commands import Cog from discord.ext import commands
from discord.ext.commands import Cog, Context
from robocop_ng.helpers.checks import check_if_staff
from robocop_ng.helpers.disabled_tids import (
add_disabled_tid,
is_tid_valid,
remove_disabled_tid,
get_disabled_tids,
is_tid_disabled,
)
logging.basicConfig( logging.basicConfig(
format="%(asctime)s (%(levelname)s) %(message)s (Line %(lineno)d)", format="%(asctime)s (%(levelname)s) %(message)s (Line %(lineno)d)",
@ -12,12 +22,28 @@ logging.basicConfig(
class LogFileReader(Cog): class LogFileReader(Cog):
@staticmethod
def is_valid_log(attachment: Attachment) -> tuple[bool, bool]:
filename = attachment.filename
# Any message over 2000 chars is uploaded as message.txt, so this is accounted for
ryujinx_log_file_regex = re.compile(r"^Ryujinx_.*\.log|message\.txt$")
log_file = re.compile(r"^.*\.log|.*\.txt$")
is_ryujinx_log_file = re.match(ryujinx_log_file_regex, filename) is not None
is_log_file = re.match(log_file, filename) is not None
return is_log_file, is_ryujinx_log_file
def __init__(self, bot): def __init__(self, bot):
self.bot = bot self.bot = bot
self.bot_log_allowed_channels = self.bot.config.bot_log_allowed_channels self.bot_log_allowed_channels = self.bot.config.bot_log_allowed_channels
self.disallowed_named_roles = ["pirate"]
self.ryujinx_blue = Colour(0x4A90E2) self.ryujinx_blue = Colour(0x4A90E2)
self.uploaded_log_info = [] self.uploaded_log_info = []
self.disallowed_roles = [
self.bot.config.named_roles[x] for x in self.disallowed_named_roles
]
async def download_file(self, log_url): async def download_file(self, log_url):
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
# Grabs first and last few bytes of log file to prevent abuse from large files # Grabs first and last few bytes of log file to prevent abuse from large files
@ -70,6 +96,20 @@ class LogFileReader(Cog):
log_file_header_regex = re.compile(r"\d{2}:\d{2}:\d{2}\.\d{3}.*", re.DOTALL) log_file_header_regex = re.compile(r"\d{2}:\d{2}:\d{2}\.\d{3}.*", re.DOTALL)
log_file = re.search(log_file_header_regex, log_file).group(0) log_file = re.search(log_file_header_regex, log_file).group(0)
def is_tid_blocked(log_file=log_file):
game_name = re.search(
r"Loader [A-Za-z]*: Application Loaded:\s([^;\n\r]*)",
log_file,
re.MULTILINE,
)
if game_name is not None and len(game_name.groups()) > 0:
game_name = game_name.group(1).rstrip()
tid = re.match(r".* \[([a-zA-Z0-9]*)\]", game_name)
if tid is not None:
tid = tid.group(1).strip()
return is_tid_disabled(self.bot, tid)
return False
def get_hardware_info(log_file=log_file): def get_hardware_info(log_file=log_file):
for setting in self.embed["hardware_info"]: for setting in self.embed["hardware_info"]:
try: try:
@ -615,6 +655,7 @@ class LogFileReader(Cog):
old_mainline_version = re.compile(r"^\d\.\d\.(\d){4}$") old_mainline_version = re.compile(r"^\d\.\d\.(\d){4}$")
pr_version = re.compile(r"^\d\.\d\.\d\+([a-f]|\d){7}$") pr_version = re.compile(r"^\d\.\d\.\d\+([a-f]|\d){7}$")
ldn_version = re.compile(r"^\d\.\d\.\d\-ldn\d+\.\d+(?:\.\d+|$)") ldn_version = re.compile(r"^\d\.\d\.\d\-ldn\d+\.\d+(?:\.\d+|$)")
mac_version = re.compile(r"^\d\.\d\.\d\-macos\d+\.\d+(?:\.\d+|$)")
is_channel_allowed = False is_channel_allowed = False
@ -645,6 +686,7 @@ class LogFileReader(Cog):
or re.match( or re.match(
old_mainline_version, self.embed["emu_info"]["ryu_version"] old_mainline_version, self.embed["emu_info"]["ryu_version"]
) )
or re.match(mac_version, self.embed["emu_info"]["ryu_version"])
or re.match(ldn_version, self.embed["emu_info"]["ryu_version"]) or re.match(ldn_version, self.embed["emu_info"]["ryu_version"])
or re.match(pr_version, self.embed["emu_info"]["ryu_version"]) or re.match(pr_version, self.embed["emu_info"]["ryu_version"])
or re.match("Unknown", self.embed["emu_info"]["ryu_version"]) or re.match("Unknown", self.embed["emu_info"]["ryu_version"])
@ -672,95 +714,189 @@ class LogFileReader(Cog):
except AttributeError: except AttributeError:
pass pass
if is_tid_blocked():
warn_message = await message.reply(
f".warn This log contains a blocked title id."
)
await self.bot.invoke(await self.bot.get_context(warn_message))
pirate_role = message.guild.get_role(self.bot.config.named_roles["pirate"])
message.author.add_roles(pirate_role)
embed = Embed(
title="⛔ Blocked game detected ⛔",
colour=Colour(0xFF0000),
description="This log contains a blocked title id and has been removed.\n"
"The user has been warned and the pirate role was applied.",
)
embed.set_footer(text=f"Log uploaded by {author_name}")
await message.delete()
return embed
get_hardware_info() get_hardware_info()
get_ryujinx_info() get_ryujinx_info()
game_notes = analyse_log() game_notes = analyse_log()
return format_log_embed() return format_log_embed()
@commands.check(check_if_staff)
@commands.command(
aliases=["disallow_log_tid", "forbid_log_tid", "block_tid", "blocktid"]
)
async def disable_log_tid(self, ctx: Context, tid: str, note=""):
if not is_tid_valid(tid):
return await ctx.send("The specified TID is invalid.")
if add_disabled_tid(self.bot, tid, note):
return await ctx.send(f"TID '{tid}' is now blocked!")
else:
return await ctx.send(f"TID '{tid}' is already blocked.")
@commands.check(check_if_staff)
@commands.command(
aliases=[
"allow_log_tid",
"unblock_log_tid",
"unblock_tid",
"allow_tid",
"unblocktid",
]
)
async def enable_log_tid(self, ctx: Context, tid: str):
if not is_tid_valid(tid):
return await ctx.send("The specified TID is invalid.")
if remove_disabled_tid(self.bot, tid):
return await ctx.send(f"TID '{tid}' is now unblocked!")
else:
return await ctx.send(f"TID '{tid}' is not blocked.")
@commands.check(check_if_staff)
@commands.command(
aliases=[
"blocked_tids",
"listblockedtids",
"list_blocked_log_tids",
"list_blocked_tids",
]
)
async def list_disabled_tids(self, ctx: Context):
disabled_tids = get_disabled_tids(self.bot)
message = "**Blocking analysis of the following TIDs:**\n"
for tid, note in disabled_tids.items():
message += f"- [{tid.upper()}]: {note}\n" if note != "" else f"- [{tid}]\n"
return await ctx.send(message)
async def analyse_log_message(self, message: Message, attachment_index=0):
author_id = message.author.id
author_mention = message.author.mention
filename = message.attachments[attachment_index].filename
filesize = message.attachments[attachment_index].size
# Any message over 2000 chars is uploaded as message.txt, so this is accounted for
log_file_link = message.jump_url
for role in message.author.roles:
if role.id in self.disallowed_roles:
return await message.channel.send(
"I'm not allowed to analyse this log."
)
uploaded_logs_exist = [
True for elem in self.uploaded_log_info if filename in elem.values()
]
if not any(uploaded_logs_exist):
reply_message = await message.channel.send("Log detected, parsing...")
try:
embed = await self.log_file_read(message)
if "Ryujinx_" in filename:
self.uploaded_log_info.append(
{
"filename": filename,
"file_size": filesize,
"link": log_file_link,
"author": author_id,
}
)
# Avoid duplicate log file analysis, at least temporarily; keep track of the last few filenames of uploaded logs
# this should help support channels not be flooded with too many log files
# fmt: off
self.uploaded_log_info = self.uploaded_log_info[-5:]
# fmt: on
return await reply_message.edit(content=None, embed=embed)
except UnicodeDecodeError:
return await message.channel.send(
content=author_mention,
embed=Embed(
description=f"This log file appears to be invalid. Please re-check and re-upload your log file.",
colour=self.ryujinx_blue,
),
)
except Exception as error:
await reply_message.edit(
content=f"Error: Couldn't parse log; parser threw `{type(error).__name__}` exception."
)
print(logging.warning(error))
else:
duplicate_log_file = next(
(
elem
for elem in self.uploaded_log_info
if elem["filename"] == filename
and elem["file_size"] == filesize
and elem["author"] == author_id
),
None,
)
await message.channel.send(
content=author_mention,
embed=Embed(
description=f"The log file `{filename}` appears to be a duplicate [already uploaded here]({duplicate_log_file['link']}). Please upload a more recent file.",
colour=self.ryujinx_blue,
),
)
@commands.check(check_if_staff)
@commands.command(
aliases=["analyselog", "analyse_log", "analyze", "analyzelog", "analyze_log"]
)
async def analyse(self, ctx: Context, attachment_number=1):
await ctx.message.delete()
if ctx.message.reference is not None:
message = await ctx.fetch_message(ctx.message.reference.message_id)
if len(message.attachments) >= attachment_number:
attachment = message.attachments[attachment_number - 1]
is_log_file, _ = self.is_valid_log(attachment)
if is_log_file:
return await self.analyse_log_message(
message, attachment_number - 1
)
else:
return await ctx.send(
f"The attached log file '{attachment.filename}' is not valid.",
reference=ctx.message.reference,
)
return await ctx.send(
"Please use `.analyse` as a reply to a message with an attached log file."
)
@Cog.listener() @Cog.listener()
async def on_message(self, message): async def on_message(self, message: Message):
await self.bot.wait_until_ready() await self.bot.wait_until_ready()
if message.author.bot: if message.author.bot:
return return
try: for attachment in message.attachments:
author_id = message.author.id is_log_file, is_ryujinx_log_file = self.is_valid_log(attachment)
author_mention = message.author.mention
filename = message.attachments[0].filename
filesize = message.attachments[0].size
# Any message over 2000 chars is uploaded as message.txt, so this is accounted for
ryujinx_log_file_regex = re.compile(r"^Ryujinx_.*\.log|message\.txt$")
log_file = re.compile(r"^.*\.log|.*\.txt$")
log_file_link = message.jump_url
is_ryujinx_log_file = re.match(ryujinx_log_file_regex, filename)
is_log_file = re.match(log_file, filename)
if ( if message.channel.id in self.bot_log_allowed_channels.values():
message.channel.id in self.bot_log_allowed_channels.values() return await self.analyse_log_message(
and is_ryujinx_log_file message, message.attachments.index(attachment)
): )
uploaded_logs_exist = [ elif is_log_file and not is_ryujinx_log_file:
True for elem in self.uploaded_log_info if filename in elem.values()
]
if not any(uploaded_logs_exist):
reply_message = await message.channel.send(
"Log detected, parsing..."
)
try:
embed = await self.log_file_read(message)
if "Ryujinx_" in filename:
self.uploaded_log_info.append(
{
"filename": filename,
"file_size": filesize,
"link": log_file_link,
"author": author_id,
}
)
# Avoid duplicate log file analysis, at least temporarily; keep track of the last few filenames of uploaded logs
# this should help support channels not be flooded with too many log files
# fmt: off
self.uploaded_log_info = self.uploaded_log_info[-5:]
# fmt: on
return await reply_message.edit(content=None, embed=embed)
except UnicodeDecodeError:
return await message.channel.send(
content=author_mention,
embed=Embed(
description=f"This log file appears to be invalid. Please re-check and re-upload your log file.",
colour=self.ryujinx_blue,
),
)
except Exception as error:
await reply_message.edit(
content=f"Error: Couldn't parse log; parser threw `{type(error).__name__}` exception."
)
print(logging.warning(error))
else:
duplicate_log_file = next(
(
elem
for elem in self.uploaded_log_info
if elem["filename"] == filename
and elem["file_size"] == filesize
and elem["author"] == author_id
),
None,
)
await message.channel.send(
content=author_mention,
embed=Embed(
description=f"The log file `{filename}` appears to be a duplicate [already uploaded here]({duplicate_log_file['link']}). Please upload a more recent file.",
colour=self.ryujinx_blue,
),
)
elif (
is_log_file
and not is_ryujinx_log_file
and message.channel.id in self.bot_log_allowed_channels.values()
):
return await message.channel.send( return await message.channel.send(
content=author_mention, content=message.author.mention,
embed=Embed( embed=Embed(
description=f"Your file does not match the Ryujinx log format. Please check your file.", description=f"Your file does not match the Ryujinx log format. Please check your file.",
colour=self.ryujinx_blue, colour=self.ryujinx_blue,
@ -771,7 +907,7 @@ class LogFileReader(Cog):
and not message.channel.id in self.bot_log_allowed_channels.values() and not message.channel.id in self.bot_log_allowed_channels.values()
): ):
return await message.author.send( return await message.author.send(
content=author_mention, content=message.author.mention,
embed=Embed( embed=Embed(
description="\n".join( description="\n".join(
( (
@ -782,14 +918,11 @@ class LogFileReader(Cog):
f'<#{self.bot.config.bot_log_allowed_channels["patreon-support"]}>: Help and troubleshooting for Patreon subscribers', f'<#{self.bot.config.bot_log_allowed_channels["patreon-support"]}>: Help and troubleshooting for Patreon subscribers',
f'<#{self.bot.config.bot_log_allowed_channels["development"]}>: Ryujinx development discussion', f'<#{self.bot.config.bot_log_allowed_channels["development"]}>: Ryujinx development discussion',
f'<#{self.bot.config.bot_log_allowed_channels["pr-testing"]}>: Discussion of in-progress pull request builds', f'<#{self.bot.config.bot_log_allowed_channels["pr-testing"]}>: Discussion of in-progress pull request builds',
f'<#{self.bot.config.bot_log_allowed_channels["linux-master-race"]}>: Linux support and discussion',
) )
), ),
colour=self.ryujinx_blue, colour=self.ryujinx_blue,
), ),
) )
except IndexError:
pass
async def setup(bot): async def setup(bot):

View file

@ -1,5 +1,5 @@
import hashlib
import datetime import datetime
import hashlib
# Basic bot config, insert your token here, update description if you want # Basic bot config, insert your token here, update description if you want
prefixes = [".", "!"] prefixes = [".", "!"]
@ -69,6 +69,7 @@ named_roles = {
"community": 420010997877833731, "community": 420010997877833731,
"hacker": 364508795038072833, "hacker": 364508795038072833,
"participant": 434353085926866946, "participant": 434353085926866946,
"pirate": 0,
} }
# The bot manager and staff roles # The bot manager and staff roles

View file

@ -0,0 +1,48 @@
import json
import os
def get_disabled_tids_path(bot) -> str:
return os.path.join(bot.state_dir, "data/disabled_tids.json")
def is_tid_valid(tid: str) -> bool:
return len(tid) == 16 and tid.isalnum()
def get_disabled_tids(bot) -> dict[str, str]:
if os.path.isfile(get_disabled_tids_path(bot)):
with open(get_disabled_tids_path(bot), "r") as f:
return json.load(f)
return {}
def set_disabled_tids(bot, contents: dict[str, str]):
with open(get_disabled_tids_path(bot), "w") as f:
json.dump(contents, f)
def is_tid_disabled(bot, tid: str) -> bool:
disabled_tids = get_disabled_tids(bot)
tid = tid.lower()
return tid in disabled_tids.keys()
def add_disabled_tid(bot, tid: str, note="") -> bool:
disabled_tids = get_disabled_tids(bot)
tid = tid.lower()
if tid not in disabled_tids.keys():
disabled_tids[tid] = note
set_disabled_tids(bot, disabled_tids)
return True
return False
def remove_disabled_tid(bot, tid: str) -> bool:
disabled_tids = get_disabled_tids(bot)
tid = tid.lower()
if tid in disabled_tids.keys():
del disabled_tids[tid]
set_disabled_tids(bot, disabled_tids)
return True
return False

View file

@ -3,13 +3,13 @@ import os
from typing import Optional, Union from typing import Optional, Union
def get_crontab_path(bot): def get_macros_path(bot):
return os.path.join(bot.state_dir, "data/macros.json") return os.path.join(bot.state_dir, "data/macros.json")
def get_macros_dict(bot) -> dict[str, dict[str, Union[list[str], str]]]: def get_macros_dict(bot) -> dict[str, dict[str, Union[list[str], str]]]:
if os.path.isfile(get_crontab_path(bot)): if os.path.isfile(get_macros_path(bot)):
with open(get_crontab_path(bot), "r") as f: with open(get_macros_path(bot), "r") as f:
macros = json.load(f) macros = json.load(f)
# Migration code # Migration code
@ -31,7 +31,7 @@ def get_macros_dict(bot) -> dict[str, dict[str, Union[list[str], str]]]:
del new_macros["macros"][key] del new_macros["macros"][key]
duplicate_num += 1 duplicate_num += 1
set_macros(new_macros) set_macros(bot, new_macros)
return new_macros return new_macros
return macros return macros
@ -52,7 +52,7 @@ def is_macro_key_available(
def set_macros(bot, contents: dict[str, dict[str, Union[list[str], str]]]): def set_macros(bot, contents: dict[str, dict[str, Union[list[str], str]]]):
with open(get_crontab_path(bot), "w") as f: with open(get_macros_path(bot), "w") as f:
json.dump(contents, f) json.dump(contents, f)