Delete all logs with blocked ids & Add cheat analysis (#48)

* Rename disabled_tids to disabled_ids and start adding support for bids

* Add cheat information

* Add analysis block for build ids

Always remove blocked game logs

* Add ro_section to disabled_ids

* Search all potential log files for blocked games

* Add commands to block ro_sections of games

* Change order of macro command arguments

* Add new disabled_ids key to wanted_jsons
This commit is contained in:
TSRBerry 2023-05-02 21:38:22 +02:00 committed by GitHub
parent 18970723e7
commit 77fc2040a2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 466 additions and 132 deletions

View file

@ -56,7 +56,7 @@ wanted_jsons = [
"data/invites.json",
"data/macros.json",
"data/persistent_roles.json",
"data/disabled_tids.json",
"data/disabled_ids.json",
]
for wanted_json_idx in range(len(wanted_jsons)):

View file

@ -1,5 +1,6 @@
import logging
import re
from typing import Optional
import aiohttp
from discord import Colour, Embed, Message, Attachment
@ -7,12 +8,20 @@ from discord.ext import commands
from discord.ext.commands import Cog, Context
from robocop_ng.helpers.checks import check_if_staff
from robocop_ng.helpers.disabled_tids import (
add_disabled_tid,
is_tid_valid,
remove_disabled_tid,
get_disabled_tids,
is_tid_disabled,
from robocop_ng.helpers.disabled_ids import (
add_disabled_app_id,
is_app_id_valid,
remove_disabled_app_id,
get_disabled_ids,
is_app_id_disabled,
is_build_id_valid,
add_disabled_build_id,
remove_disabled_build_id,
is_build_id_disabled,
is_ro_section_disabled,
is_ro_section_valid,
add_disabled_ro_section,
remove_disabled_ro_section,
)
logging.basicConfig(
@ -50,6 +59,113 @@ class LogFileReader(Cog):
async with session.get(log_url, headers=headers) as response:
return await response.text("UTF-8")
def get_main_ro_section(self, log_file: str) -> Optional[dict[str, str]]:
ro_section_regex = re.search(
r"PrintRoSectionInfo: main:[\r\n]*(.*)", log_file, re.DOTALL
)
if ro_section_regex is not None and len(ro_section_regex.groups()) > 0:
ro_section = {"module": "", "sdk_libraries": []}
for line in ro_section_regex.group(1).splitlines():
line = line.strip()
if line.startswith("Module:"):
ro_section["module"] = line[8:]
elif line.startswith("SDK Libraries:"):
ro_section["sdk_libraries"].append(line[19:])
elif line.startswith("SDK "):
ro_section["sdk_libraries"].append(line[4:])
else:
break
return ro_section
return None
def get_app_info(
self, log_file: str
) -> Optional[tuple[str, str, list[str], dict[str, str]]]:
game_name = re.search(
r"Loader [A-Za-z]*: Application Loaded:\s([^;\n\r]*)",
log_file,
re.MULTILINE,
)
if game_name is not None and len(game_name.groups()) > 0:
game_name = game_name.group(1).rstrip()
app_id_regex = re.match(r".* \[([a-zA-Z0-9]*)\]", game_name)
if app_id_regex:
app_id = app_id_regex.group(1).strip()
else:
app_id = None
bids_regex = re.search(
r"Build ids found for title ([a-zA-Z0-9]*):[\n\r]*(.*)",
log_file,
re.DOTALL,
)
if bids_regex is not None and len(bids_regex.groups()) > 0:
app_id_from_bids = bids_regex.group(1).strip()
build_ids = [
bid.strip()
for bid in bids_regex.group(2).splitlines()
if is_build_id_valid(bid.strip())
]
# TODO: Check if self.get_main_ro_section() is None and return an error
return (
app_id,
app_id_from_bids,
build_ids,
self.get_main_ro_section(log_file),
)
return None
def is_log_valid(self, log_file: str) -> bool:
app_info = self.get_app_info(log_file)
if app_info is None:
return True
app_id, another_app_id, _, _ = app_info
return app_id == another_app_id
def is_game_blocked(self, log_file: str) -> bool:
app_info = self.get_app_info(log_file)
if app_info is None:
return False
app_id, another_app_id, build_ids, main_ro_section = app_info
if is_app_id_disabled(self.bot, app_id) or is_app_id_disabled(
self.bot, another_app_id
):
return True
for bid in build_ids:
if is_build_id_disabled(self.bot, bid):
return True
return is_ro_section_disabled(self.bot, main_ro_section)
async def blocked_game_action(self, message: Message) -> Embed:
warn_command = self.bot.get_command("warn")
if warn_command is not None:
warn_message = await message.reply(
".warn This log contains a blocked game."
)
warn_context = await self.bot.get_context(warn_message)
await warn_context.invoke(
warn_command,
target=None,
reason="This log contains a blocked game.",
)
else:
logging.error(
f"Couldn't find 'warn' command. Unable to warn {message.author}."
)
pirate_role = message.guild.get_role(self.bot.config.named_roles["pirate"])
await message.author.add_roles(pirate_role)
embed = Embed(
title="⛔ Blocked game detected ⛔",
colour=Colour(0xFF0000),
description="This log contains a blocked game and has been removed.\n"
"The user has been warned and the pirate role was applied.",
)
embed.set_footer(text=f"Log uploaded by @{message.author.name}")
await message.delete()
return embed
async def log_file_read(self, message):
self.embed = {
"hardware_info": {
@ -67,6 +183,7 @@ class LogFileReader(Cog):
"game_name": "Unknown",
"errors": "No errors found in log",
"mods": "No mods found",
"cheats": "No cheats found",
"notes": [],
},
"settings": {
@ -103,20 +220,6 @@ class LogFileReader(Cog):
description="This log file appears to be invalid. Please make sure to upload a Ryujinx log file.",
)
def is_tid_blocked(log_file=log_file):
game_name = re.search(
r"Loader [A-Za-z]*: Application Loaded:\s([^;\n\r]*)",
log_file,
re.MULTILINE,
)
if game_name is not None and len(game_name.groups()) > 0:
game_name = game_name.group(1).rstrip()
tid = re.match(r".* \[([a-zA-Z0-9]*)\]", game_name)
if tid is not None:
tid = tid.group(1).strip()
return is_tid_disabled(self.bot, tid)
return False
def get_hardware_info(log_file=log_file):
for setting in self.embed["hardware_info"]:
try:
@ -284,6 +387,9 @@ class LogFileReader(Cog):
log_embed.add_field(
name="Mods", value=self.embed["game_info"]["mods"], inline=False
)
log_embed.add_field(
name="Cheats", value=self.embed["game_info"]["cheats"], inline=False
)
try:
notes_value = "\n".join(game_notes)
@ -540,11 +646,20 @@ class LogFileReader(Cog):
mods_status = list(dict.fromkeys(mods_status))
return mods_status
def cheat_information(log_file=log_file):
cheat_regex = re.compile(r"Installing cheat\s\'(.+?)\'")
matches = re.findall(cheat_regex, log_file)
if matches:
cheats = [match[0] for match in matches]
return list(set(cheats))
game_mods = mods_information()
if game_mods:
self.embed["game_info"]["mods"] = "\n".join(game_mods)
else:
pass
game_cheats = cheat_information()
if game_cheats:
self.embed["game_info"]["cheats"] = "\n".join(game_cheats)
controllers_regex = re.compile(r"Hid Configure: ([^\r\n]+)")
controllers = re.findall(controllers_regex, log_file)
@ -721,35 +836,25 @@ class LogFileReader(Cog):
except AttributeError:
pass
if is_tid_blocked():
warn_command = self.bot.get_command("warn")
if warn_command is not None:
warn_message = await message.reply(
".warn This log contains a blocked title id."
)
warn_context = await self.bot.get_context(warn_message)
await warn_context.invoke(
warn_command,
target=None,
reason="This log contains a blocked title id.",
)
else:
logging.error(
f"Couldn't find 'warn' command. Unable to warn {message.author}."
)
pirate_role = message.guild.get_role(self.bot.config.named_roles["pirate"])
await message.author.add_roles(pirate_role)
if self.is_game_blocked(log_file):
return await self.blocked_game_action(message)
for role in message.author.roles:
if role.id in self.disallowed_roles:
embed = Embed(
title="⛔ Blocked game detected ⛔",
colour=Colour(0xFF0000),
description="This log contains a blocked title id and has been removed.\n"
"The user has been warned and the pirate role was applied.",
description="I'm not allowed to analyse this log.",
)
embed.set_footer(text=f"Log uploaded by {author_name}")
return embed
await message.delete()
if not self.is_log_valid(log_file):
embed = Embed(
title="⚠️ Modified log detected ⚠️",
colour=Colour(0xFCFC00),
description="This log contains manually modified information and won't be analysed.",
)
embed.set_footer(text=f"Log uploaded by {author_name}")
return embed
get_hardware_info()
@ -760,52 +865,185 @@ class LogFileReader(Cog):
@commands.check(check_if_staff)
@commands.command(
aliases=["disallow_log_tid", "forbid_log_tid", "block_tid", "blocktid"]
aliases=["disallow_log_id", "forbid_log_id", "block_id", "blockid"]
)
async def disable_log_tid(self, ctx: Context, tid: str, note=""):
if not is_tid_valid(tid):
return await ctx.send("The specified TID is invalid.")
async def disable_log_id(
self, ctx: Context, block_id_type: str, block_id: str, note=""
):
match block_id_type.lower():
case "app" | "app_id" | "appid" | "tid" | "title_id":
if not is_app_id_valid(block_id):
return await ctx.send("The specified app id is invalid.")
if add_disabled_tid(self.bot, tid, note):
return await ctx.send(f"TID '{tid}' is now blocked!")
if add_disabled_app_id(self.bot, block_id, note):
return await ctx.send(
f"Application id '{block_id}' is now blocked!"
)
else:
return await ctx.send(f"TID '{tid}' is already blocked.")
return await ctx.send(
f"Application id '{block_id}' is already blocked."
)
case "build" | "build_id", "bid":
if not is_build_id_valid(block_id):
return await ctx.send("The specified build id is invalid.")
if add_disabled_build_id(self.bot, block_id, note):
return await ctx.send(f"Build id '{block_id}' is now blocked!")
else:
return await ctx.send(f"Build id '{block_id}' is already blocked.")
case _:
return await ctx.send(
"The specified id type is invalid. Valid id types are: ['app_id', 'build_id']"
)
@commands.check(check_if_staff)
@commands.command(
aliases=[
"allow_log_tid",
"unblock_log_tid",
"unblock_tid",
"allow_tid",
"unblocktid",
"allow_log_id",
"unblock_log_id",
"unblock_id",
"allow_id",
"unblockid",
]
)
async def enable_log_tid(self, ctx: Context, tid: str):
if not is_tid_valid(tid):
return await ctx.send("The specified TID is invalid.")
async def enable_log_id(self, ctx: Context, block_id_type: str, block_id: str):
match block_id_type.lower():
case "app" | "app_id" | "appid" | "tid" | "title_id":
if not is_app_id_valid(block_id):
return await ctx.send("The specified app id is invalid.")
if remove_disabled_tid(self.bot, tid):
return await ctx.send(f"TID '{tid}' is now unblocked!")
if remove_disabled_app_id(self.bot, block_id):
return await ctx.send(
f"Application id '{block_id}' is now unblocked!"
)
else:
return await ctx.send(f"TID '{tid}' is not blocked.")
return await ctx.send(
f"Application id '{block_id}' is not blocked."
)
case "build" | "build_id", "bid":
if not is_build_id_valid(block_id):
return await ctx.send("The specified build id is invalid.")
if remove_disabled_build_id(self.bot, block_id):
return await ctx.send(f"Build id '{block_id}' is now unblocked!")
else:
return await ctx.send(f"Build id '{block_id}' is not blocked.")
case _:
return await ctx.send(
"The specified id type is invalid. Valid id types are: ['app_id', 'build_id']"
)
@commands.check(check_if_staff)
@commands.command(
aliases=[
"blocked_tids",
"listblockedtids",
"list_blocked_log_tids",
"list_blocked_tids",
"blocked_ids",
"listblockedids",
"list_blocked_log_ids",
"list_blocked_ids",
]
)
async def list_disabled_tids(self, ctx: Context):
disabled_tids = get_disabled_tids(self.bot)
message = "**Blocking analysis of the following TIDs:**\n"
for tid, note in disabled_tids.items():
message += f"- [{tid.upper()}]: {note}\n" if note != "" else f"- [{tid}]\n"
async def list_disabled_ids(self, ctx: Context):
disabled_ids = get_disabled_ids(self.bot)
message = "**Blocking analysis of the following IDs:**\n"
for id_type, name in {
"app_id": "Application IDs",
"build_id": "Build IDs",
}.items():
if len(disabled_ids[id_type].keys()) > 0:
message += f"- {name}:\n"
for disabled_id, note in disabled_ids[id_type].items():
message += (
f" - [{disabled_id.upper()}]: {note}\n"
if note != ""
else f" - [{disabled_id}]\n"
)
message += "\n"
if len(disabled_ids["ro_section"].keys()) > 0:
message += "- Read-only sections:\n"
for note in disabled_ids["ro_section"].keys():
f"- [{note}]"
return await ctx.send(message)
@commands.check(check_if_staff)
@commands.command(
aliases=[
"disallow_ro_section",
"forbid_ro_section",
"block_ro_section",
"blockrosection",
]
)
async def disable_ro_section(
self, ctx: Context, note: str, ro_section_snippet: str
):
ro_section_snippet = ro_section_snippet.strip("`").splitlines()
ro_section_snippet = [
line for line in ro_section_snippet if len(line.strip()) > 0
]
ro_section_info_regex = re.search(
r"PrintRoSectionInfo: main:", ro_section_snippet[0]
)
if ro_section_info_regex is None:
ro_section_snippet.insert(0, "PrintRoSectionInfo: main:")
ro_section = self.get_main_ro_section("\n".join(ro_section_snippet))
if ro_section is not None and is_ro_section_valid(ro_section):
if add_disabled_ro_section(self.bot, note, ro_section):
return await ctx.send(
f"The specified read-only section '{note}' is now blocked."
)
else:
return await ctx.send(
f"The specified read-only section '{note}' is already blocked."
)
@commands.check(check_if_staff)
@commands.command(
aliases=[
"allow_ro_section",
"unblock_ro_section",
"allow_rosection",
"unblockrosection",
]
)
async def enable_ro_section(self, ctx: Context, note: str):
if remove_disabled_ro_section(self.bot, note):
return await ctx.send(
f"The read-only section for '{note}' is now unblocked!"
)
else:
return await ctx.send(f"The read-only section for '{note}' is not blocked.")
@commands.check(check_if_staff)
@commands.command(
aliases=[
"get_blocked_ro_section",
"disabled_ro_section",
"blocked_ro_section" "list_disabled_ro_section",
"list_blocked_ro_section",
]
)
async def get_disabled_ro_section(self, ctx: Context, note: str):
disabled_ids = get_disabled_ids(self.bot)
key_note = note.lower()
if key_note in disabled_ids["ro_section"].keys():
message = f"**Disabled read-only section for '{note}'**:\n"
message += "```\n"
for key, content in disabled_ids["ro_section"][key_note].items():
match key:
case "module":
message += f"Module: {content}\n"
case "sdk_libraries":
message += f"SDK Libraries: \n"
for entry in content:
message += f" SDK {entry}\n"
message += "\n"
message += "```"
return await ctx.send(message)
else:
return await ctx.send("The specified read-only section is not blocked.")
async def analyse_log_message(self, message: Message, attachment_index=0):
author_id = message.author.id
author_mention = message.author.mention
@ -814,12 +1052,6 @@ class LogFileReader(Cog):
# Any message over 2000 chars is uploaded as message.txt, so this is accounted for
log_file_link = message.jump_url
for role in message.author.roles:
if role.id in self.disallowed_roles:
return await message.channel.send(
"I'm not allowed to analyse this log."
)
uploaded_logs_exist = [
True for elem in self.uploaded_log_info if filename in elem.values()
]
@ -911,7 +1143,22 @@ class LogFileReader(Cog):
for attachment in message.attachments:
is_log_file, is_ryujinx_log_file = self.is_valid_log_name(attachment)
if (
if is_log_file and not is_ryujinx_log_file:
attached_log = message.attachments[0]
log_file = await self.download_file(attached_log.url)
# Large files show a header value when not downloaded completely
# this regex makes sure that the log text to read starts from the first timestamp, ignoring headers
log_file_header_regex = re.compile(
r"\d{2}:\d{2}:\d{2}\.\d{3}.*", re.DOTALL
)
log_file_match = re.search(log_file_header_regex, log_file)
if log_file_match:
log_file = log_file_match.group(0)
if self.is_game_blocked(log_file):
return await message.channel.send(
content=None, embed=await self.blocked_game_action(message)
)
elif (
is_log_file
and is_ryujinx_log_file
and message.channel.id in self.bot_log_allowed_channels.values()

View file

@ -23,7 +23,9 @@ class Macro(Cog):
@commands.cooldown(3, 30, BucketType.member)
@commands.command(aliases=["m"])
async def macro(self, ctx: Context, target: Optional[discord.Member], key: str):
async def macro(
self, ctx: Context, key: str, target: Optional[discord.Member] = None
):
await ctx.message.delete()
if len(key) > 0:
text = get_macro(self.bot, key)

View file

@ -0,0 +1,133 @@
import json
import os
from typing import Union
def get_disabled_ids_path(bot) -> str:
old_filepath = os.path.join(bot.state_dir, "data/disabled_tids.json")
new_filepath = os.path.join(bot.state_dir, "data/disabled_ids.json")
if os.path.isfile(old_filepath):
os.rename(old_filepath, new_filepath)
return new_filepath
def is_app_id_valid(app_id: str) -> bool:
return len(app_id) == 16 and app_id.isalnum()
def is_build_id_valid(build_id: str) -> bool:
return 32 <= len(build_id) <= 64 and build_id.isalnum()
def is_ro_section_valid(ro_section: dict[str, str]) -> bool:
return "module" in ro_section.keys() and "sdk_libraries" in ro_section.keys()
def get_disabled_ids(bot) -> dict[str, dict[str, Union[str, dict[str, str]]]]:
if os.path.isfile(get_disabled_ids_path(bot)):
with open(get_disabled_ids_path(bot), "r") as f:
disabled_ids = json.load(f)
# Migration code
if "app_id" not in disabled_ids.keys():
disabled_ids = {"app_id": disabled_ids, "build_id": {}, "ro_section": {}}
return disabled_ids
return {"app_id": {}, "build_id": {}, "ro_section": {}}
def set_disabled_ids(bot, contents: dict[str, dict[str, Union[str, dict[str, str]]]]):
with open(get_disabled_ids_path(bot), "w") as f:
json.dump(contents, f)
def is_app_id_disabled(bot, app_id: str) -> bool:
disabled_ids = get_disabled_ids(bot)
app_id = app_id.lower()
return app_id in disabled_ids["app_id"].keys()
def is_build_id_disabled(bot, build_id: str) -> bool:
disabled_ids = get_disabled_ids(bot)
build_id = build_id.lower()
if len(build_id) < 64:
build_id += "0" * (64 - len(build_id))
return build_id in disabled_ids["build_id"].keys()
def is_ro_section_disabled(bot, ro_section: dict[str, str]) -> bool:
disabled_ids = get_disabled_ids(bot)
matches = []
for note, entry in disabled_ids["ro_section"].items():
for key, content in entry.items():
matches.append(ro_section[key].lower() == content.lower())
if all(matches):
return True
else:
matches = []
return False
def add_disabled_app_id(bot, app_id: str, note="") -> bool:
disabled_ids = get_disabled_ids(bot)
app_id = app_id.lower()
if app_id not in disabled_ids["app_id"].keys():
disabled_ids["app_id"][app_id] = note
set_disabled_ids(bot, disabled_ids)
return True
return False
def remove_disabled_app_id(bot, app_id: str) -> bool:
disabled_ids = get_disabled_ids(bot)
app_id = app_id.lower()
if app_id in disabled_ids["app_id"].keys():
del disabled_ids["app_id"][app_id]
set_disabled_ids(bot, disabled_ids)
return True
return False
def add_disabled_build_id(bot, build_id: str, note="") -> bool:
disabled_ids = get_disabled_ids(bot)
build_id = build_id.lower()
if len(build_id) < 64:
build_id += "0" * (64 - len(build_id))
if build_id not in disabled_ids["build_id"].keys():
disabled_ids["build_id"][build_id] = note
set_disabled_ids(bot, disabled_ids)
return True
return False
def remove_disabled_build_id(bot, build_id: str) -> bool:
disabled_ids = get_disabled_ids(bot)
build_id = build_id.lower()
if len(build_id) < 64:
build_id += "0" * (64 - len(build_id))
if build_id in disabled_ids["build_id"].keys():
del disabled_ids["build_id"][build_id]
set_disabled_ids(bot, disabled_ids)
return True
return False
def add_disabled_ro_section(bot, note: str, ro_section: dict[str, str]) -> bool:
disabled_ids = get_disabled_ids(bot)
note = note.lower()
if note not in disabled_ids["ro_section"].keys():
disabled_ids["ro_section"][note] = {}
for key, content in ro_section.items():
disabled_ids["ro_section"][note][key] = content.lower()
set_disabled_ids(bot, disabled_ids)
return True
return False
def remove_disabled_ro_section(bot, note: str) -> bool:
disabled_ids = get_disabled_ids(bot)
note = note.lower()
if note in disabled_ids["ro_section"].keys():
del disabled_ids["ro_section"][note]
set_disabled_ids(bot, disabled_ids)
return True
return False

View file

@ -1,48 +0,0 @@
import json
import os
def get_disabled_tids_path(bot) -> str:
return os.path.join(bot.state_dir, "data/disabled_tids.json")
def is_tid_valid(tid: str) -> bool:
return len(tid) == 16 and tid.isalnum()
def get_disabled_tids(bot) -> dict[str, str]:
if os.path.isfile(get_disabled_tids_path(bot)):
with open(get_disabled_tids_path(bot), "r") as f:
return json.load(f)
return {}
def set_disabled_tids(bot, contents: dict[str, str]):
with open(get_disabled_tids_path(bot), "w") as f:
json.dump(contents, f)
def is_tid_disabled(bot, tid: str) -> bool:
disabled_tids = get_disabled_tids(bot)
tid = tid.lower()
return tid in disabled_tids.keys()
def add_disabled_tid(bot, tid: str, note="") -> bool:
disabled_tids = get_disabled_tids(bot)
tid = tid.lower()
if tid not in disabled_tids.keys():
disabled_tids[tid] = note
set_disabled_tids(bot, disabled_tids)
return True
return False
def remove_disabled_tid(bot, tid: str) -> bool:
disabled_tids = get_disabled_tids(bot)
tid = tid.lower()
if tid in disabled_tids.keys():
del disabled_tids[tid]
set_disabled_tids(bot, disabled_tids)
return True
return False