diff --git a/robocop_ng/cogs/logfilereader.py b/robocop_ng/cogs/logfilereader.py index 8406aa0..89e1fa3 100644 --- a/robocop_ng/cogs/logfilereader.py +++ b/robocop_ng/cogs/logfilereader.py @@ -1,5 +1,6 @@ import logging import re +from typing import Optional import aiohttp from discord import Colour, Embed, Message, Attachment @@ -23,6 +24,12 @@ from robocop_ng.helpers.disabled_ids import ( remove_disabled_ro_section, remove_disable_id, ) +from robocop_ng.helpers.disabled_paths import ( + is_path_disabled, + get_disabled_paths, + add_disabled_path, + remove_disabled_path, +) from robocop_ng.helpers.ryujinx_log_analyser import LogAnalyser logging.basicConfig( @@ -92,6 +99,15 @@ class LogFileReader(Cog): return True return is_ro_section_disabled(self.bot, main_ro_section) + def contains_blocked_paths(self, log_file: str) -> Optional[str]: + filepaths = LogAnalyser.get_filepaths(log_file) + if filepaths is None: + return None + for filepath in filepaths: + if is_path_disabled(self.bot, filepath): + return filepath + return None + async def blocked_game_action(self, message: Message) -> Embed: warn_command = self.bot.get_command("warn") if warn_command is not None: @@ -106,7 +122,7 @@ class LogFileReader(Cog): ) else: logging.error( - f"Couldn't find 'warn' command. Unable to warn {message.author}." + f"Couldn't find 'warn' command. Unable to warn {message.author} for uploading a log of a blocked game." ) pirate_role = message.guild.get_role(self.bot.config.named_roles["pirate"]) @@ -116,7 +132,38 @@ class LogFileReader(Cog): title="⛔ Blocked game detected ⛔", colour=Colour(0xFF0000), description="This log contains a blocked game and has been removed.\n" - "The user has been warned and the pirate role was applied.", + "The user has been warned and the pirate role has been applied.", + ) + embed.set_footer(text=f"Log uploaded by @{message.author.name}") + await message.delete() + return embed + + async def blocked_path_action(self, message: Message, blocked_path: str) -> Embed: + warn_command = self.bot.get_command("warn") + if warn_command is not None: + warn_message = await message.reply( + ".warn This log contains blocked content in paths." + ) + warn_context = await self.bot.get_context(warn_message) + await warn_context.invoke( + warn_command, + target=None, + reason=f"This log contains blocked content in paths: '{blocked_path}'", + ) + else: + logging.error( + f"Couldn't find 'warn' command. Unable to warn {message.author} for uploading a log " + f"containing a blocked content in paths." + ) + + pirate_role = message.guild.get_role(self.bot.config.named_roles["pirate"]) + await message.author.add_roles(pirate_role) + + embed = Embed( + title="⛔ Blocked content in path detected ⛔", + colour=Colour(0xFF0000), + description="This log contains paths containing blocked content and has been removed.\n" + "The user has been warned and the pirate role has been applied.", ) embed.set_footer(text=f"Log uploaded by @{message.author.name}") await message.delete() @@ -243,6 +290,9 @@ class LogFileReader(Cog): if self.is_game_blocked(log_file): return await self.blocked_game_action(message) + blocked_path = self.contains_blocked_paths(log_file) + if blocked_path: + return await self.blocked_path_action(message, blocked_path) for role in message.author.roles: if role.id in self.disallowed_roles: @@ -449,6 +499,61 @@ class LogFileReader(Cog): else: return await ctx.send(f"No read-only section blocked for '{disable_id}'.") + @commands.check(check_if_staff) + @commands.command( + aliases=["disallow_path", "forbid_path", "block_path", "blockpath"] + ) + async def disable_path(self, ctx: Context, block_path: str): + if add_disabled_path(self.bot, block_path): + return await ctx.send(f"Path content `{block_path}` is now blocked!") + else: + return await ctx.send(f"Path content `{block_path}` is already blocked.") + + @commands.check(check_if_staff) + @commands.command( + aliases=[ + "allow_path", + "unblock_path", + "unblockpath", + ] + ) + async def enable_path(self, ctx: Context, block_path: str): + if remove_disabled_path(self.bot, block_path): + return await ctx.send(f"Path content `{block_path}` is now unblocked!") + else: + return await ctx.send(f"No blocked path content '{block_path}' found.") + + @commands.check(check_if_staff) + @commands.command( + aliases=[ + "disabled_paths", + "blocked_paths", + "listdisabledpaths", + "listblockedpaths", + "list_blocked_paths", + ] + ) + async def list_disabled_paths(self, ctx: Context): + messages = [] + disabled_paths = get_disabled_paths(self.bot) + + message = ( + "**Blocking analysis of logs containing the following content in paths:**\n" + ) + for entry in disabled_paths: + if len(message) >= 1500: + messages.append(message) + message = f"- `{entry}`\n" + else: + message += f"- `{entry}`\n" + + if message not in messages: + # Add the last message as well + messages.append(message) + + for msg in messages: + await ctx.send(msg) + async def analyse_log_message(self, message: Message, attachment_index=0): author_id = message.author.id author_mention = message.author.mention @@ -563,6 +668,12 @@ class LogFileReader(Cog): return await message.channel.send( content=None, embed=await self.blocked_game_action(message) ) + blocked_path = self.contains_blocked_paths(log_file) + if blocked_path: + return await message.channel.send( + content=None, + embed=await self.blocked_path_action(message, blocked_path), + ) elif ( is_log_file and is_ryujinx_log_file diff --git a/robocop_ng/helpers/disabled_paths.py b/robocop_ng/helpers/disabled_paths.py new file mode 100644 index 0000000..c016856 --- /dev/null +++ b/robocop_ng/helpers/disabled_paths.py @@ -0,0 +1,47 @@ +import json +import os + +from robocop_ng.helpers.data_loader import read_json + + +def get_disabled_paths_path(bot) -> str: + return os.path.join(bot.state_dir, "data/disabled_paths.json") + + +def get_disabled_paths(bot) -> list[str]: + disabled_paths = read_json(bot, get_disabled_paths_path(bot)) + if "paths" not in disabled_paths.keys(): + return [] + return disabled_paths["paths"] + + +def set_disabled_paths(bot, contents: list[str]): + with open(get_disabled_paths_path(bot), "w") as f: + json.dump({"paths": contents}, f) + + +def is_path_disabled(bot, path: str) -> bool: + for disabled_path in get_disabled_paths(bot): + if disabled_path in path.strip().lower(): + return True + return False + + +def add_disabled_path(bot, disabled_path: str) -> bool: + disabled_path = disabled_path.strip().lower() + disabled_paths = get_disabled_paths(bot) + if disabled_path not in disabled_paths: + disabled_paths.append(disabled_path) + set_disabled_paths(bot, disabled_paths) + return True + return False + + +def remove_disabled_path(bot, disabled_path: str) -> bool: + disabled_path = disabled_path.strip().lower() + disabled_paths = get_disabled_paths(bot) + if disabled_path in disabled_paths: + disabled_paths.remove(disabled_path) + set_disabled_paths(bot, disabled_paths) + return True + return False diff --git a/robocop_ng/helpers/ryujinx_log_analyser.py b/robocop_ng/helpers/ryujinx_log_analyser.py index d5b1069..f9b23e7 100644 --- a/robocop_ng/helpers/ryujinx_log_analyser.py +++ b/robocop_ng/helpers/ryujinx_log_analyser.py @@ -41,6 +41,13 @@ class LogAnalyser: re.search("Load.*Application: Loading as [Hh]omebrew", log_file) is not None ) + @staticmethod + def get_filepaths(log_file: str) -> list[str]: + return [ + x.rstrip("\u0000") + for x in re.findall(r"(?:[A-Za-z]:)?(?:[\\/]+[^\\/:\"\r\n]+)+", log_file) + ] + @staticmethod def get_main_ro_section(log_file: str) -> Optional[dict[str, str]]: ro_section_matches = re.findall( @@ -711,6 +718,7 @@ class LogAnalyser: "errors": self._log_errors, "settings": self._settings, "app_info": self.get_app_info(self._log_text), + "paths": self.get_filepaths(self._log_text), }