ryuko-ng/robocop_ng/cogs/logfilereader.py
TSRBerry 147011eba1
Automatically block analysis of logs containing blocked contents in paths (#108)
* Extract paths from logs and check for blocked content

* Extract paths in command line analyzer

* Split disabled paths message if necessary

* Log the blocked path that caused a warning

* Remove duplicate command alias

* Remove bad characters from extracted filepaths

* Fix is_path_disabled() only checking the full path

* Apply formatting

* Improve wording for the warning embeds

* Apply formatting
2024-09-01 15:53:44 +02:00

711 lines
29 KiB
Python

import logging
import re
from typing import Optional
import aiohttp
from discord import Colour, Embed, Message, Attachment
from discord.ext import commands
from discord.ext.commands import Cog, Context, BucketType
from robocop_ng.helpers.checks import check_if_staff
from robocop_ng.helpers.disabled_ids import (
add_disabled_app_id,
is_app_id_valid,
remove_disabled_app_id,
get_disabled_ids,
is_app_id_disabled,
is_build_id_valid,
add_disabled_build_id,
remove_disabled_build_id,
is_build_id_disabled,
is_ro_section_disabled,
is_ro_section_valid,
add_disabled_ro_section,
remove_disabled_ro_section,
remove_disable_id,
)
from robocop_ng.helpers.disabled_paths import (
is_path_disabled,
get_disabled_paths,
add_disabled_path,
remove_disabled_path,
)
from robocop_ng.helpers.ryujinx_log_analyser import LogAnalyser
logging.basicConfig(
format="%(asctime)s (%(levelname)s) %(message)s (Line %(lineno)d)",
level=logging.INFO,
)
class LogFileReader(Cog):
@staticmethod
def is_valid_log_name(attachment: Attachment) -> tuple[bool, bool]:
filename = attachment.filename
ryujinx_log_file_regex = re.compile(r"^Ryujinx_.*\.log$")
log_file = re.compile(r"^.*\.log|.*\.txt$")
is_ryujinx_log_file = re.match(ryujinx_log_file_regex, filename) is not None
is_log_file = re.match(log_file, filename) is not None
return is_log_file, is_ryujinx_log_file
def __init__(self, bot):
self.bot = bot
self.bot_log_allowed_channels = self.bot.config.bot_log_allowed_channels
self.disallowed_named_roles = ["pirate"]
self.ryujinx_blue = Colour(0x4A90E2)
self.uploaded_log_info = []
self.disallowed_roles = [
self.bot.config.named_roles[x] for x in self.disallowed_named_roles
]
@staticmethod
async def download_file(log_url):
async with aiohttp.ClientSession() as session:
# Grabs first and last few bytes of log file to prevent abuse from large files
headers = {"Range": "bytes=0-60000, -6000"}
async with session.get(log_url, headers=headers) as response:
return await response.text("UTF-8")
@staticmethod
def is_log_valid(log_file: str) -> bool:
app_info = LogAnalyser.get_app_info(log_file)
is_homebrew = LogAnalyser.is_homebrew(log_file)
if app_info is None or is_homebrew:
return True
game_name, app_id, another_app_id, build_ids, main_ro_section = app_info
if (
game_name is None
or app_id is None
or another_app_id is None
or build_ids is None
or main_ro_section is None
):
return False
return app_id == another_app_id
def is_game_blocked(self, log_file: str) -> bool:
app_info = LogAnalyser.get_app_info(log_file)
if app_info is None:
return False
game_name, app_id, another_app_id, build_ids, main_ro_section = app_info
if is_app_id_disabled(self.bot, app_id) or is_app_id_disabled(
self.bot, another_app_id
):
return True
for bid in build_ids:
if is_build_id_disabled(self.bot, bid):
return True
return is_ro_section_disabled(self.bot, main_ro_section)
def contains_blocked_paths(self, log_file: str) -> Optional[str]:
filepaths = LogAnalyser.get_filepaths(log_file)
if filepaths is None:
return None
for filepath in filepaths:
if is_path_disabled(self.bot, filepath):
return filepath
return None
async def blocked_game_action(self, message: Message) -> Embed:
warn_command = self.bot.get_command("warn")
if warn_command is not None:
warn_message = await message.reply(
".warn This log contains a blocked game."
)
warn_context = await self.bot.get_context(warn_message)
await warn_context.invoke(
warn_command,
target=None,
reason="This log contains a blocked game.",
)
else:
logging.error(
f"Couldn't find 'warn' command. Unable to warn {message.author} for uploading a log of a blocked game."
)
pirate_role = message.guild.get_role(self.bot.config.named_roles["pirate"])
await message.author.add_roles(pirate_role)
embed = Embed(
title="⛔ Blocked game detected ⛔",
colour=Colour(0xFF0000),
description="This log contains a blocked game and has been removed.\n"
"The user has been warned and the pirate role has been applied.",
)
embed.set_footer(text=f"Log uploaded by @{message.author.name}")
await message.delete()
return embed
async def blocked_path_action(self, message: Message, blocked_path: str) -> Embed:
warn_command = self.bot.get_command("warn")
if warn_command is not None:
warn_message = await message.reply(
".warn This log contains blocked content in paths."
)
warn_context = await self.bot.get_context(warn_message)
await warn_context.invoke(
warn_command,
target=None,
reason=f"This log contains blocked content in paths: '{blocked_path}'",
)
else:
logging.error(
f"Couldn't find 'warn' command. Unable to warn {message.author} for uploading a log "
f"containing a blocked content in paths."
)
pirate_role = message.guild.get_role(self.bot.config.named_roles["pirate"])
await message.author.add_roles(pirate_role)
embed = Embed(
title="⛔ Blocked content in path detected ⛔",
colour=Colour(0xFF0000),
description="This log contains paths containing blocked content and has been removed.\n"
"The user has been warned and the pirate role has been applied.",
)
embed.set_footer(text=f"Log uploaded by @{message.author.name}")
await message.delete()
return embed
def format_analysed_log(self, author_name: str, analysed_log):
cleaned_game_name = re.sub(
r"\s\[(64|32)-bit\]$", "", analysed_log["game_info"]["game_name"]
)
analysed_log["game_info"]["game_name"] = cleaned_game_name
hardware_info = " | ".join(
(
f"**CPU:** {analysed_log['hardware_info']['cpu']}",
f"**GPU:** {analysed_log['hardware_info']['gpu']}",
f"**RAM:** {analysed_log['hardware_info']['ram']}",
f"**OS:** {analysed_log['hardware_info']['os']}",
)
)
system_settings_info = "\n".join(
(
f"**Audio Backend:** `{analysed_log['settings']['audio_backend']}`",
f"**Console Mode:** `{analysed_log['settings']['docked']}`",
f"**PPTC Cache:** `{analysed_log['settings']['pptc']}`",
f"**Shader Cache:** `{analysed_log['settings']['shader_cache']}`",
f"**V-Sync:** `{analysed_log['settings']['vsync']}`",
f"**Hypervisor:** `{analysed_log['settings']['hypervisor']}`",
)
)
graphics_settings_info = "\n".join(
(
f"**Graphics Backend:** `{analysed_log['settings']['graphics_backend']}`",
f"**Resolution:** `{analysed_log['settings']['resolution_scale']}`",
f"**Anisotropic Filtering:** `{analysed_log['settings']['anisotropic_filtering']}`",
f"**Aspect Ratio:** `{analysed_log['settings']['aspect_ratio']}`",
f"**Texture Recompression:** `{analysed_log['settings']['texture_recompression']}`",
)
)
ryujinx_info = " | ".join(
(
f"**Version:** {analysed_log['emu_info']['ryu_version']}",
f"**Firmware:** {analysed_log['emu_info']['ryu_firmware']}",
)
)
log_embed = Embed(title=f"{cleaned_game_name}", colour=self.ryujinx_blue)
log_embed.set_footer(text=f"Log uploaded by {author_name}")
log_embed.add_field(
name="General Info",
value=" | ".join((ryujinx_info, hardware_info)),
inline=False,
)
log_embed.add_field(
name="System Settings",
value=system_settings_info,
inline=True,
)
log_embed.add_field(
name="Graphics Settings",
value=graphics_settings_info,
inline=True,
)
if (
cleaned_game_name == "Unknown"
and analysed_log["game_info"]["errors"] == "No errors found in log"
):
log_embed.add_field(
name="Empty Log",
value=f"""The log file appears to be empty. To get a proper log, follow these steps:
1) In Logging settings, ensure `Enable Logging to File` is checked.
2) Ensure the following default logs are enabled: `Info`, `Warning`, `Error`, `Guest` and `Stub`.
3) Start a game up.
4) Play until your issue occurs.
5) Upload the latest log file which is larger than 3KB.""",
inline=False,
)
if (
cleaned_game_name == "Unknown"
and analysed_log["game_info"]["errors"] != "No errors found in log"
):
log_embed.add_field(
name="Latest Error Snippet",
value=analysed_log["game_info"]["errors"],
inline=False,
)
log_embed.add_field(
name="No Game Boot Detected",
value=f"""No game boot has been detected in log file. To get a proper log, follow these steps:
1) In Logging settings, ensure `Enable Logging to File` is checked.
2) Ensure the following default logs are enabled: `Info`, `Warning`, `Error`, `Guest` and `Stub`.
3) Start a game up.
4) Play until your issue occurs.
5) Upload the latest log file which is larger than 3KB.""",
inline=False,
)
else:
log_embed.add_field(
name="Latest Error Snippet",
value=analysed_log["game_info"]["errors"],
inline=False,
)
log_embed.add_field(
name="Mods", value=analysed_log["game_info"]["mods"], inline=False
)
log_embed.add_field(
name="Cheats", value=analysed_log["game_info"]["cheats"], inline=False
)
log_embed.add_field(
name="Notes",
value=analysed_log["game_info"]["notes"],
inline=False,
)
return log_embed
async def log_file_read(self, message):
attached_log = message.attachments[0]
author_name = f"@{message.author.name}"
log_file = await self.download_file(attached_log.url)
if self.is_game_blocked(log_file):
return await self.blocked_game_action(message)
blocked_path = self.contains_blocked_paths(log_file)
if blocked_path:
return await self.blocked_path_action(message, blocked_path)
for role in message.author.roles:
if role.id in self.disallowed_roles:
embed = Embed(
colour=Colour(0xFF0000),
description="I'm not allowed to analyse this log.",
)
embed.set_footer(text=f"Log uploaded by {author_name}")
return embed
if not self.is_log_valid(log_file):
embed = Embed(
title="⚠️ Modified log detected ⚠️",
colour=Colour(0xFCFC00),
description="This log contains manually modified information and won't be analysed.",
)
embed.set_footer(text=f"Log uploaded by {author_name}")
return embed
try:
analyser = LogAnalyser(log_file)
except ValueError:
return Embed(
colour=self.ryujinx_blue,
description="This log file appears to be invalid. Please make sure to upload a Ryujinx log file.",
)
is_channel_allowed = False
for allowed_channel_id in self.bot.config.bot_log_allowed_channels.values():
if message.channel.id == allowed_channel_id:
is_channel_allowed = True
break
return self.format_analysed_log(
author_name,
analyser.analyse_discord(
is_channel_allowed,
self.bot.config.bot_log_allowed_channels["pr-testing"],
),
)
@commands.check(check_if_staff)
@commands.command(
aliases=["disallow_log_id", "forbid_log_id", "block_id", "blockid"]
)
async def disable_log_id(
self, ctx: Context, disable_id: str, block_id_type: str, *, block_id: str
):
match block_id_type.lower():
case "app" | "app_id" | "appid" | "tid" | "title_id":
if not is_app_id_valid(block_id):
return await ctx.send("The specified app id is invalid.")
if add_disabled_app_id(self.bot, disable_id, block_id):
return await ctx.send(
f"Application id '{block_id}' is now blocked!"
)
else:
return await ctx.send(
f"Application id '{block_id}' is already blocked."
)
case "build" | "build_id" | "bid":
if not is_build_id_valid(block_id):
return await ctx.send("The specified build id is invalid.")
if add_disabled_build_id(self.bot, disable_id, block_id):
return await ctx.send(f"Build id '{block_id}' is now blocked!")
else:
return await ctx.send(f"Build id '{block_id}' is already blocked.")
case "ro_section" | "rosection":
ro_section_snippet = block_id.strip("`").splitlines()
ro_section_snippet = [
line for line in ro_section_snippet if len(line.strip()) > 0
]
ro_section_info_regex = re.search(
r"PrintRoSectionInfo: main:", ro_section_snippet[0]
)
if ro_section_info_regex is None:
ro_section_snippet.insert(0, "PrintRoSectionInfo: main:")
ro_section = LogAnalyser.get_main_ro_section(
"\n".join(ro_section_snippet)
)
if ro_section is not None and is_ro_section_valid(ro_section):
if add_disabled_ro_section(self.bot, disable_id, ro_section):
return await ctx.send(
f"The specified read-only section for '{disable_id}' is now blocked."
)
else:
return await ctx.send(
f"The specified read-only section for '{disable_id}' is already blocked."
)
case _:
return await ctx.send(
"The specified id type is invalid. Valid id types are: ['app_id', 'build_id', 'ro_section']"
)
@commands.check(check_if_staff)
@commands.command(
aliases=[
"allow_log_id",
"unblock_log_id",
"unblock_id",
"allow_id",
"unblockid",
]
)
async def enable_log_id(self, ctx: Context, disable_id: str, block_id_type="all"):
match block_id_type.lower():
case "all":
if remove_disable_id(self.bot, disable_id):
return await ctx.send(
f"All ids for '{disable_id}' are now unblocked!"
)
else:
return await ctx.send(f"No blocked ids for '{disable_id}' found.")
case "app" | "app_id" | "appid" | "tid" | "title_id":
if remove_disabled_app_id(self.bot, disable_id):
return await ctx.send(
f"Application id for '{disable_id}' is now unblocked!"
)
else:
return await ctx.send(
f"No blocked application id for '{disable_id}' found."
)
case "build" | "build_id" | "bid":
if remove_disabled_build_id(self.bot, disable_id):
return await ctx.send(
f"Build id for '{disable_id}' is now unblocked!"
)
else:
return await ctx.send(f"No blocked build id '{disable_id}' found.")
case "ro_section" | "rosection":
if remove_disabled_ro_section(self.bot, disable_id):
return await ctx.send(
f"Read-only section for '{disable_id}' is now unblocked!"
)
else:
return await ctx.send(
f"No blocked read-only section for '{disable_id}' found."
)
case _:
return await ctx.send(
"The specified id type is invalid. Valid id types are: ['all', 'app_id', 'build_id', 'ro_section']"
)
@commands.check(check_if_staff)
@commands.command(
aliases=[
"disabled_ids",
"blocked_ids",
"listblockedids",
"list_blocked_log_ids",
"list_blocked_ids",
]
)
async def list_disabled_ids(self, ctx: Context):
disabled_ids = get_disabled_ids(self.bot)
id_types = {"app_id": "AppID", "build_id": "BID", "ro_section": "RoSection"}
message = "**Blocking analysis of the following IDs:**\n"
for name, entry in disabled_ids.items():
message += f"- {name}:\n"
for id_type, title in id_types.items():
if len(entry[id_type]) > 0:
if id_type != "ro_section":
message += f" - __{title}__: {entry[id_type]}\n"
else:
message += f" - __{title}__\n"
message += "\n"
return await ctx.send(message)
@commands.check(check_if_staff)
@commands.command(
aliases=[
"get_blocked_ro_section",
"disabled_ro_section",
"blocked_ro_section",
"list_disabled_ro_section",
"list_blocked_ro_section",
]
)
async def get_disabled_ro_section(self, ctx: Context, disable_id: str):
disabled_ids = get_disabled_ids(self.bot)
disable_id = disable_id.lower()
if (
disable_id in disabled_ids.keys()
and len(disabled_ids[disable_id]["ro_section"]) > 0
):
message = f"**Blocked read-only section for '{disable_id}'**:\n"
message += "```\n"
for key, content in disabled_ids[disable_id]["ro_section"].items():
match key:
case "module":
message += f"Module: {content}\n"
case "sdk_libraries":
message += f"SDK Libraries: \n"
for entry in content:
message += f" SDK {entry}\n"
message += "\n"
message += "```"
return await ctx.send(message)
else:
return await ctx.send(f"No read-only section blocked for '{disable_id}'.")
@commands.check(check_if_staff)
@commands.command(
aliases=["disallow_path", "forbid_path", "block_path", "blockpath"]
)
async def disable_path(self, ctx: Context, block_path: str):
if add_disabled_path(self.bot, block_path):
return await ctx.send(f"Path content `{block_path}` is now blocked!")
else:
return await ctx.send(f"Path content `{block_path}` is already blocked.")
@commands.check(check_if_staff)
@commands.command(
aliases=[
"allow_path",
"unblock_path",
"unblockpath",
]
)
async def enable_path(self, ctx: Context, block_path: str):
if remove_disabled_path(self.bot, block_path):
return await ctx.send(f"Path content `{block_path}` is now unblocked!")
else:
return await ctx.send(f"No blocked path content '{block_path}' found.")
@commands.check(check_if_staff)
@commands.command(
aliases=[
"disabled_paths",
"blocked_paths",
"listdisabledpaths",
"listblockedpaths",
"list_blocked_paths",
]
)
async def list_disabled_paths(self, ctx: Context):
messages = []
disabled_paths = get_disabled_paths(self.bot)
message = (
"**Blocking analysis of logs containing the following content in paths:**\n"
)
for entry in disabled_paths:
if len(message) >= 1500:
messages.append(message)
message = f"- `{entry}`\n"
else:
message += f"- `{entry}`\n"
if message not in messages:
# Add the last message as well
messages.append(message)
for msg in messages:
await ctx.send(msg)
async def analyse_log_message(self, message: Message, attachment_index=0):
author_id = message.author.id
author_mention = message.author.mention
filename = message.attachments[attachment_index].filename
filesize = message.attachments[attachment_index].size
# Any message over 2000 chars is uploaded as message.txt, so this is accounted for
log_file_link = message.jump_url
uploaded_logs_exist = [
True for elem in self.uploaded_log_info if filename in elem.values()
]
if not any(uploaded_logs_exist):
reply_message = await message.channel.send(
"Log detected, parsing...", reference=message
)
try:
embed = await self.log_file_read(message)
if "Ryujinx_" in filename:
self.uploaded_log_info.append(
{
"filename": filename,
"file_size": filesize,
"link": log_file_link,
"author": author_id,
}
)
# Avoid duplicate log file analysis, at least temporarily; keep track of the last few filenames of uploaded logs
# this should help support channels not be flooded with too many log files
# fmt: off
self.uploaded_log_info = self.uploaded_log_info[-5:]
# fmt: on
return await reply_message.edit(content=None, embed=embed)
except UnicodeDecodeError as error:
await reply_message.edit(
content=author_mention,
embed=Embed(
description="This log file appears to be invalid. Please re-check and re-upload your log file.",
colour=self.ryujinx_blue,
),
)
logging.warning(error)
except Exception as error:
await reply_message.edit(
content=f"Error: Couldn't parse log; parser threw `{type(error).__name__}` exception."
)
logging.warning(error)
else:
duplicate_log_file = next(
(
elem
for elem in self.uploaded_log_info
if elem["filename"] == filename
and elem["file_size"] == filesize
and elem["author"] == author_id
),
None,
)
await message.channel.send(
content=author_mention,
embed=Embed(
description=f"The log file `{filename}` appears to be a duplicate [already uploaded here]({duplicate_log_file['link']}). Please upload a more recent file.",
colour=self.ryujinx_blue,
),
)
@commands.cooldown(3, 30, BucketType.channel)
@commands.command(
aliases=["analyselog", "analyse_log", "analyze", "analyzelog", "analyze_log"]
)
async def analyse(self, ctx: Context, attachment_number=1):
await ctx.message.delete()
if ctx.message.reference is not None:
message = await ctx.fetch_message(ctx.message.reference.message_id)
if len(message.attachments) >= attachment_number:
attachment = message.attachments[attachment_number - 1]
is_log_file, _ = self.is_valid_log_name(attachment)
if is_log_file:
return await self.analyse_log_message(
message, attachment_number - 1
)
else:
return await ctx.send(
f"The attached log file '{attachment.filename}' is not valid.",
reference=ctx.message.reference,
)
return await ctx.send(
"Please use `.analyse` as a reply to a message with an attached log file."
)
@Cog.listener()
async def on_message(self, message: Message):
await self.bot.wait_until_ready()
if message.author.bot:
return
for attachment in message.attachments:
is_log_file, is_ryujinx_log_file = self.is_valid_log_name(attachment)
if is_log_file and not is_ryujinx_log_file:
attached_log = message.attachments[0]
log_file = await self.download_file(attached_log.url)
# Large files show a header value when not downloaded completely
# this regex makes sure that the log text to read starts from the first timestamp, ignoring headers
log_file_header_regex = re.compile(
r"\d{2}:\d{2}:\d{2}\.\d{3}.*", re.DOTALL
)
log_file_match = re.search(log_file_header_regex, log_file)
if log_file_match:
log_file = log_file_match.group(0)
if self.is_game_blocked(log_file):
return await message.channel.send(
content=None, embed=await self.blocked_game_action(message)
)
blocked_path = self.contains_blocked_paths(log_file)
if blocked_path:
return await message.channel.send(
content=None,
embed=await self.blocked_path_action(message, blocked_path),
)
elif (
is_log_file
and is_ryujinx_log_file
and message.channel.id in self.bot_log_allowed_channels.values()
):
return await self.analyse_log_message(
message, message.attachments.index(attachment)
)
elif (
is_log_file
and is_ryujinx_log_file
and message.channel.id not in self.bot_log_allowed_channels.values()
):
return await message.author.send(
content=message.author.mention,
embed=Embed(
description="\n".join(
(
f"Please upload Ryujinx log files to the correct location:\n",
f'<#{self.bot.config.bot_log_allowed_channels["windows-support"]}>: Windows help and troubleshooting',
f'<#{self.bot.config.bot_log_allowed_channels["linux-support"]}>: Linux help and troubleshooting',
f'<#{self.bot.config.bot_log_allowed_channels["macos-support"]}>: macOS help and troubleshooting',
f'<#{self.bot.config.bot_log_allowed_channels["patreon-support"]}>: Help and troubleshooting for Patreon subscribers',
f'<#{self.bot.config.bot_log_allowed_channels["development"]}>: Ryujinx development discussion',
f'<#{self.bot.config.bot_log_allowed_channels["pr-testing"]}>: Discussion of in-progress pull request builds',
)
),
colour=self.ryujinx_blue,
),
)
async def setup(bot):
await bot.add_cog(LogFileReader(bot))