Add automated Yubico OTP revoke support
Thanks @linuxgemini for all your help with this!
This commit is contained in:
parent
b2dd805f2d
commit
44518e810e
3 changed files with 129 additions and 0 deletions
|
@ -49,5 +49,6 @@ I (ave) would like to thank the following, in no particular order:
|
||||||
- ReSwitched community, for being amazing
|
- ReSwitched community, for being amazing
|
||||||
- ihaveamac/ihaveahax and f916253 for the original kurisu/robocop
|
- ihaveamac/ihaveahax and f916253 for the original kurisu/robocop
|
||||||
- misson20000 for adding in reaction removal feature and putting up with my many BS requests on PR reviews
|
- misson20000 for adding in reaction removal feature and putting up with my many BS requests on PR reviews
|
||||||
|
- linuxgemini for helping out with Yubico OTP revocation code (which is based on their work)
|
||||||
- Everyone who contributed to robocop-ng in any way (reporting a bug, sending a PR, forking and hosting their own at their own guild, etc).
|
- Everyone who contributed to robocop-ng in any way (reporting a bug, sending a PR, forking and hosting their own at their own guild, etc).
|
||||||
|
|
||||||
|
|
119
cogs/yubicootp.py
Normal file
119
cogs/yubicootp.py
Normal file
|
@ -0,0 +1,119 @@
|
||||||
|
from discord.ext.commands import Cog
|
||||||
|
import re
|
||||||
|
import config
|
||||||
|
import secrets
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
|
||||||
|
class YubicoOTP(Cog):
|
||||||
|
def __init__(self, bot):
|
||||||
|
self.bot = bot
|
||||||
|
self.otp_re = re.compile("^[cbdefghijklnrtuv]{44}$")
|
||||||
|
self.api_servers = [
|
||||||
|
"https://api.yubico.com",
|
||||||
|
"https://api2.yubico.com",
|
||||||
|
"https://api3.yubico.com",
|
||||||
|
"https://api4.yubico.com",
|
||||||
|
"https://api5.yubico.com",
|
||||||
|
]
|
||||||
|
self.reuse_responses = ["BAD_OTP", "REPLAYED_OTP"]
|
||||||
|
self.bad_responses = [
|
||||||
|
"MISSING_PARAMETER",
|
||||||
|
"NO_SUCH_CLIENT",
|
||||||
|
"OPERATION_NOT_ALLOWED",
|
||||||
|
]
|
||||||
|
self.modhex_to_hex_conversion_map = {
|
||||||
|
"c": "0",
|
||||||
|
"b": "1",
|
||||||
|
"d": "2",
|
||||||
|
"e": "3",
|
||||||
|
"f": "4",
|
||||||
|
"g": "5",
|
||||||
|
"h": "6",
|
||||||
|
"i": "7",
|
||||||
|
"j": "8",
|
||||||
|
"k": "9",
|
||||||
|
"l": "a",
|
||||||
|
"n": "b",
|
||||||
|
"r": "c",
|
||||||
|
"t": "d",
|
||||||
|
"u": "e",
|
||||||
|
"v": "f",
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_serial(self, otp):
|
||||||
|
"""Get OTP from serial, based on code by linuxgemini"""
|
||||||
|
if otp[:2] != "cc":
|
||||||
|
return False
|
||||||
|
|
||||||
|
hexconv = []
|
||||||
|
|
||||||
|
for modhexletter in otp[0:12]:
|
||||||
|
hexconv.append(self.modhex_to_hex_conversion_map[modhexletter])
|
||||||
|
|
||||||
|
return int("".join(hexconv), 16)
|
||||||
|
|
||||||
|
async def validate_yubico_otp(self, otp):
|
||||||
|
nonce = secrets.token_hex(15) # Random number in the valid range
|
||||||
|
for api_server in self.api_servers:
|
||||||
|
url = f"{api_server}/wsapi/2.0/verify?id={config.yubico_otp_client_id}&otp={otp}&nonce={nonce}"
|
||||||
|
try:
|
||||||
|
resp = await self.bot.aiosession.get(url)
|
||||||
|
assert resp.status == 200
|
||||||
|
except Exception as ex:
|
||||||
|
self.bot.log.warning(
|
||||||
|
f"Got {repr(ex)} on {api_server} with otp {otp}."
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
resptext = await resp.text()
|
||||||
|
|
||||||
|
# Turn the fields to a python dict for easier parsing
|
||||||
|
datafields = resptext.strip().split("\r\n")
|
||||||
|
datafields = {line.split("=")[0]: line.split("=")[1] for line in datafields}
|
||||||
|
|
||||||
|
datafields["nonce"] != nonce
|
||||||
|
# If we got a success, then return True
|
||||||
|
if datafields["status"] == "OK":
|
||||||
|
return True
|
||||||
|
elif datafields["status"] in self.reuse_responses:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# If status isn't an expected one, log it
|
||||||
|
self.bot.log.warning(
|
||||||
|
f"Got {repr(datafields)} on {api_server} with otp {otp} and nonce {nonce}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# If we fucked up in a way we can't recover from, just return None
|
||||||
|
if datafields["status"] in self.bad_responses:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Return None if we fail to get responses from any server
|
||||||
|
return None
|
||||||
|
|
||||||
|
@Cog.listener()
|
||||||
|
async def on_message(self, message):
|
||||||
|
await self.bot.wait_until_ready()
|
||||||
|
strin = self.otp_re.match(message.content)
|
||||||
|
if strin and (strin.string[:2] == "cc" or strin.string[:2] == "vv"):
|
||||||
|
otp = strin.string
|
||||||
|
# Validate OTP
|
||||||
|
validation_result = await self.validate_yubico_otp(otp)
|
||||||
|
if validation_result is not True:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Derive serial and a string to use it
|
||||||
|
serial = self.get_serial(otp)
|
||||||
|
serial_str = f" (serial: `{serial}`)" if serial else ""
|
||||||
|
|
||||||
|
# If OTP is valid, tell user that it was revoked
|
||||||
|
msg = await message.channel.send(
|
||||||
|
f"{message.author.mention}: Ate Yubico OTP `{otp}`{serial_str}"
|
||||||
|
". This message will self destruct in 5 seconds."
|
||||||
|
)
|
||||||
|
# and delete message after 5s to help SNR
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
await msg.delete()
|
||||||
|
|
||||||
|
|
||||||
|
def setup(bot):
|
||||||
|
bot.add_cog(YubicoOTP(bot))
|
|
@ -39,6 +39,7 @@ initial_cogs = [
|
||||||
"cogs.robocronp",
|
"cogs.robocronp",
|
||||||
"cogs.meme",
|
"cogs.meme",
|
||||||
"cogs.invites",
|
"cogs.invites",
|
||||||
|
"cogs.yubicootp"
|
||||||
]
|
]
|
||||||
|
|
||||||
# The following cogs are also available but aren't loaded by default:
|
# The following cogs are also available but aren't loaded by default:
|
||||||
|
@ -323,3 +324,11 @@ self_assignable_roles = {
|
||||||
pingmods_allow = [named_roles["community"]] + staff_role_ids
|
pingmods_allow = [named_roles["community"]] + staff_role_ids
|
||||||
pingmods_role = 360138431524765707
|
pingmods_role = 360138431524765707
|
||||||
modtoggle_role = 360138431524765707
|
modtoggle_role = 360138431524765707
|
||||||
|
|
||||||
|
# == Only if you want to use cogs.yubicootp ==
|
||||||
|
# Client ID from https://upgrade.yubico.com/getapikey/
|
||||||
|
yubico_otp_client_id = 1
|
||||||
|
yubico_otp_secret = ""
|
||||||
|
# Note: YOU CAN KEEP THIS ON 1, IT WILL STILL FUNCTION.
|
||||||
|
# Note: Secret is not currently used, but it's recommended for you to add
|
||||||
|
# if you use your own client ID so if I ever implement it, your bot won't break.
|
||||||
|
|
Loading…
Reference in a new issue