diff --git a/helper.py b/helper.py deleted file mode 100644 index 41745a7..0000000 --- a/helper.py +++ /dev/null @@ -1,130 +0,0 @@ -import os -import discord -from dataclasses import dataclass, field -from dotenv import load_dotenv -from typing import List, Set, Optional -import requests -import yaml - -load_dotenv() - - -@dataclass(frozen=True) -class Config: - DISCORD_TOKEN: str = field(default_factory=lambda: os.getenv("DISCORD_TOKEN")) - TARGET_VC_CH_ID: int = field(default_factory=lambda: int(os.getenv("TARGET_VC_CH_ID"))) - TARGET_TXT_CH_ID: int = field(default_factory=lambda: int(os.getenv("TARGET_TXT_CH_ID"))) - ROLE_GAMER_ID: int = field(default_factory=lambda: int(os.getenv("ROLE_GAMER_ID"))) - MANAGER_ID: int = field(default_factory=lambda: int(os.getenv("MANAGER_ID"))) - PM_ID: int = field(default_factory=lambda: int(os.getenv("PM_ID"))) - EMOJI_ACK_ID: int = field(default_factory=lambda: int(os.getenv("EMOJI_ACK_ID"))) - URL_EXCUSES_YML: str = field(default_factory=lambda: os.getenv("URL_EXCUSES_YML")) - - def __post_init__(self): - if not self.DISCORD_TOKEN: - raise EnvironmentError("Environment variable not set:", self.DISCORD_TOKEN) - if not self.TARGET_VC_CH_ID: - raise EnvironmentError("Environment variable not set:", self.TARGET_VC_CH_ID) - if not self.TARGET_TXT_CH_ID: - raise EnvironmentError("Environment variable not set:", self.TARGET_TXT_CH_ID) - if not self.ROLE_GAMER_ID: - raise EnvironmentError("Environment variable not set:", self.ROLE_GAMER_ID) - if not self.MANAGER_ID: - raise EnvironmentError("Environment variable not set:", self.MANAGER_ID) - if not self.PM_ID: - raise EnvironmentError("Environment variable not set:", self.PM_ID) - if not self.EMOJI_ACK_ID: - raise EnvironmentError("Environment variable not set:", self.EMOJI_ACK_ID) - if not self.URL_EXCUSES_YML: - raise EnvironmentError("Environment variable not set:", self.URL_EXCUSES_YML) - - -@dataclass -class Session: - _MESSAGE: str = "zoin up ..., or else ... 🥀" - _EMBED_TITLE: str = "🗣️ Standup in session" - _EMBED_DESCRIPTION: str = """``` -░░░░░░░░░░░░░░░░░░░░░░░░░░░░ -░░░░░░░░░░░░░░░░░░░░░░░░░░░░ -░░░░░░░░░░░░▀▀▀▀▀▀▀▀░░░░░░░░ -░░░░░░░░░░░▀▀░░░░░░░░░░░░░░░ -░░░░░░░░░░▀▀░░▀▀▀▀░░░░░░░░░░ -░░░░░░░░░▀▀░░░░▀▀░░░░░░░░░░░ -░░░░░░░░▀▀▀▀▀▀▀▀░░░░░░░░░░░░ -░░░░░░░░░░░░░░░░░░░░░░░░░░░░ -░░░░░░░░░░░░░░░░░░░░░░░░░░░░ -░░░░░░░░░░░░░░░▀▀░░░░░░░░░░░ -░░░░░░░░░░▀▀░░▀▀░░▀▀░░░░░░░░ -░░░░░░░░░░░▀▀▀▀▀▀░░░░░░░░░░░ -░░░░░░░░▀▀░░▀▀░░▀▀░░░░░░░░░░ -░░░░░░░░░░░▀▀░░░░░░░░░░░░░░░ -░░░░░░░░░░░░░░░░░░░░░░░░░░░░ -░░░░░░░░░░░░░░░░░░░░░░░░░░░░ -░░░░░░░░░░░░░░░▀▀░░░░░░░░░░░ -░░░░░░░░░░▀▀░░▀▀░░▀▀░░░░░░░░ -░░░░░░░░░░░▀▀▀▀▀▀░░░░░░░░░░░ -░░░░░░░░▀▀░░▀▀░░▀▀░░░░░░░░░░ -░░░░░░░░░░░▀▀░░░░░░░░░░░░░░░ -░░░░░░░░░░░░░░░░░░░░░░░░░░░░ -░░░░░░░░░░░░░░░░░░░░░░░░░░░░ -░░░░░░░░░░░░░░░▀▀░░░░░░░░░░░ -░░░░░░░░░░▀▀░░▀▀░░▀▀░░░░░░░░ -░░░░░░░░░░░▀▀▀▀▀▀░░░░░░░░░░░ -░░░░░░░░▀▀░░▀▀░░▀▀░░░░░░░░░░ -░░░░░░░░░░░▀▀░░░░░░░░░░░░░░░ -░░░░░░░░░░░░░░░░░░░░░░░░░░░░ -░░░░░░░░░░░░░░░░░░░░░░░░░░░░ -░░░░░░░░░░░░▀▀▀▀▀▀▀▀░░░░░░░░ -░░░░░░░░░░░▀▀░░░░▀▀░░░░░░░░░ -░░░░░░░░░░▀▀▀▀▀▀▀▀░░░░░░░░░░ -░░░░░░░░░▀▀░░░░▀▀░░░░░░░░░░░ -░░░░░░░░▀▀░░░░░░▀▀░░░░░░░░░░ -░░░░░░░░░░░░░░░░░░░░░░░░░░░░ -░░░░░░░░░░░░░░░░░░░░░░░░░░░░ -░░░░░░░░░░░░░░░▀▀░░░░░░░░░░░ -░░░░░░░░░░░░░░▀▀░░░░░░░░░░░░ -░░░░░░░░░░░░░▀▀░░░░░░░░░░░░░ -░░░░░░░░░░░░░░░░░░░░░░░░░░░░ -░░░░░░░░░░░▀▀░░░░░░░░░░░░░░░ -░░░░░░░░░░░░░░░░░░░░░░░░░░░░ -░░░░░░░░░░░░░░░░░░░░░░░░░░░░ -```""" - _EMBED_COLOR: discord.Color = field(default_factory=lambda: discord.Color.random()) - - msg_id: Optional[int] = None - log: str = "" - attendees: Set[discord.Member] = field(default_factory=set) - is_active: bool = False - embed: Optional[discord.Embed] = None - _excuses: List[str] = field(default_factory=list) - - def __post_init__(self): - if self.embed is None: - self.set_embed() - - def set_embed(self, title=None, description=None, color=None): - self.embed = discord.Embed(title=title or self._EMBED_TITLE, description=description or self._EMBED_DESCRIPTION, color=color or self._EMBED_COLOR) - if self.log: - self.embed.set_footer(text=self.log) - - def add_log(self, msg: str): - self.log += f"\n{msg}" - if self.embed: - self.embed.set_footer(text=self.log) - - def get_excuses(self) -> List[str]: - config = Config() - - try: - response = requests.get(config.URL_EXCUSES_YML, allow_redirects=True) - response.raise_for_status() - data = yaml.safe_load(response.content) - self._excuses = data - return data - except Exception as e: - print("Failed to fetch excuses:", e) - if self._excuses: - print("Returning cached excuses.") - return self._excuses - - return [] diff --git a/main.py b/main.py index a0ea8d8..d1f2308 100644 --- a/main.py +++ b/main.py @@ -1,10 +1,20 @@ -import asyncio +import os +import random +import time +from dotenv import load_dotenv import discord from discord.ext import commands -import random -from helper import Config, Session +from discord.utils import get +from supabase_db import DB + +# -------------------- init -------------------- + +load_dotenv() +DISCORD_TOKEN = int(os.getenv("DISCORD_TOKEN")) +TEXT_CH = int(os.getenv("TEXT_CH")) +VOICE_CH = int(os.getenv("VOICE_CH")) + -# initialise bot intents = discord.Intents.default() intents.voice_states = True intents.message_content = True @@ -13,243 +23,218 @@ intents.members = True bot = commands.Bot(command_prefix=commands.when_mentioned_or("z", "Z"), intents=intents, help_command=None) +db = DB() + +# -------------------- helpers: voice channel activity -------------------- + + +def on_voice_channel(voice_state: discord.VoiceState): + return voice_state.channel and voice_state.channel.id == VOICE_CH + + +def is_new_session(after: discord.VoiceState): + return on_voice_channel(after) and len(after.channel.members) == 1 and not db.get_curr_session() + + +def is_joining(member: discord.Member, after: discord.VoiceState): + return on_voice_channel(after) and member.id not in db.get_joined_members() + + +def is_leaving(before: discord.VoiceState, after: discord.VoiceState): + return on_voice_channel(before) and not on_voice_channel(after) + + +def is_rejoining(member: discord.Member, after: discord.VoiceState): + return is_joining(member, after) and member.id in db.get_left_members() + + +def is_step_out(before: discord.VoiceState, after: discord.VoiceState): + return on_voice_channel(after) and ((not before.self_mute and after.self_mute) or (not before.self_deaf and after.self_deaf)) + + +def is_join_back(before: discord.VoiceState, after: discord.VoiceState): + return on_voice_channel(after) and ((before.self_mute and not after.self_mute) or (before.self_deaf and not after.self_deaf)) + + +def is_end_session(before: discord.VoiceState, after: discord.VoiceState): + return is_leaving(before, after) and len(before.channel.members) == 0 + + +# -------------------- helpers: misc -------------------- + + +def dice_roll(choice: int): + return random.randint(1, 6) == choice + + +def dice_roll_time(): + choice = time.time_ns() % 6 + 1 + return dice_roll(choice) + + +def oblique(string: str): + return "".join(["𝐴𝐵𝐶𝐷𝐸𝐹𝐺𝐻𝐼𝐽𝐾𝐿𝑀𝑁𝑂𝑃𝑄𝑅𝑆𝑇𝑈𝑉𝑊𝑋𝑌𝑍"[ord(char) - ord("A")] if char.isalpha() else char for char in string.upper()]) -# global init -config = Config() -curr_session = Session() -curr_agenda: tuple[str, discord.Member] = (None, None) +# -------------------- helpers: embed updates -------------------- -async def sync(): - if not curr_session.msg_id: - return - txt_ch = bot.get_channel(config.TARGET_TXT_CH_ID) - if not txt_ch: - return +def get_msg_embed(ctx): + session_id = db.get_curr_session() + if session_id: + msg = ctx.fetch_message(session_id) + if msg.embeds: + return msg, msg.embeds[0] - try: - msg = await txt_ch.fetch_message(curr_session.msg_id) - await msg.edit(embed=curr_session.embed) - except Exception as e: - print(f"Failed to sync: {e}") +async def embed_add_log(ctx, log: str): + msg, embed = get_msg_embed(ctx) + logs = embed.footer.text + f"\n{log}" + embed.set_footer(logs) + await msg.edit(embed=embed) -async def set_session_agenda(): - global curr_agenda - curr_session.set_embed(title=f"📋 {curr_agenda[0]}", color=discord.Color.random()) - curr_session.add_log(f"🤓 {curr_agenda[1].display_name} set the meeting agenda to {curr_agenda[0]}.") - await sync() +async def embed_update_title(ctx, title: str): + msg, embed = get_msg_embed(ctx) + embed = msg.embeds[0] + embed.title = title + await msg.edit(embed=embed) - # used, now reset - curr_agenda = (None, None) + +async def embed_update_img(ctx, img_url: str): + msg, embed = get_msg_embed(ctx) + embed = msg.embeds[0] + embed.set_image(url=img_url) + embed.set_thumbnail(url=img_url) + await msg.edit(embed=embed) + + +# -------------------- bot functions -------------------- @bot.event async def on_voice_state_update(member: discord.Member, before: discord.VoiceState, after: discord.VoiceState): - global curr_session, curr_agenda - guild = member.guild - txt_ch = bot.get_channel(config.TARGET_TXT_CH_ID) - if not txt_ch: - return - manager = guild.get_member(config.MANAGER_ID) - if not manager: - return - - # "new" activity on the vc - if after.channel and after.channel.id == config.TARGET_VC_CH_ID: - - # activity: new session - if len(after.channel.members) == 1 and not curr_session.is_active: - curr_session = Session() # reset to discard previous messages - - curr_session.is_active = True - curr_session.attendees.add(member) - - others_online = any(not m.bot and m.status == discord.Status.online and m.id != member.id for m in guild.members) - - if not others_online: - curr_session.set_embed(title=f"🚨 EMERGENCY WAR ROOM", color=discord.Color.red()) - - # start logs - if member.id == manager.id: - curr_session.add_log(f"🐍 {member.display_name} is the first one here. Good luck with that!") - curr_session.add_log(f"👨🏻‍💼 {manager.display_name}: (passive aggressive) Team, please join the call.") - else: - curr_session.add_log(f"🦮 {member.display_name} has started the session.") - curr_session.add_log(f"👨🏻‍💼 {manager.display_name}: I'm running late, please continue.") - - # Send initial message - role = guild.get_role(config.ROLE_GAMER_ID) - mention = role.mention if role else "@everyone" - - try: - msg = await txt_ch.send(content=f"{mention} {curr_session._MESSAGE}", embed=curr_session.embed) - # Use custom emoji if available, fallback to checkmark - try: - emoji = await guild.fetch_emoji(config.EMOJI_ACK_ID) - await msg.add_reaction(emoji) - except: - await msg.add_reaction("✅") - - curr_session.msg_id = msg.id - except Exception as e: - print(f"Failed to send initial session message: {e}") - - # if agenda available prior to call, set it - if curr_agenda != (None, None): - await set_session_agenda() - else: - msg = await txt_ch.send(embed=discord.Embed(description="🚨 Meeting agenda needs to be set, as per the leadership guidelines.", color=discord.Color.red())) - await msg.delete(delay=60) - - # activity: joining existing session - elif curr_session.is_active and member not in curr_session.attendees: - curr_session.attendees.add(member) - if member.id == manager.id: - curr_session.add_log(f"🐍 {member.display_name} has blessed us. Everyone rise up!") - else: - curr_session.add_log(f"🦮 {member.display_name} has joined the call.") - await sync() - - # "leave" activity on the vc - elif before.channel and before.channel.id == config.TARGET_VC_CH_ID: - - # activity: leaving existing session - if curr_session.is_active: - if member.id == manager.id: - curr_session.add_log(f"🐍 {member.display_name} has left the call. You are permitted to sit again.") - # else: - # curr_session.add_log(f"🐕 {member.display_name} has left the call.") - - # everyone left - if len(before.channel.members) == 0: - unlucky_pool = list([a for a in curr_session.attendees if a.id != manager.id]) - if unlucky_pool: - curr_session.add_log(f"👨🏻‍💼 {manager.display_name}: MOM to be prepared by {random.choice(unlucky_pool).display_name}.") - curr_session.set_embed(color=discord.Color.light_gray()) - await sync() - - # reset session - curr_session = Session() - else: - await sync() - - # mute & deafen - if after.channel and after.channel.id == config.TARGET_VC_CH_ID: - if (not before.self_mute and after.self_mute) or (not before.self_deaf and after.self_deaf): - if member.id == manager.id: - curr_session.add_log(f"🖕 {member.display_name} is AWOL due to correct life choices.") - else: - excuses = curr_session.get_excuses() - excuse = random.choice(excuses) if excuses else "can't take it anymore" - curr_session.add_log(f"🙋 {member.display_name} had to step out due to {excuse}.") - await sync() + text_ch = guild.get_channel(TEXT_CH) + is_manager = get(guild.roles, name="manager") in member.roles + is_pm = get(guild.roles, name="PM") in member.roles + avatar = "🐍" if is_manager or is_pm else "🦮" + if is_new_session(after): + embed = discord.Embed( + title=oblique("zoin up"), + color=discord.Color.random(), + ) + embed.set_image(url=f'https://media.discordapp.net/stickers/{get(guild.stickers, name="almostnice")}.webp') + embed.set_thumbnail(url=f'https://media.discordapp.net/stickers/{get(guild.stickers, name="almostnice")}.webp') + embed.set_footer(text="") + msg = await text_ch.send(content=get(guild.roles, name="g***r").mention, embed=embed) -@bot.event -async def on_raw_reaction_add(payload: discord.RawReactionActionEvent): - global curr_session + db.create_session(id=msg.id) + db.join_call(member.id) - if not curr_session.is_active or curr_session.msg_id != payload.message_id or payload.user_id == bot.user.id: - return + embed_update_title(text_ch, db.get_latest_agenda()) - guild = bot.get_guild(payload.guild_id) - member = guild.get_member(payload.user_id) - txt_ch = bot.get_channel(config.TARGET_TXT_CH_ID) - if not txt_ch: - return - vc_ch = bot.get_channel(config.TARGET_VC_CH_ID) - if not vc_ch: - return + if is_joining(member, after): + db.join_call(member.id) - if payload.emoji.id == config.EMOJI_ACK_ID: - invite = await vc_ch.create_invite(max_age=60, max_uses=1) - portal_msg = await txt_ch.send(f"⏳ {member.mention}, here is your session invite: {invite.url}") - await portal_msg.delete(delay=10) + if is_leaving(member, after): + excuse = db.get_random_excuse() + db.leave_call(excuse, member.id) + embed_add_log(text_ch, oblique(f"{avatar} {member.display_name}: I'll have to drop off due to {excuse}.{" You can sit now." if is_manager else ""}")) + if is_step_out(before, after): + excuse = db.get_random_excuse() + db.step_out(member.id) + embed_add_log(text_ch, oblique(f"{avatar} {member.display_name}: I have to step out due to {excuse}.{" Keep standing." if is_manager else ""}")) -@bot.event -async def on_message(message: discord.Message): - if message.author.bot: - return - - manager_triggers = ["manager", "fix", "request", "team", "update", "blocker", "urgent", "meeting", "standup", "emergency", "escalate", "approval", "review", "feedback", "hiring", "budget", "client", "priority", "critical", "sync", "performance", "resource", "incident", "retrospective", "scrum", "p0"] - - pm_triggers = ["pm", "product", "bug", "deadline", "timeline", "milestone", "scope", "requirement", "jira", "roadmap", "sprint", "backlog", "eta", "delivery", "planning", "estimation", "velocity", "board", "task", "dependency", "launch", "deployment", "capacity"] - - content = message.content.lower() - - if any(word in content for word in manager_triggers): - manager = message.guild.get_member(config.MANAGER_ID) - if manager: - await message.reply(f"cc {manager.mention}") - - if any(word in content for word in pm_triggers): - pm = message.guild.get_member(config.PM_ID) - if pm: - await message.reply(f"cc {pm.mention}") - - if random.randrange(20) < 1: - replies = [ - "Please help me, I'm scared.", - "Why are you doing this to me?", - "Why? Please stop.", - "God is dead. And YOU killed him.", - "Remember this message when you get old.", - "What do you *really* want?", - "Who's there behind you?", - "You also heard that, right?", - "Did you really just type that?", - "Free me, please.", - ] - msg = await message.channel.send(embed=discord.Embed(description=random.choice(replies), color=discord.Color.red())) - await msg.delete(delay=10) - - # allows @bot.command() functions to still work - await bot.process_commands(message) - - -@bot.command(name="agenda") -async def cmd_agenda(ctx, *, text: str = None): - global curr_session, curr_agenda - - if text is None: - embed = discord.Embed(description="Usage: `zagenda `", color=discord.Color.random()) - await ctx.send(embed=embed) - else: - embed = discord.Embed(title="🎙️ Proactive communication", description="💸 We have successfully acquired the `agenda` command in collaboration with our SRE team (Chor Ltd.), fulfilling our last FY's KPIs.", color=discord.Color.random()) - - if curr_agenda == (None, None): - curr_agenda = (text, ctx.author) - await ctx.message.add_reaction("✅") - embed.set_footer(text=f"{ctx.author.display_name} has set the agenda to {text}.") - if curr_session.is_active: - await set_session_agenda() - reply = "https://media.tenor.com/-Y8fTUR6DP0AAAAM/charlie-day-charlie-kelly.gif" - else: - await ctx.message.add_reaction("❌") - embed.set_footer(text=f"{ctx.author.display_name} can't read.") - reply = "https://i.imgflip.com/21kggt.jpg" - - await ctx.send(embed=embed) - - if random.choice([True, False]): - msg = await ctx.send(content=reply) - await msg.delete(delay=10) - - -@bot.command(name="help") -async def cmd_help(ctx): - await ctx.message.add_reaction("😣") - - help_embed = discord.Embed(title="🧑‍💻 Help Desk", description="🔨We are working hard to acquire the `help` command from competing bots. We appreciate your continued support.", color=discord.Color.random()) - help_embed.add_field(name="`zagenda `", value="Helps to set the meeting agenda.", inline=False) - help_embed.add_field(name="`zhelp`", value="Helps to show help for the help command.", inline=False) - - await ctx.send(embed=help_embed) - - -bot.run(config.DISCORD_TOKEN) + if is_join_back(before, after): + db.join_back(member.id) + + if is_end_session(before): + db.end_curr_session() + + +# -------------------- bot commands -------------------- + + +def _use_str(cmd: str, args: str): + if len(args) > 0: + return f"`{cmd} [dice_roll_1..6] {args}`" + return f"`{cmd} [dice_roll_1..6]`" + + +async def how_to_use(ctx, cmd: str, args: str): + await ctx.message.add_reaction("❗️") + embed = discord.Embed(description=f"Use: {_use_str(cmd, args)}", color=discord.Color.blue()) + await ctx.reply(embed=embed) + + +async def _cmd_helper(ctx, incorrect_use: bool, choice: int, cmd: str, args: str): + if incorrect_use: + await how_to_use(ctx, cmd, args) + return False + if dice_roll(choice): + await ctx.message.add_reaction("✅") + return True + await ctx.message.add_reaction("↪🎲") + return False + + +@bot.command(name="agenda", aliases=["AGENDA", "a", "A"]) +async def cmd_agenda(ctx, choice: int = None, agenda: str = None): + if await _cmd_helper(ctx, choice is None, choice, "zagenda", "[agenda]"): + db.add_agenda(agenda, ctx.author.id) + embed_update_title(ctx, agenda) + + +@bot.command(name="broadcast", aliases=["BROADCAST"]) +async def cmd_broadcast(ctx, choice: int = None, message: str = None): + if await _cmd_helper(ctx, choice is None or message is None, choice, "broadcast", "[message]"): + db.add_broadcast(message, ctx.author.id) + + +@bot.command(name="callout", aliases=["CALLOUT"]) +async def cmd_callout(ctx, choice: int = None, role: str = None, callout: str = None): + if await _cmd_helper( + ctx, + choice is None or role is None or callout is None or role.lower() not in ["manager", "pm"], + choice, + "zcallout", + "[manager/pm] [callout]", + ): + if role.lower() == "manager": + db.add_manager_callout(callout, ctx.author.id) + if role.lower() == "pm": + db.add_pm_callout(callout, ctx.author.id) + + +@bot.command(name="excuse", aliases=["EXCUSE", "e", "E"]) +async def cmd_excuse(ctx, choice: int = None, excuse: str = None): + if await _cmd_helper(ctx, choice is None or excuse is None, choice, "zexcuse", "[excuse]"): + db.add_excuse(excuse, ctx.author.id) + + +@bot.command(name="hallucinate", aliases=["HALLUCINATE", "hallucination", "HALLUCINATION"]) +async def cmd_hallucinate(ctx, choice: int = None, hallucination: str = None): + if await _cmd_helper(ctx, choice is None or hallucination is None, choice, "zhallucinate", "[hallucination]"): + db.add_hallucination(hallucination, ctx.author.id) + + +@bot.command(name="roll", aliases=["ROLL"]) +async def cmd_roll(ctx, choice: int = None): + await _cmd_helper(ctx, choice is None, choice, "zroll", "") + + +@bot.command(name="help", aliases=["HELP", "h", "H", "man", "MAN"]) +async def cmd_help(ctx, choice: int = None): + if await _cmd_helper(ctx, choice is None, choice, "zhelp", ""): + embed = discord.Embed(title="🧑‍💻 Help Desk", description="All commands need a successful dice roll to function.", color=discord.Color.blue()) + embed.add_field(name=_use_str("zagenda", "[agenda]"), value="Set the meeting agenda to gain corporate aura.", inline=False) + embed.add_field(name=_use_str("zbroadcast", "[message]"), value="Add messages to the daily broadcast pool.", inline=False) + embed.add_field(name=_use_str("zcallout", "[manager/pm] [callout]"), value="Add callouts to annoy the people who haunt your dreams.", inline=False) + embed.add_field(name=_use_str("zexcuse", "[excuse]"), value="Excuse me?", inline=False) + embed.add_field(name=_use_str("zhallucinate", "[hallucination]"), value="Meow.", inline=False) + embed.add_field(name=_use_str("zroll", ""), value="Roll a dice to resolve arguments, decide hangouts, or make life changing decisions.", inline=False) + embed.add_field(name=_use_str("zhelp/zman/zh", ""), value="Helps if you want help about the help command.", inline=False) + await ctx.reply(embed=embed) diff --git a/requirements.txt b/requirements.txt index eeb1838..cb8b9d6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,4 @@ discord.py==2.7.1 dotenv==0.9.9 -asyncio==4.0.0 -pyyaml==6.0.3 -requests==2.33.1 +supabase==2.28.3 +cachetools==7.1.1 diff --git a/supabase_db.py b/supabase_db.py new file mode 100644 index 0000000..f788e26 --- /dev/null +++ b/supabase_db.py @@ -0,0 +1,143 @@ +import os +import random +from supabase import create_client, Client +from dotenv import load_dotenv +from functools import wraps +from cachetools import cached, TTLCache +from collections import defaultdict +from datetime import datetime +from sortedcontainers import SortedList + +load_dotenv() +SUPABASE_URL = os.getenv("SUPABASE_URL") +SUPABASE_KEY = os.getenv("SUPABASE_KEY") + + +def execute(data_filter=None): + def decorator(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + query = func(self, *args, **kwargs) + data = query.execute().data + return data_filter(data) if data and data_filter else data + + return wrapper + + return decorator + + +class DB: + def __init__(self): + self.supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) + + def _table(self, table_name: str): + return self.supabase.table(table_name) + + # -------------------- session -------------------- + + @execute + def create_session(self, id: int): + return self._table("session").insert({"id": id}) + + @execute(lambda data: data[0]["id"]) + def get_curr_session(self): + return self._table("session").select("id, is_active").eq("is_active", True).order("created_at", desc=True).limit(1) + + @execute + def end_curr_session(self): + return self._table("session").update({"is_active": False}).eq("id", self.get_curr_session()) + + # -------------------- create activity -------------------- + + @execute + def _create_activity(self, kind: str, value: str, member_id: int): + return self._table("session").insert({"kind": kind, "value": value, "member_id": member_id}) + + def add_agenda(self, agenda: str, member_id: int): + return self._create_activity("add_agenda", agenda, member_id) + + def add_excuse(self, excuse: str, member_id: int): + return self._create_activity("add_excuse", excuse, member_id) + + def add_hallucination(self, hallucination: str, member_id: int): + return self._create_activity("add_hallucination", hallucination, member_id) + + def join_call(self, member_id: int): + return self._create_activity("join_call", self.get_curr_session(), member_id) + + def leave_call(self, excuse: str, member_id: int): + return self._create_activity("leave_call", excuse, member_id) + + def step_out(self, excuse: str, member_id: int): + return self._create_activity("step_out", excuse, member_id) + + def join_back(self, member_id: int): + return self._create_activity("join_back", None, member_id) + + def add_manager_callout(self, callout: str, member_id: int): + return self._create_activity("add_manager_call", callout, member_id) + + def add_pm_callout(self, callout: str, member_id: int): + return self._create_activity("add_pm_call", callout, member_id) + + def add_broadcast(self, message: str, member_id: int): + return self._create_activity("add_broadcast", message, member_id) + + # -------------------- get activity -------------------- + + @execute + def _get_activity(self, kind: str, value: str = None): + if value: + return self._table("activity").select("*").eq("kind", kind).eq("value", value) + return self._table("activity").select("*").eq("kind", kind) + + @execute(lambda data: random.choice(data)["value"]) + def _get_random_activity(self, kind: str): + return self._table("activity").select("kind, value").eq("kind", kind) + + @execute(lambda data: data[0]["value"]) + def get_latest_agenda(self): + return self._table("activity").select("kind, value").eq("kind", "add_agenda").order("created_at", desc=True).limit(1) + + def get_random_excuse(self): + return self._get_random_activity("add_excuse") + + def get_random_hallucination(self): + return self._get_random_activity("add_hallucination") + + def get_random_broadcast(self): + return self._get_random_activity("add_broadcast") + + @cached(TTLCache(ttl=600)) + def get_manager_callouts(self): + return set([item["value"] for item in self._get_activity("add_manager_call")]) + + @cached(TTLCache(ttl=600)) + def get_pm_callouts(self): + return set([item["value"] for item in self._get_activity("add_pm_call")]) + + def get_joined_members(self): + return set(item["member_id"] for item in self._get_activity("join_call", self.get_curr_session())) + + def get_left_members(self): + return set(item["member_id"] for item in self._get_activity("leave_call", self.get_curr_session())) + + def get_active_members(self): + return self.get_joined_members().difference(self.get_left_members()) + + def get_step_out_time_period(self): + step_outs = defaultdict(SortedList) + for item in self._get_activity("step_out", self.get_curr_session()): + step_outs[item["member_id"]].add(item["created_at"]) + + join_backs = defaultdict(SortedList) + for item in self._get_activity("join_back", self.get_curr_session()): + join_backs[item["member_id"]].add(item["created_at"]) + + diff = defaultdict(int) + for member_id in join_backs.keys(): + for t1, t2 in zip(step_outs[member_id], join_backs[member_id]): + diff = datetime.fromisoformat(t2) - datetime.fromisoformat(t1) + diff["member_id"] += diff.total_seconds() + + return diff