Automatically block analysis of logs containing blocked contents in paths (#108)

* Extract paths from logs and check for blocked content

* Extract paths in command line analyzer

* Split disabled paths message if necessary

* Log the blocked path that caused a warning

* Remove duplicate command alias

* Remove bad characters from extracted filepaths

* Fix is_path_disabled() only checking the full path

* Apply formatting

* Improve wording for the warning embeds

* Apply formatting
This commit is contained in:
TSRBerry 2024-09-01 15:53:44 +02:00 committed by GitHub
parent 76fe1dbbd4
commit 147011eba1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 168 additions and 2 deletions

View file

@ -1,5 +1,6 @@
import logging import logging
import re import re
from typing import Optional
import aiohttp import aiohttp
from discord import Colour, Embed, Message, Attachment from discord import Colour, Embed, Message, Attachment
@ -23,6 +24,12 @@ from robocop_ng.helpers.disabled_ids import (
remove_disabled_ro_section, remove_disabled_ro_section,
remove_disable_id, remove_disable_id,
) )
from robocop_ng.helpers.disabled_paths import (
is_path_disabled,
get_disabled_paths,
add_disabled_path,
remove_disabled_path,
)
from robocop_ng.helpers.ryujinx_log_analyser import LogAnalyser from robocop_ng.helpers.ryujinx_log_analyser import LogAnalyser
logging.basicConfig( logging.basicConfig(
@ -92,6 +99,15 @@ class LogFileReader(Cog):
return True return True
return is_ro_section_disabled(self.bot, main_ro_section) return is_ro_section_disabled(self.bot, main_ro_section)
def contains_blocked_paths(self, log_file: str) -> Optional[str]:
filepaths = LogAnalyser.get_filepaths(log_file)
if filepaths is None:
return None
for filepath in filepaths:
if is_path_disabled(self.bot, filepath):
return filepath
return None
async def blocked_game_action(self, message: Message) -> Embed: async def blocked_game_action(self, message: Message) -> Embed:
warn_command = self.bot.get_command("warn") warn_command = self.bot.get_command("warn")
if warn_command is not None: if warn_command is not None:
@ -106,7 +122,7 @@ class LogFileReader(Cog):
) )
else: else:
logging.error( logging.error(
f"Couldn't find 'warn' command. Unable to warn {message.author}." f"Couldn't find 'warn' command. Unable to warn {message.author} for uploading a log of a blocked game."
) )
pirate_role = message.guild.get_role(self.bot.config.named_roles["pirate"]) pirate_role = message.guild.get_role(self.bot.config.named_roles["pirate"])
@ -116,7 +132,38 @@ class LogFileReader(Cog):
title="⛔ Blocked game detected ⛔", title="⛔ Blocked game detected ⛔",
colour=Colour(0xFF0000), colour=Colour(0xFF0000),
description="This log contains a blocked game and has been removed.\n" description="This log contains a blocked game and has been removed.\n"
"The user has been warned and the pirate role was applied.", "The user has been warned and the pirate role has been applied.",
)
embed.set_footer(text=f"Log uploaded by @{message.author.name}")
await message.delete()
return embed
async def blocked_path_action(self, message: Message, blocked_path: str) -> Embed:
warn_command = self.bot.get_command("warn")
if warn_command is not None:
warn_message = await message.reply(
".warn This log contains blocked content in paths."
)
warn_context = await self.bot.get_context(warn_message)
await warn_context.invoke(
warn_command,
target=None,
reason=f"This log contains blocked content in paths: '{blocked_path}'",
)
else:
logging.error(
f"Couldn't find 'warn' command. Unable to warn {message.author} for uploading a log "
f"containing a blocked content in paths."
)
pirate_role = message.guild.get_role(self.bot.config.named_roles["pirate"])
await message.author.add_roles(pirate_role)
embed = Embed(
title="⛔ Blocked content in path detected ⛔",
colour=Colour(0xFF0000),
description="This log contains paths containing blocked content and has been removed.\n"
"The user has been warned and the pirate role has been applied.",
) )
embed.set_footer(text=f"Log uploaded by @{message.author.name}") embed.set_footer(text=f"Log uploaded by @{message.author.name}")
await message.delete() await message.delete()
@ -243,6 +290,9 @@ class LogFileReader(Cog):
if self.is_game_blocked(log_file): if self.is_game_blocked(log_file):
return await self.blocked_game_action(message) return await self.blocked_game_action(message)
blocked_path = self.contains_blocked_paths(log_file)
if blocked_path:
return await self.blocked_path_action(message, blocked_path)
for role in message.author.roles: for role in message.author.roles:
if role.id in self.disallowed_roles: if role.id in self.disallowed_roles:
@ -449,6 +499,61 @@ class LogFileReader(Cog):
else: else:
return await ctx.send(f"No read-only section blocked for '{disable_id}'.") return await ctx.send(f"No read-only section blocked for '{disable_id}'.")
@commands.check(check_if_staff)
@commands.command(
aliases=["disallow_path", "forbid_path", "block_path", "blockpath"]
)
async def disable_path(self, ctx: Context, block_path: str):
if add_disabled_path(self.bot, block_path):
return await ctx.send(f"Path content `{block_path}` is now blocked!")
else:
return await ctx.send(f"Path content `{block_path}` is already blocked.")
@commands.check(check_if_staff)
@commands.command(
aliases=[
"allow_path",
"unblock_path",
"unblockpath",
]
)
async def enable_path(self, ctx: Context, block_path: str):
if remove_disabled_path(self.bot, block_path):
return await ctx.send(f"Path content `{block_path}` is now unblocked!")
else:
return await ctx.send(f"No blocked path content '{block_path}' found.")
@commands.check(check_if_staff)
@commands.command(
aliases=[
"disabled_paths",
"blocked_paths",
"listdisabledpaths",
"listblockedpaths",
"list_blocked_paths",
]
)
async def list_disabled_paths(self, ctx: Context):
messages = []
disabled_paths = get_disabled_paths(self.bot)
message = (
"**Blocking analysis of logs containing the following content in paths:**\n"
)
for entry in disabled_paths:
if len(message) >= 1500:
messages.append(message)
message = f"- `{entry}`\n"
else:
message += f"- `{entry}`\n"
if message not in messages:
# Add the last message as well
messages.append(message)
for msg in messages:
await ctx.send(msg)
async def analyse_log_message(self, message: Message, attachment_index=0): async def analyse_log_message(self, message: Message, attachment_index=0):
author_id = message.author.id author_id = message.author.id
author_mention = message.author.mention author_mention = message.author.mention
@ -563,6 +668,12 @@ class LogFileReader(Cog):
return await message.channel.send( return await message.channel.send(
content=None, embed=await self.blocked_game_action(message) content=None, embed=await self.blocked_game_action(message)
) )
blocked_path = self.contains_blocked_paths(log_file)
if blocked_path:
return await message.channel.send(
content=None,
embed=await self.blocked_path_action(message, blocked_path),
)
elif ( elif (
is_log_file is_log_file
and is_ryujinx_log_file and is_ryujinx_log_file

View file

@ -0,0 +1,47 @@
import json
import os
from robocop_ng.helpers.data_loader import read_json
def get_disabled_paths_path(bot) -> str:
return os.path.join(bot.state_dir, "data/disabled_paths.json")
def get_disabled_paths(bot) -> list[str]:
disabled_paths = read_json(bot, get_disabled_paths_path(bot))
if "paths" not in disabled_paths.keys():
return []
return disabled_paths["paths"]
def set_disabled_paths(bot, contents: list[str]):
with open(get_disabled_paths_path(bot), "w") as f:
json.dump({"paths": contents}, f)
def is_path_disabled(bot, path: str) -> bool:
for disabled_path in get_disabled_paths(bot):
if disabled_path in path.strip().lower():
return True
return False
def add_disabled_path(bot, disabled_path: str) -> bool:
disabled_path = disabled_path.strip().lower()
disabled_paths = get_disabled_paths(bot)
if disabled_path not in disabled_paths:
disabled_paths.append(disabled_path)
set_disabled_paths(bot, disabled_paths)
return True
return False
def remove_disabled_path(bot, disabled_path: str) -> bool:
disabled_path = disabled_path.strip().lower()
disabled_paths = get_disabled_paths(bot)
if disabled_path in disabled_paths:
disabled_paths.remove(disabled_path)
set_disabled_paths(bot, disabled_paths)
return True
return False

View file

@ -41,6 +41,13 @@ class LogAnalyser:
re.search("Load.*Application: Loading as [Hh]omebrew", log_file) is not None re.search("Load.*Application: Loading as [Hh]omebrew", log_file) is not None
) )
@staticmethod
def get_filepaths(log_file: str) -> list[str]:
return [
x.rstrip("\u0000")
for x in re.findall(r"(?:[A-Za-z]:)?(?:[\\/]+[^\\/:\"\r\n]+)+", log_file)
]
@staticmethod @staticmethod
def get_main_ro_section(log_file: str) -> Optional[dict[str, str]]: def get_main_ro_section(log_file: str) -> Optional[dict[str, str]]:
ro_section_matches = re.findall( ro_section_matches = re.findall(
@ -711,6 +718,7 @@ class LogAnalyser:
"errors": self._log_errors, "errors": self._log_errors,
"settings": self._settings, "settings": self._settings,
"app_info": self.get_app_info(self._log_text), "app_info": self.get_app_info(self._log_text),
"paths": self.get_filepaths(self._log_text),
} }