From 942365efb2d598df739f7ea4475da2390079e79a Mon Sep 17 00:00:00 2001 From: HypoxiE Date: Mon, 7 Jul 2025 00:28:19 +0300 Subject: [PATCH] first commit --- .gitignore | 11 + README.md | 0 exclude.txt | 6 + filter.txt | 8 + requirements.txt | 20 + src/CoreMod.py | 679 +++++++++++++++++++++++++++ src/cogs/administrators.py | 210 +++++++++ src/cogs/moderators.py | 686 ++++++++++++++++++++++++++++ src/cogs/resetsupcommands.py | 14 + src/cogs/users.py | 280 ++++++++++++ src/constants/global_constants.py | 24 + src/database/db_classes.py | 206 +++++++++ src/database/settings/config.py | 17 + src/managers/DataBaseManager.py | 137 ++++++ src/managers/old_DataBaseManager.py | 239 ++++++++++ src/test.py | 103 +++++ 16 files changed, 2640 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 exclude.txt create mode 100644 filter.txt create mode 100644 requirements.txt create mode 100644 src/CoreMod.py create mode 100644 src/cogs/administrators.py create mode 100644 src/cogs/moderators.py create mode 100644 src/cogs/resetsupcommands.py create mode 100644 src/cogs/users.py create mode 100644 src/constants/global_constants.py create mode 100644 src/database/db_classes.py create mode 100644 src/database/settings/config.py create mode 100644 src/managers/DataBaseManager.py create mode 100644 src/managers/old_DataBaseManager.py create mode 100644 src/test.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0264038 --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ + +venv/ + +__pycache__/ +*.pyc + +.env +.secrets +db_settings.py +src/data/secrets/ +src/backups/ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/exclude.txt b/exclude.txt new file mode 100644 index 0000000..55641d2 --- /dev/null +++ b/exclude.txt @@ -0,0 +1,6 @@ +venv/ +__pycache__/ +*.pyc +.env +.secrets +backups/ \ No newline at end of file diff --git a/filter.txt b/filter.txt new file mode 100644 index 0000000..c3cc3d5 --- /dev/null +++ b/filter.txt @@ -0,0 +1,8 @@ ++ /ModBot/** +- /ModBot/venv/** +- /ModBot/__pycache__/** +- /ModBot/backups/** +- /ModBot/.env +- /ModBot/.secrets +- /ModBot/*.pyc +- * \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2106fbf --- /dev/null +++ b/requirements.txt @@ -0,0 +1,20 @@ +aiohappyeyeballs==2.6.1 +aiohttp==3.12.12 +aiosignal==1.3.2 +async-timeout==5.0.1 +asyncpg==0.30.0 +attrs==25.3.0 +certifi==2025.4.26 +charset-normalizer==3.4.2 +disnake==2.10.1 +frozenlist==1.7.0 +greenlet==3.2.3 +idna==3.10 +multidict==6.4.4 +numpy==2.2.6 +propcache==0.3.2 +requests==2.32.4 +SQLAlchemy==2.0.41 +typing_extensions==4.14.0 +urllib3==2.4.0 +yarl==1.20.1 diff --git a/src/CoreMod.py b/src/CoreMod.py new file mode 100644 index 0000000..d9016e5 --- /dev/null +++ b/src/CoreMod.py @@ -0,0 +1,679 @@ +import inspect +import disnake +from disnake.ext import commands +from disnake.ext import tasks +import asyncio +import sys +import os +import shutil +import datetime +from collections import Counter +from fnmatch import fnmatch +import traceback +import json +import re + +from constants.global_constants import * +from data.TOKENS import TOKENS +from database.db_classes import all_data as DataBaseClasses +from managers.DataBaseManager import DatabaseManager +from managers.old_DataBaseManager import DatabaseManager as old_DatabaseManager +from database.settings import config + +from sqlalchemy.orm import declarative_base, relationship +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker +from sqlalchemy.orm import sessionmaker +from sqlalchemy.schema import CreateTable + +class AnyBots(commands.Bot): + ''' + + + Any bot class + + + ''' + def __init__(self, DataBaseManager): + super().__init__( + command_prefix="=", + intents=disnake.Intents.all() + ) + self.DataBaseManager = DataBaseManager + self.constants = constants + + async def on_ready(self): + self.krekchat = await self.fetch_guild(constants["krekchat"]) + print(self.krekchat.name) + self.sponsors = [disnake.utils.get(self.krekchat.roles, id=i) for i in constants["sponsors"]] + self.text_mute = disnake.utils.get(self.krekchat.roles, id=constants["mutes"][0]) + self.voice_mute = disnake.utils.get(self.krekchat.roles, id=constants["mutes"][1]) + self.ban_role = disnake.utils.get(self.krekchat.roles, id=constants["ban_role"]) + self.me = disnake.utils.get(self.krekchat.roles, id=constants["me"]) + self.moder = disnake.utils.get(self.krekchat.roles, id=constants["moder"]) + self.curator = disnake.utils.get(self.krekchat.roles, id=constants["curator"]) + self.everyone = disnake.utils.get(self.krekchat.roles, id=constants["everyone"]) + self.staff = disnake.utils.get(self.krekchat.roles, id=constants["staff"]) + self.level_roles = [disnake.utils.get(self.krekchat.roles, id=i) for i in constants["level_roles"]] + self.bots_talk_protocol_channel_id = constants["bots_talk_protocol_channel"] + self.databases_backups_channel_id = constants["databases_backups_channel"] + # lists + self.moderators = [disnake.utils.get(self.krekchat.roles, id=i) for i in constants["moderators"]] + self.hierarchy = [disnake.utils.get(self.krekchat.roles, id=i) for i in constants["hierarchy"]] + # /lists + await self.change_presence(status=disnake.Status.online, activity=disnake.Game("Работаю")) + print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: KrekModBot activated") + + def TimeFormater(self, time_str: str = "", *, + years: float = 0, months: float = 0, weeks: float = 0, days: float = 0, hours: float = 0, minutes: float = 0, seconds: float = 0, + now_timestamp = None): + """ + Форматирует строку времени в timestamp и разложенное время + Поддерживает форматы: 1d2h30m, 1д2ч30мин, 1.5d, 1 день 2 часа 30 минут и т.п. + Возвращает объект класса FormatedTime + """ + + class FormatedTime: + def __init__(self, time_units): + self.translator = {'years': 'лет', 'months': 'месяцев', 'weeks': 'недель', 'days': 'дней', 'hours': 'часов', 'minutes': 'минут', 'seconds': 'секунд'} + + delta = datetime.timedelta( + weeks=time_units['weeks'], + days=time_units['days'] + time_units['years'] * 365 + time_units['months'] * 30, + hours=time_units['hours'], + minutes=time_units['minutes'], + seconds=time_units['seconds'] + ) + total_seconds = delta.total_seconds() + + minutes = total_seconds // 60 + seconds = total_seconds % 60 + hours, minutes = divmod(minutes, 60) + days, hours = divmod(hours, 24) + + months = days // 30 + days = days % 30 + years = months // 12 + months = months % 12 + + self.time_units = { + 'years': years, + 'months': months, + 'days': days, + 'hours': hours, + 'minutes': minutes, + 'seconds': seconds + } + + self.future_time = 0 + if now_timestamp is None: + self.future_time = datetime.datetime.now() + delta + else: + self.future_time = datetime.datetime.fromtimestamp(now_timestamp) + delta + self.timestamp = self.future_time.timestamp() + + def __float__(self): + return self.timestamp + + def __int__(self): + return int(self.timestamp) + + def __repr__(self): + return self.__str__() + f" [{self.timestamp}]" + + def __str__(self): + if self.time_is_null(): + return "вечность" + else: + result = [] + for key, value in self.time_units.items(): + if value > 0: + result.append(f"{int(value) if key != 'seconds' else round(value, 2)} {self.translator[key]}") + + return ", ".join(result) + + def time_is_null(self): + return not any([i for i in self.time_units.values()]) + + def to_dict(self): + return self.time_units + + time_units = {'years': 0, 'months': 0, 'weeks': 0, 'days': 0, 'hours': 0, 'minutes': 0, 'seconds': 0} + if any([years, months, weeks, days, hours, minutes, seconds]): + time_units = {'years': years, 'months': months, 'weeks': weeks, 'days': days, 'hours': hours, 'minutes': minutes, 'seconds': seconds} + + return FormatedTime(time_units) + else: + time_str = time_str.lower().replace(' ', '').replace(',', '.') + + replacements = { + # Русские + 'лет': 'years', + 'год': 'years', + 'мес': 'months', + 'нед': 'weeks', + 'дн': 'days', + 'д': 'days', + 'день': 'days', + 'дней': 'days', + 'дня': 'days', + 'ч': 'hours', + 'час': 'hours', + 'часов': 'hours', + 'часа': 'hours', + 'м': 'minutes', + 'мин': 'minutes', + 'минут': 'minutes', + 'минуты': 'minutes', + 'с': 'seconds', + 'сек': 'seconds', + 'секунд': 'seconds', + 'секунды': 'seconds', + # Английские + 'y': 'years', + 'w': 'weeks', + 'd': 'days', + 'h': 'hours', + 'm': 'minutes', + 's': 'seconds', + 'c': 'seconds' + } + + pattern = re.compile(r'(\d+(?:\.\d+)?)([a-zа-я]+)', re.IGNORECASE) + + def replacer(match): + number, unit = match.groups() + replacement = replacements.get(unit, unit) + return f"{number}{replacement}" + + time_str = pattern.sub(replacer, time_str) + + pattern = r'(\d+(\.\d+)?)(years|months|weeks|days|hours|minutes|seconds)' + matches = re.findall(pattern, time_str) + + for value, _, unit in matches: + time_units[unit] += float(value) + + return FormatedTime(time_units) + + async def bt_send(self, info: dict = {}): + def get_all_keys(dct, keys_list=None): + if keys_list is None: + keys_list = [] + for key, value in dct.items(): + keys_list.append(key) + if isinstance(value, dict): + get_all_keys(value, keys_list) + return keys_list + + krekchat = await self.fetch_guild(self.krekchat.id) + bt_channel = await krekchat.fetch_channel(self.bots_talk_protocol_channel_id) + + punishment_keys = ['type', 'options', 'severity', 'member', 'moderator'] + complaint_keys = ['type', 'options', 'accepted', 'attack_member', 'defence_member', 'moderator'] + unpunishment_keys = ['type', 'options', 'severity', 'member'] + if not 'type' in info: + await bt_channel.send(f"<@479210801891115009> Передан запрос без типа :: bt_send\n {traceback.extract_stack()[-2]}") + return 1 + if info['type'] == "punishment": + if get_all_keys(info) != punishment_keys: + if len(get_all_keys(info)) != punishment_keys: + await bt_channel.send(f"<@479210801891115009> Требуется {len(punishment_keys)}, а принято {len(get_all_keys(info))} ключей для punishment :: bt_send\n {traceback.extract_stack()[-2]}") + else: + await bt_channel.send(f"<@479210801891115009> Требуются ключи {punishment_keys}, а приняты {get_all_keys(info)} для punishment :: bt_send\n {traceback.extract_stack()[-2]}") + return 1 + elif info['type'] == "complaint": + if get_all_keys(info) != complaint_keys: + if len(get_all_keys(info)) != complaint_keys: + await bt_channel.send(f"<@479210801891115009> Требуется {len(complaint_keys)}, а принято {len(get_all_keys(info))} ключей для complaint :: bt_send\n {traceback.extract_stack()[-2]}") + else: + await bt_channel.send(f"<@479210801891115009> Требуются ключи {complaint_keys}, а приняты {get_all_keys(info)} для complaint :: bt_send\n {traceback.extract_stack()[-2]}") + return 1 + elif info['type'] == "unpunishment": + if get_all_keys(info) != unpunishment_keys: + if len(get_all_keys(info)) != unpunishment_keys: + await bt_channel.send(f"<@479210801891115009> Требуется {len(unpunishment_keys)}, а принято {len(get_all_keys(info))} ключей для unpunishment :: bt_send\n {traceback.extract_stack()[-2]}") + else: + await bt_channel.send(f"<@479210801891115009> Требуются ключи {unpunishment_keys}, а приняты {get_all_keys(info)} для unpunishment :: bt_send\n {traceback.extract_stack()[-2]}") + return 1 + else: + await bt_channel.send(f"<@479210801891115009> Передан неизвестный тип запроса {info['type']} :: bt_send\n {traceback.extract_stack()[-2]}") + return 1 + + + + info["sender"] = "ModBot" + await bt_channel.send(json.dumps(info)) + return 0 + + class ErrorOutHelper: + def __init__(self, send_function, err_name: str = "", err_description: str = "", ephemeral: bool = False, echo: bool = False, thumbnail = None): + self.err_name = err_name + self.err_description = err_description + self.send_function = send_function + self.ephemeral = ephemeral + self.echo = echo + self.thumbnail = thumbnail + self.colour = 0xff0000 + + async def out(self, err_description: str = "", err_name: str = "", d: str = "", n: str = ""): + if d: + err_description = d + if n: + err_name = n + + embed = disnake.Embed(title="", description="", colour = self.colour) + if err_name: + embed.title = err_name + else: + embed.title = self.err_name + + if err_description: + embed.description = err_description + else: + embed.description = self.err_description + + if not self.thumbnail is None: + embed.set_thumbnail(url = self.thumbnail) + + if self.echo: + print(f"{embed.title}: {embed.description}") + if 'ephemeral' in inspect.signature(self.send_function).parameters: + await self.send_function(embed = embed, ephemeral = self.ephemeral) + else: + await self.send_function(embed = embed) + + +class MainBot(AnyBots): + ''' + + + Main bot class + + + ''' + def __init__(self, DataBase, stop_event): + super().__init__(DataBase) + self.stop_event = stop_event + + async def on_ready(self): + await super().on_ready() + + self.CheckDataBases.cancel() + self.MakeBackups.cancel() + + self.MakeBackups.start() + self.CheckDataBases.start() + + async def BotOff(self): + self.CheckDataBases.cancel() + self.MakeBackups.cancel() + + self.stop_event.set() + + async def on_disconnect(self): + if self.stop_event.is_set(): + pass + else: + print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Соединение с дискордом разорвано") + await self.BotOff() + + @tasks.loop(seconds=60) + async def CheckDataBases(self): + try: + await self.CheckDataBasesRun() + except Exception as error: + print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: err CheckDataBasesRun: {error}") + + @tasks.loop(seconds=3600) + async def MakeBackups(self): + backup_file = await self.DataBaseManager.pg_dump() + + krekchat = await self.fetch_guild(self.krekchat.id) + backups_channel = await krekchat.fetch_channel(self.databases_backups_channel_id) + await backups_channel.send(content=f"Бэкап бд за {datetime.datetime.now()}:", file=disnake.File(backup_file)) + + async def CheckDataBasesRun(self): + self.krekchat = await self.fetch_guild(self.krekchat.id) + members = [i async for i in self.krekchat.fetch_members(limit=None)] + textmute = {'mute': [], 'unmute': list(filter(lambda m: self.text_mute in m.roles, members))} + voicemute = {'mute': [], 'unmute': list(filter(lambda m: self.voice_mute in m.roles, members))} + ban = {'ban': [], 'unban': list(filter(lambda m: self.ban_role in m.roles, members))} + #муты + async with self.DataBaseManager.session() as session: + async with session.begin(): + stmt = self.DataBaseManager.delete(self.DataBaseManager.model_classes['punishment_mutes_text']).where( + self.DataBaseManager.and_( + self.DataBaseManager.model_classes['punishment_mutes_text'].time_warn - datetime.datetime.now().timestamp() <= 0, + self.DataBaseManager.model_classes['punishment_mutes_text'].time_warn != None + ) + ) + await session.execute(stmt) + + stmt = self.DataBaseManager.select(self.DataBaseManager.model_classes['punishment_mutes_text']).where( + self.DataBaseManager.or_( + self.DataBaseManager.and_( + self.DataBaseManager.model_classes['punishment_mutes_text'].time_end - datetime.datetime.now().timestamp() <= 0, + self.DataBaseManager.model_classes['punishment_mutes_text'].time_warn == None + ), + self.DataBaseManager.and_( + self.DataBaseManager.model_classes['punishment_mutes_text'].time_end != None + ) + ) + ).with_for_update() + result = (await session.execute(stmt)).scalars().all() + + for penalt in result: + member = disnake.utils.get(members, id=penalt.user_id) + + if not member: + continue + + if penalt.time_warn is None and penalt.time_end-datetime.datetime.now().timestamp()<=0: + penalt.time_end = None + penalt.time_warn = (self.TimeFormater("30d"))[0] + + + if (not penalt.time_warn is None) and (not penalt.time_end is None): + penalt.time_end = None + + if not penalt.time_end is None: + if member in textmute['unmute']: + stmt = self.DataBaseManager.select(self.DataBaseManager.model_classes['punishment_mutes_text']).where( + self.DataBaseManager.and_( + self.DataBaseManager.model_classes['punishment_mutes_text'].user_id == member.id, + self.DataBaseManager.model_classes['punishment_mutes_text'].time_end != None + ) + ) + member_pens = (await session.execute(stmt)).scalars().all() + if len(member_pens)>0: + textmute['unmute'].remove(member) + if not member in textmute['mute']: + textmute['mute'].append(member) + + for member in textmute['mute']: + await member.add_roles(self.text_mute) + for member in textmute['unmute']: + await member.remove_roles(self.text_mute) + + async with session.begin(): + stmt = self.DataBaseManager.delete(self.DataBaseManager.model_classes['punishment_mutes_voice']).where( + self.DataBaseManager.and_( + self.DataBaseManager.model_classes['punishment_mutes_voice'].time_warn - datetime.datetime.now().timestamp() <= 0, + self.DataBaseManager.model_classes['punishment_mutes_voice'].time_warn != None + ) + ) + await session.execute(stmt) + + stmt = self.DataBaseManager.select(self.DataBaseManager.model_classes['punishment_mutes_voice']).where( + self.DataBaseManager.or_( + self.DataBaseManager.and_( + self.DataBaseManager.model_classes['punishment_mutes_voice'].time_end - datetime.datetime.now().timestamp() <= 0, + self.DataBaseManager.model_classes['punishment_mutes_voice'].time_warn == None + ), + self.DataBaseManager.and_( + self.DataBaseManager.model_classes['punishment_mutes_voice'].time_end != None + ) + ) + ).with_for_update() + result = (await session.execute(stmt)).scalars().all() + + for penalt in result: + member = disnake.utils.get(members, id=penalt.user_id) + + if not member: + continue + + if penalt.time_warn is None and penalt.time_end - datetime.datetime.now().timestamp() <= 0: + penalt.time_end = None + penalt.time_warn = (self.TimeFormater("30d"))[0] + + + if (not penalt.time_warn is None) and (not penalt.time_end is None): + penalt.time_end = None + + if not penalt.time_end is None: + if member in voicemute['unmute']: + stmt = self.DataBaseManager.select(self.DataBaseManager.model_classes['punishment_mutes_voice']).where( + self.DataBaseManager.and_( + self.DataBaseManager.model_classes['punishment_mutes_voice'].user_id == member.id, + self.DataBaseManager.model_classes['punishment_mutes_voice'].time_end != None + ) + ) + member_pens = (await session.execute(stmt)).scalars().all() + if len(member_pens)>0: + voicemute['unmute'].remove(member) + if not member in voicemute['mute']: + voicemute['mute'].append(member) + + for member in voicemute['mute']: + await member.add_roles(self.voice_mute) + await member.move_to(None) + for member in voicemute['unmute']: + await member.remove_roles(self.voice_mute) + #/муты + + #баны + async with session.begin(): + stmt = self.DataBaseManager.select(self.DataBaseManager.model_classes['punishment_bans']).where(self.DataBaseManager.model_classes['punishment_bans'].time_end != None).with_for_update() + result = (await session.execute(stmt)).scalars().all() + + for penalt in result: + member = disnake.utils.get(members, id=penalt.user_id) + + if not member: + continue + + if penalt.time_end - datetime.datetime.now().timestamp() <= 0: + penalt.time_end = None + + if penalt.time_end != None: + if member in ban['unban']: + stmt = self.DataBaseManager.select(self.DataBaseManager.model_classes['punishment_bans']).where( + self.DataBaseManager.and_( + self.DataBaseManager.model_classes['punishment_bans'].user_id == member.id, + self.DataBaseManager.model_classes['punishment_bans'].time_end != None + ) + ) + member_pens = (await session.execute(stmt)).scalars().all() + if len(member_pens)>0: + ban['unban'].remove(member) + if not member in ban['ban']: + ban['ban'].append(member) + + async with session.begin(): + stmt = self.DataBaseManager.select(self.DataBaseManager.model_classes['punishment_perms']) + result = (await session.execute(stmt)).scalars().all() + for penalt in result: + member = disnake.utils.get(members, id=penalt.user_id) + + if not member: + continue + + if member in ban['unban']: + stmt = self.DataBaseManager.select(self.DataBaseManager.model_classes['punishment_perms']).where( + self.DataBaseManager.model_classes['punishment_perms'].user_id == member.id + ) + member_perms = (await session.execute(stmt)).scalars().all() + if len(member_perms): + ban['unban'].remove(member) + if not member in ban['ban']: + ban['ban'].append(member) + + for member in ban['ban']: + await member.add_roles(self.ban_role) + await member.move_to(None) + for member in ban['unban']: + await member.remove_roles(self.ban_role) + #/баны + + #преды + async with session.begin(): + stmt = self.DataBaseManager.delete(self.DataBaseManager.model_classes['punishment_warns']).where(self.DataBaseManager.model_classes['punishment_warns'].time_warn - datetime.datetime.now().timestamp() <= 0) + await session.execute(stmt) + stmt = self.DataBaseManager.delete(self.DataBaseManager.model_classes['punishment_reprimands']).where(self.DataBaseManager.model_classes['punishment_reprimands'].time_warn - datetime.datetime.now().timestamp() <= 0) + await session.execute(stmt) + #/преды + + async def on_message(self, msg): + if msg.author.id == 479210801891115009 and msg.content == "botsoff": + await msg.reply(embed=disnake.Embed(description=f'Бот отключён', colour=0xff9900)) + await self.BotOff() + if type(msg.channel).__name__!="DMChannel" and fnmatch(msg.channel.name, "⚠жалоба-от-*-на-*"): + log_reports = disnake.utils.get(msg.guild.channels, id=1242373230384386068) + files=[] + for att in msg.attachments: + files = files + [await att.to_file()] + log_mess = await log_reports.send(f"Чат: `{msg.channel.name}`({msg.channel.id}).\n" + f"Автор: `{msg.author.name} ({msg.author.id})`\n" + + (f"Сообщение: ```{msg.content}```\n" if msg.content else ""), + files = files) + + message_words = msg.content.replace("/", " ").split(" ") + if "discord.gg" in message_words: + for i in range(len(message_words)): + if message_words[i]=="discord.gg" and not msg.author.bot: + log = disnake.utils.get(msg.guild.channels, id=893065482263994378) + try: + inv = await self.fetch_invite(url = "https://discord.gg/"+message_words[i+1]) + if inv.guild.id != 490445877903622144: + await log.send(f"{msg.author.mention}({msg.author.id}) отправил в чат {msg.channel.mention} сомнительную ссылку на сервер '{inv.guild.name}':```{msg.content}```") + mess = await msg.reply(embed=disnake.Embed(description=f'Ссылки-приглашения запрещены!', colour=0xff9900)) + await msg.delete() + await asyncio.sleep(20) + await mess.delete() + break + except disnake.errors.NotFound: + await log.send(f"{msg.author.mention}({msg.author.id}) отправил в чат {msg.channel.mention} [сомнительную ссылку]({msg.jump_url}) на неизвестный сервер:```{msg.content}```") + + + + +async def init_db(): + DataBaseEngine = create_async_engine( + config.Settings().DB_URL, + pool_size=20, + max_overflow=10, + pool_recycle=300, + pool_pre_ping=True, + #echo=True, + ) + async with DataBaseEngine.begin() as conn: + await conn.run_sync(DataBaseClasses['base'].metadata.create_all) + + return DatabaseManager(DataBaseEngine, DataBaseClasses) + +async def db_migration(DB_MANAGER): + new_DataBase = DB_MANAGER + DataBase = await old_DatabaseManager.connect("data/penalties.db") + await DataBase.execute("PRAGMA journal_mode=WAL") + await DataBase.execute("PRAGMA synchronous=NORMAL") + await DataBase.execute("PRAGMA foreign_keys = ON") + try: + async with new_DataBase.engine.begin() as conn: + await conn.run_sync(new_DataBase.metadata.drop_all) + await conn.run_sync(new_DataBase.metadata.create_all) + async with new_DataBase.session() as session: + for penaltid, userid, reason, timend, timewarn in await DataBase.SelectBD('punishment_text_mutes'): + penault = DB_MANAGER.model_classes['punishment_mutes_text'](user_id = userid, reason = reason, time_end = timend if timend else None, time_warn = timewarn if timewarn else None) + async with session.begin(): + session.add(penault) + + for penaltid, userid, reason, timend, timewarn in await DataBase.SelectBD('punishment_voice_mutes'): + penault = DB_MANAGER.model_classes['punishment_mutes_voice'](user_id = userid, reason = reason, time_end = timend if timend else None, time_warn = timewarn if timewarn else None) + async with session.begin(): + session.add(penault) + + for penaltid, userid, reason, timend in await DataBase.SelectBD('punishment_bans'): + penault = DB_MANAGER.model_classes['punishment_bans'](user_id = userid, reason = reason, time_end = timend if timend else None) + async with session.begin(): + session.add(penault) + + for penaltid, userid, reason in await DataBase.SelectBD('punishment_perms'): + penault = DB_MANAGER.model_classes['punishment_perms'](user_id = userid, reason = reason) + async with session.begin(): + session.add(penault) + + for penaltid, userid, reason, timend in await DataBase.SelectBD('punishment_warns'): + penault = DB_MANAGER.model_classes['punishment_warns'](user_id = userid, reason = reason, time_warn = timend) + async with session.begin(): + session.add(penault) + + for penaltid, userid, reason, timend in await DataBase.SelectBD('punishment_reprimands'): + penault = DB_MANAGER.model_classes['punishment_reprimands'](user_id = userid, reason = reason, time_warn = timend) + async with session.begin(): + session.add(penault) + finally: + await DataBase.close() + + raise Exception("Миграция БД завершена. требуется переименовывание файлов") + +async def run_bot(bot, token, stop_event): + try: + await bot.start(token) + except Exception as e: + print(f"Бот {bot.user.name if hasattr(bot, 'user') else 'Unknown'} упал с ошибкой: {e}") + stop_event.set() # Сигнализируем об остановке + +async def monitor_stop(stop_event, bots): + await stop_event.wait() + print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Получен сигнал остановки, завершаю всех ботов...") + + for bot in bots: + if not bot.is_closed(): + try: + await bot.close() + except Exception as e: + print(f"Ошибка при закрытии бота: {e}") + + await asyncio.sleep(0.1) + + +async def main(): + stop_event = asyncio.Event() + DataBase = None + all_bots = [] + bot = None + + try: + DataBase = await init_db() + + # Инициализация ботов + bot = MainBot(DataBase, stop_event) + all_bots = [bot] + + # Загрузка когов + bot.load_extension("cogs.users") + bot.load_extension("cogs.moderators") + bot.load_extension("cogs.administrators") + + # Запуск монитора остановки и ботов + monitor_task = asyncio.create_task(monitor_stop(stop_event, all_bots)) + bot_tasks = [ + asyncio.create_task(run_bot(bot, TOKENS["KrekModBot"], stop_event)) + ] + + await asyncio.gather(*bot_tasks, monitor_task) + + except KeyboardInterrupt: + print("Боты остановлены по запросу пользователя") + except Exception as e: + print(f"Произошла критическая ошибка: {e}") + finally: + await bot.BotOff() + + for bot in all_bots: + if not bot.is_closed(): + await bot.close() + + await DataBase.close() + + current_task = asyncio.current_task() + pending = [t for t in asyncio.all_tasks() if t is not current_task and not t.done()] + for task in pending: + task.cancel() + await asyncio.gather(*pending, return_exceptions=True) + + await asyncio.sleep(0.1) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/src/cogs/administrators.py b/src/cogs/administrators.py new file mode 100644 index 0000000..0d7d981 --- /dev/null +++ b/src/cogs/administrators.py @@ -0,0 +1,210 @@ +import disnake +from disnake.ext import commands +from disnake.ext import tasks +import requests +import numpy as np + +import asyncio +import sys +import os +import copy +import datetime +import math +import random +import json +import shutil + + +def setup(bot): + bot.add_cog(AdminModule(bot)) + +class AdminModule(commands.Cog): + def __init__(self, client): + self.client = client + self.DataBaseManager = client.DataBaseManager + + @commands.Cog.listener() + async def on_ready(self): + print(f'KrekModBot admin module activated') + + @commands.slash_command(name="bot_mod_off") + async def BotModOff(self, ctx: disnake.ApplicationCommandInteraction): + if self.client.me in ctx.author.roles: + await ctx.send(embed=disnake.Embed(description=f'Бот отключён', colour=0xff9900), ephemeral=True) + await self.client.BotOff() + else: + await ctx.send(embed=disnake.Embed(description=f'Не допустимо', colour=0xff9900), ephemeral=True) + + @commands.slash_command(name="очистка", administrator=True) + async def clear(self, ctx: disnake.AppCmdInter, count: int): + if self.client.me in ctx.author.roles: + await ctx.channel.purge(limit=count) + await ctx.send(embed = disnake.Embed(description = f'очищено {count} сообщений!', colour = 0xff9900), ephemeral=True) + else: + await ctx.send(embed = disnake.Embed(description = f'Ты чего удумал?', colour = 0xff9900), ephemeral=True) + + @commands.slash_command(description="Позволяет менять/создавать/удалять ветви", name="правка_ветви", administrator=True) + async def edit_branch(self, ctx: disnake.AppCmdInter, purpose: str = commands.Param(description="Укажите цель этой ветки (например, \"модерация\" или \"администрация\")", name="цель", default=None), + layer: int = commands.Param(description="Укажите слой этой ветки (у ролей нижних слоёв есть власть над верхними)", name="слой", default=None), + branchid: int = commands.Param(description="Необходимо для изменения уже существующей ветви", name="id", default=None), + is_admin: bool = commands.Param(description="Имеют ли пользователи в этой роли права администратора? *(только при создании)", name="администратор", default=False), + is_moder: bool = commands.Param(description="Имеют ли пользователи в этой роли права модератора? *(только при создании)", name="модератор", default=False), + delete_branch: str = commands.Param(description="Вы уверены, что хотите удалить ветвь? Для подтверждения впишите \"уверен\"", name="удаление", default=None)): + + if not self.client.me in ctx.author.roles: + await ctx.send(embed = disnake.Embed(description = f'Недостаточно прав', colour = 0xff9900), ephemeral=True) + return 1 + + async with self.DataBaseManager.session() as session: + + if delete_branch == "уверен": + if branchid is None: + await ctx.send(embed = disnake.Embed(description = f'Для удаления ветки необходимо указать её id', colour = 0xff9900)) + return 1 + + async with session.begin(): + stmt = self.DataBaseManager.delete(self.DataBaseManager.models['staff_branches'].m).where(self.DataBaseManager.models['staff_branches'].m.id == branchid) + await session.execute(stmt) + await ctx.send(embed = disnake.Embed(description = f'Ветка {branchid} успешно удалена', colour = 0xff9900)) + return 0 + + if branchid is None: + if layer is None or purpose is None: + await ctx.send(embed = disnake.Embed(description = f'Для создания роли необходимо ввести все данные', colour = 0xff9900)) + return 1 + async with session.begin(): + new_branch = self.DataBaseManager.models['staff_branches'].m(layer = layer, purpose = purpose, is_admin = is_admin, is_moder = is_moder) + session.add(new_branch) + await ctx.send(embed = disnake.Embed(description = f'Ветвь \"{purpose}\" успешно создана. Её новый id: {new_branch.id}', colour = 0xff9900)) + return 0 + else: + async with session.begin(): + branch = await session.get(self.DataBaseManager.models['staff_branches'].m, branchid, with_for_update = True) + if branch is None: + await ctx.send(embed = disnake.Embed(description = f'Ветви с таким идентификатором пока не существует', colour = 0xff9900)) + return 1 + + else: + if purpose is None and layer is None: + await ctx.send(embed = disnake.Embed(description = f'Для изменения ветви необходимы новые значения', colour = 0xff9900)) + return 1 + + if not purpose is None: + branch.purpose = purpose + + if not layer is None: + branch.layer = layer + + await ctx.send(embed = disnake.Embed(description = f'Ветвь \"{branch.purpose}\"({branch.id}) успешно изменена', colour = 0xff9900)) + return 0 + + + @commands.slash_command(description="Позволяет менять/создавать/удалять роли в системе персонала", name="правка_роли", administrator=True) + async def edit_role(self, ctx: disnake.AppCmdInter, roleid: str = commands.Param(description="Укажите id роли (используются идентификаторы дискорда)", name="id"), + staffsalary: int = commands.Param(description="Укажите зарплату этой роли", name="зарплата", default=0), + branchid: int = commands.Param(description="Укажите id ветви для этой роли *(только при создании)", name="ветвь", default=None), + layer: int = commands.Param(description="Укажите слой этой роли в ветке (у ролей нижних слоёв есть власть над верхними)", name="слой", default=None), + delete_role: str = commands.Param(description="Вы уверены, что хотите удалить роль из системы? Для подтверждения впишите \"уверен\"", name="удаление", default=None),): + if not self.client.me in ctx.author.roles: + await ctx.send(embed = disnake.Embed(description = f'Недостаточно прав', colour = 0xff9900), ephemeral=True) + return 1 + + roleid = int(roleid) + + staff_roles_model = self.DataBaseManager.models['staff_roles'].m + async with self.DataBaseManager.session() as session: + + if delete_role == "уверен": + async with session.begin(): + stmt = self.DataBaseManager.delete(staff_roles_model).where(staff_roles_model.id == roleid) + await session.execute(stmt) + await ctx.send(embed = disnake.Embed(description = f'Роль <@&{roleid}> успешно удалена из системы', colour = 0xff9900)) + return 0 + + async with session.begin(): + role = await session.get(staff_roles_model, roleid, with_for_update = True) + if not role is None: + if staffsalary != 0: + role.staff_salary = staffsalary + await ctx.send(embed = disnake.Embed(description = f'Зарплата роли <@&{roleid}> успешно изменена на {staffsalary}', colour = 0xff9900)) + return 0 + elif not layer is None: + role.layer = layer + await ctx.send(embed = disnake.Embed(description = f'Слой роли <@&{roleid}> успешно изменён на {layer}', colour = 0xff9900)) + return 0 + else: + await ctx.send(embed = disnake.Embed(description = f'Для изменения существующей роли необходимо ввести новые параметры: layer или staffsalary', colour = 0xff9900)) + return 1 + else: + if branchid is None: + await ctx.send(embed = disnake.Embed(description = f'Для внесения этой роли в систему необходимо указать id ветви', colour = 0xff9900)) + return 1 + if layer is None: + await ctx.send(embed = disnake.Embed(description = f'Для внесения этой роли в систему необходимо указать слой роли', colour = 0xff9900)) + return 1 + branch = await session.get(self.DataBaseManager.models['staff_branches'].m, branchid) + if branch is None: + await ctx.send(embed = disnake.Embed(description = f'Ветви с таким id пока не существует', colour = 0xff9900)) + return 1 + role = staff_roles_model(id = roleid, staff_salary = staffsalary, branch_id = branchid, layer = layer) + session.add(role) + await ctx.send(embed = disnake.Embed(description = f'Роль <@&{roleid}> успешно добавлена в ветвь {branchid}', colour = 0xff9900)) + return 0 + + @commands.slash_command(description="Позволяет создавать/удалять пользователей в системе персонала", name="правка_пользователя", administrator=True) + async def edit_member(self, ctx: disnake.AppCmdInter, userid: str = commands.Param(description="Укажите id пользователя (используются идентификаторы дискорда)", name="id"), + delete_user: str = commands.Param(description="Вы уверены, что хотите удалить пользователя из системы? Для подтверждения впишите \"уверен\"", name="удаление", default=None)): + if not self.client.me in ctx.author.roles: + await ctx.send(embed = disnake.Embed(description = f'Недостаточно прав', colour = 0xff9900), ephemeral=True) + return 1 + + userid = int(userid) + + staff_users_model = self.DataBaseManager.models['staff_users'].m + async with self.DataBaseManager.session() as session: + + if delete_user == "уверен": + async with session.begin(): + stmt = self.DataBaseManager.delete(staff_users_model).where(staff_users_model.id == userid) + await session.execute(stmt) + await ctx.send(embed = disnake.Embed(description = f'Пользователь <@{userid}> успешно удален из системы', colour = 0xff9900)) + member = await self.client.krekchat.fetch_member(userid) + await member.remove_roles(self.client.staff) + return 0 + + async with session.begin(): + user = await session.get(staff_users_model, userid, with_for_update = True) + if user is None: + member = await self.client.krekchat.fetch_member(userid) + await member.add_roles(self.client.staff) + user = staff_users_model(id = userid) + session.add(user) + await ctx.send(embed = disnake.Embed(description = f'Пользователь <@{userid}> успешно добавлен в систему', colour = 0xff9900)) + return 0 + else: + await ctx.send(embed = disnake.Embed(description = f'Пользователь <@{userid}> уже есть в системе', colour = 0xff9900)) + return 1 + + @commands.slash_command(description="!!ВАЖНО!! ИСПОЛЬЗОВАНИЕ ТОЛЬКО В ЭКСТРЕННЫХ СЛУЧАЯХ! Назначает пользователей на роль", name="назначить_пользователя", administrator=True) + async def appoint_member(self, ctx: disnake.AppCmdInter, userid: str = commands.Param(description="Укажите id пользователя (используются идентификаторы дискорда)", name="пользователь"), + roleid: str = commands.Param(description="Укажите id роли (используются идентификаторы дискорда)", name="роль"), + description: str = commands.Param(description="Описание", name="описание", default=None)): + if not self.client.me in ctx.author.roles: + await ctx.send(embed = disnake.Embed(description = f'Недостаточно прав', colour = 0xff9900), ephemeral=True) + return 1 + + userid = int(userid) + roleid = int(roleid) + + async with self.DataBaseManager.session() as session: + async with session.begin(): + + user = await session.get(self.DataBaseManager.model_classes['staff_users'], userid) + if user is None: + user = self.DataBaseManager.model_classes['staff_users'](id = userid) + session.add(user) + await session.flush() + user_role = await self.DataBaseManager.model_classes['staff_users_roles'].create_with_auto_branch(session, user_id = userid, role_id = roleid, description = description) + session.add(user_role) + await ctx.send(embed = disnake.Embed(description = f'Пользователю <@{userid}> успешно назначена роль <@&{roleid}>. \n```diff\n- Эта функция не выдаёт роли автоматически, поэтому требуется выдача вручную.```', colour = 0xff9900)) + return 0 \ No newline at end of file diff --git a/src/cogs/moderators.py b/src/cogs/moderators.py new file mode 100644 index 0000000..02f60ae --- /dev/null +++ b/src/cogs/moderators.py @@ -0,0 +1,686 @@ +import disnake +from disnake.ext import commands +from disnake.ext import tasks +import asyncio +import sys +import os +import copy +import datetime +import math +import random +import json +import shutil + +translate = {"textmute":"Текстовый мут", "voicemute":"Голосовой мут", "ban":"Бан", "warning":"Предупреждение",\ + "time":"Время", "reason":"Причина", "changenick":"Сменить ник", "reprimand":"Выговор", "newnick":"Новый ник"} + +form_to_send = {'punishment_mutes_voice': 'voice_mutes', 'punishment_mutes_text': 'text_mutes', 'punishment_bans': 'bans', 'punishment_warns': 'warns', 'punishment_reprimands': 'reprimand', 'punishment_perms': 'perm'} + +count_translate = {"textmute":3, "voicemute":3, "ban":9, "warning":1, "changenick":0, "punishment_mutes_text":3, "punishment_mutes_voice":3, "punishment_bans":9, "punishment_warns":1} + +def setup(bot): + bot.add_cog(ModerModule(bot)) + +class ModerModule(commands.Cog): + def __init__(self, client): + self.client = client + self.DataBaseManager = self.client.DataBaseManager + + @commands.Cog.listener() + async def on_ready(self): + print(f'KrekModBot moderation module activated') + + @commands.slash_command(name="действие") + async def action_slash(self, ctx: disnake.AppCmdInter, member: disnake.Member): + models = self.client.DataBaseManager.model_classes + DataBaseManager = self.client.DataBaseManager + + staff_users_roles_model = self.DataBaseManager.model_classes['staff_users_roles'] + staff_users_model = self.DataBaseManager.model_classes['staff_users'] + staff_roles_model = self.DataBaseManager.model_classes['staff_roles'] + staff_branches_model = self.DataBaseManager.model_classes['staff_branches'] + + class ActionSelect(disnake.ui.Select): + client = self.client + max_strength_role = self.max_strength_role + def __init__(self, member, reprimand_branches: list, other_on: bool): + self.member=member + placeholder = "Выберите действие" + options = [] + for branch in reprimand_branches: + options += [disnake.SelectOption(label=f"Выговор в ветке {branch.purpose}", value=f"reprimand:{branch.id}")] + if other_on: + options += [ + disnake.SelectOption(label="Предупреждение", value="warning"), + disnake.SelectOption(label="Войс мут", value="voicemute"), + disnake.SelectOption(label="Текстовый мут", value="textmute"), + disnake.SelectOption(label="Бан", value="ban"), + disnake.SelectOption(label="Сменить ник", value="changenick"), + ] + options += [disnake.SelectOption(label="Удалить наказание", value="deletepenalties")] + super().__init__(placeholder = placeholder, min_values = 1, max_values = 1, options = options) + + async def callback(self, interaction:disnake.MessageInteraction): + + class ActionModal(disnake.ui.Modal): + client = self.client + def __init__(self, title, member): + self.member = member + self.title = title + if title.split(":")[0] == "textmute" or title.split(":")[0] == "voicemute" or title.split(":")[0] == "ban": + components = [ + disnake.ui.TextInput(label="Время", placeholder="Например: 1д15ч9мин", custom_id="time", style=disnake.TextInputStyle.short, max_length=100), + disnake.ui.TextInput(label = "Причина", placeholder="Например: правило 1.3" , custom_id="reason", style=disnake.TextInputStyle.paragraph, max_length=100) + ] + if title.split(":")[0] == "warning": + components = [ + disnake.ui.TextInput(label = "Причина", placeholder="Например: правило 1.3" , custom_id="reason", style=disnake.TextInputStyle.paragraph, max_length=100) + ] + if title.split(":")[0] == "reprimand": + components = [ + disnake.ui.TextInput(label = "Причина", placeholder="Например: неправомерный бан" , custom_id="reason", style=disnake.TextInputStyle.paragraph, max_length=100) + ] + if title.split(":")[0] == "changenick": + components = [ + disnake.ui.TextInput(label="Новый ник", placeholder="Например: Humanoid", custom_id="newnick", style=disnake.TextInputStyle.short, max_length=32) + ] + super().__init__(title = title, components = components, timeout=300) + + async def callback(self, interaction: disnake.Interaction): + + async def voicemute(interaction: disnake.MessageInteraction, member: disnake.Member, time, reason): + role = self.client.voice_mute + await member.add_roles(role) + await member.move_to(None) + async with DataBaseManager.session() as session: + async with session.begin(): + new_punishment = models['punishment_mutes_voice'](user_id = member.id, reason = reason, time_end = time, time_warn = None, moderator_id = interaction.author.id) + session.add(new_punishment) + async def textmute(interaction: disnake.MessageInteraction, member: disnake.Member, time, reason): + role = self.client.text_mute + await member.add_roles(role) + async with DataBaseManager.session() as session: + async with session.begin(): + new_punishment = models['punishment_mutes_text'](user_id = member.id, reason = reason, time_end = time, time_warn = None, moderator_id = interaction.author.id) + session.add(new_punishment) + async def ban(interaction: disnake.MessageInteraction, member: disnake.Member, time, reason): + role = self.client.ban_role + await member.move_to(None) + if time-datetime.datetime.timestamp(datetime.datetime.now())>0: + await member.add_roles(role) + async with DataBaseManager.session() as session: + async with session.begin(): + new_punishment = models['punishment_bans'](user_id = member.id, reason = reason, time_end = time, moderator_id = interaction.author.id) + session.add(new_punishment) + return + await member.add_roles(role) + async with DataBaseManager.session() as session: + async with session.begin(): + new_punishment = models['punishment_perms'](user_id = member.id, reason = reason, moderator_id = interaction.author.id) + session.add(new_punishment) + for channel in interaction.guild.channels: + if isinstance(channel, disnake.TextChannel): + await channel.purge(limit=10, check=lambda m: m.author==member) + async def warning(interaction: disnake.MessageInteraction, member: disnake.Member, reason): + async with DataBaseManager.session() as session: + async with session.begin(): + new_punishment = models['punishment_warns'](user_id = member.id, reason = reason, time_warn = float(self.client.TimeFormater(days = 30)), moderator_id = interaction.author.id) + session.add(new_punishment) + async def reprimand(interaction: disnake.MessageInteraction, member: disnake.Member, branchid, reason): + async with DataBaseManager.session() as session: + async with session.begin(): + new_punishment = models['punishment_reprimands'](user_id = member.id, branch_id = branchid, reason = reason, time_warn = float(self.client.TimeFormater(days = 80)), designated_user_id = interaction.author.id) + session.add(new_punishment) + async def changenick(interaction: disnake.MessageInteraction, member: disnake.Member, newnick): + await member.edit(nick=newnick) + + await interaction.response.defer(ephemeral=True) + embed = disnake.Embed(title=f"{translate[self.title.split(':')[0]]}", description = f"{self.member.mention} ({self.member.id})") + bans_channel = disnake.utils.get(interaction.guild.channels, id = 1219644103973671035) + warns_channel = disnake.utils.get(interaction.guild.channels, id = 1219644151184887838) + muts_channel = disnake.utils.get(interaction.guild.channels, id = 1219644125469474889) + reprimands_channel = disnake.utils.get(interaction.guild.channels, id = self.client.constants['reprimandlog_channel']) + logs_channel = disnake.utils.get(interaction.guild.channels, id = 490730651629387776) + formated_time = None + reason = "" + newnick = "" + if self.title.split(':')[0] == "reprimand": + embed.add_field(name = 'Ветка', value = self.title.split(':')[1], inline = False) + for key, value in interaction.text_values.items(): + if key == "time": + formated_time = self.client.TimeFormater(value) + embed.add_field(name = translate[key], value = str(formated_time), inline = True) + elif key == "reason": + reason = value + embed.add_field(name = translate[key], value = f"```{value}```", inline = False) + elif key == "newnick": + newnick = value + embed.add_field(name = "", value=f"`{self.member.nick!r} ==> {newnick!r}`", inline = False) + else: + embed.add_field(name = translate[key], value = value, inline = False) + embed.set_footer(text = f"{interaction.author.name}\n{interaction.author.id}") + embed.set_thumbnail(url=self.member.avatar) + + await self.client.bt_send({"type": "punishment", "options": {"severity": self.title.split(':')[0], "member": self.member.id, "moderator": interaction.author.id}}) + + if self.title.split(":")[0] == "reprimand": + + async with DataBaseManager.session() as session: + async with session.begin(): + stmt = ( + DataBaseManager.select(models['punishment_reprimands']) + .where( + models['punishment_reprimands'].user_id == self.member.id, + models['punishment_reprimands'].branch_id == int(self.title.split(':')[1]) + ) + ) + result = (await session.execute(stmt)).scalars().all() + + await reprimand(interaction, self.member, int(self.title.split(':')[1]), reason) + embed.add_field(name = "Всего выговоров в этой ветке", value = str(len(result)+1), inline = False) + await reprimands_channel.send(embed=embed) + + else: + if self.title == "textmute": + await textmute(interaction, self.member, float(formated_time), reason) + await muts_channel.send(embed=embed) + + await interaction.author.send("Скопируй и вставь следующее сообщение с прикреплением доказательств в канал \"[mutlog](https://discord.com/channels/1219110919536115802/1219114151566114847)\"!!!") + await interaction.author.send(f"```{self.member.id}/{str(formated_time)}/{reason}```") + + elif self.title == "voicemute": + await voicemute(interaction, self.member, float(formated_time), reason) + await muts_channel.send(embed=embed) + + await interaction.author.send("Скопируй и вставь следующее сообщение с прикреплением доказательств в канал \"[mutlog](https://discord.com/channels/1219110919536115802/1219114151566114847)\"!!!") + await interaction.author.send(f"```{self.member.id}/{str(formated_time)}/{reason}```") + + elif self.title == "ban": + await ban(interaction, self.member, float(formated_time), reason) + await bans_channel.send(embed=embed) + + await interaction.author.send("Скопируй и вставь следующее сообщение с прикреплением доказательств в канал \"[banlog](https://discord.com/channels/1219110919536115802/1219110920060407850)\"!!!") + await interaction.author.send(f"```{self.member.id}/{str(formated_time)}/{reason}```") + + elif self.title == "warning": + await warning(interaction, self.member, reason) + await warns_channel.send(embed=embed) + + await interaction.author.send("Скопируй и вставь следующее сообщение с прикреплением доказательств в канал \"[warnlog](https://discord.com/channels/1219110919536115802/1219129098257956947)\"!!!") + await interaction.author.send(f"```{self.member.id}/{reason}```") + + elif self.title == "changenick": + await changenick(interaction, self.member, newnick) + await logs_channel.send(embed=embed) + else: + await logs_channel.send(embed=embed) + + counter = 0 + async with DataBaseManager.session() as session: + async with session.begin(): + for i in ["punishment_mutes_text", "punishment_mutes_voice", "punishment_bans", "punishment_warns"]: + stmt = DataBaseManager.select(models[i]).where(models[i].user_id == self.member.id) + result = (await session.execute(stmt)).scalars().all() + counter += count_translate[i] * len(result) + if counter>=18: + await ban(interaction, self.member, 0, "Набрал больше 18 'очков наказаний'") + await bans_channel.send(embed=disnake.Embed(title=f"Достойное достижение!", description = f"Поздравляем! {self.member.mention}({self.member.id}) наконец набрал целых {counter} очков наказания и получил свою награду: вечный бан!")) + + await interaction.edit_original_response(embed=embed) + + class DeletePenaltiesSelect(disnake.ui.Select): + client = self.client + max_strength_role = self.max_strength_role + def __init__(self, member): + self.member=member + + async def initialization(self, embed): + member = self.member + placeholder = "Выберите наказание для снятия" + options=[] + async with DataBaseManager.session() as session: + async with session.begin(): + + result = [] + stmt = self.max_strength_role(user_id = ctx.author.id, is_moder = True, is_admin = True) + author_is_moderator = (await session.execute(stmt)).first() is not None + + stmt = self.max_strength_role(user_id = ctx.author.id, is_admin = True) + author_is_admin = (await session.execute(stmt)).first() + + if author_is_admin: + stmt = DataBaseManager.select(models['punishment_reprimands']).where(models['punishment_reprimands'].user_id == member.id) + result += (await session.execute(stmt)).scalars().all() + + if author_is_moderator: + stmt = DataBaseManager.select(models['punishment_mutes_text']).where(models['punishment_mutes_text'].user_id == member.id) + result += (await session.execute(stmt)).scalars().all() + stmt = DataBaseManager.select(models['punishment_mutes_voice']).where(models['punishment_mutes_voice'].user_id == member.id) + result += (await session.execute(stmt)).scalars().all() + stmt = DataBaseManager.select(models['punishment_bans']).where(models['punishment_bans'].user_id == member.id) + result += (await session.execute(stmt)).scalars().all() + stmt = DataBaseManager.select(models['punishment_warns']).where(models['punishment_warns'].user_id == member.id) + result += (await session.execute(stmt)).scalars().all() + stmt = DataBaseManager.select(models['punishment_perms']).where(models['punishment_perms'].user_id == member.id) + result += (await session.execute(stmt)).scalars().all() + + result = sorted(result, key=lambda a: a.time_begin, reverse=True) + + for penalt in result: + match penalt.get_table_name(): + case 'punishment_mutes_text': + embed.add_field(name = f"{penalt.id} : Текстовый мут", value = f"Заканчивается {''.format(time_end = int(penalt.time_end)) if penalt.time_warn is None else ' (предупреждение)'.format(time_warn = int(penalt.time_warn))}\n```{penalt.reason}```", inline = False) + options.append(disnake.SelectOption(label=f"{penalt.id}) Текстовый мут", value=f"{penalt.get_table_name()}:{member.id}:{penalt.id}")) + case 'punishment_mutes_voice': + embed.add_field(name = f"{penalt.id} : Голосовой мут", value = f"Заканчивается {''.format(time_end = int(penalt.time_end)) if penalt.time_warn is None else ' (предупреждение)'.format(time_warn = int(penalt.time_warn))}\n```{penalt.reason}```", inline = False) + options.append(disnake.SelectOption(label=f"{penalt.id}) Голосовой мут", value=f"{penalt.get_table_name()}:{member.id}:{penalt.id}")) + case 'punishment_bans': + embed.add_field(name = f"{penalt.id} : Бан", value = f"Заканчивается {''.format(time_end = int(penalt.time_end)) if not penalt.time_end is None else 'никогда (предупреждение)'}\n```{penalt.reason}```", inline = False) + options.append(disnake.SelectOption(label=f"{penalt.id}) Бан", value=f"{penalt.get_table_name()}:{member.id}:{penalt.id}")) + case 'punishment_warns': + embed.add_field(name = f"{penalt.id} : Предупреждение", value = f"Заканчивается \n```{penalt.reason}```", inline = False) + options.append(disnake.SelectOption(label=f"{penalt.id}) Предупреждение", value=f"{penalt.get_table_name()}:{member.id}:{penalt.id}")) + case 'punishment_reprimands': + embed.add_field(name = f"{penalt.id} : Выговор", value = f"Заканчивается \n```{penalt.reason}```", inline = False) + options.append(disnake.SelectOption(label=f"{penalt.id}) Выговор", value=f"{penalt.get_table_name()}:{member.id}:{penalt.id}")) + case 'punishment_perms': + embed.add_field(name = f"{penalt.id} : Вечный бан", value = f"```{penalt.reason}```", inline = False) + options.append(disnake.SelectOption(label=f"{penalt.id}) Вечный бан", value=f"{penalt.get_table_name()}:{member.id}:{penalt.id}")) + + if len(options)==0: + embed.add_field(name = f"Наказаний нет", value = f" ", inline = False) + options.append(disnake.SelectOption(label="Наказаний нет")) + super().__init__(placeholder = placeholder, min_values = 1, max_values = 1, options = options) + + return self + async def callback(self, interaction:disnake.MessageInteraction): + values = interaction.values[0] + try: + pentype, member, penaltid = values.split(":") + except ValueError: + return + if values: + logs_channel = disnake.utils.get(interaction.guild.channels, id = 490730651629387776) + + async with DataBaseManager.session() as session: + async with session.begin(): + stmt = DataBaseManager.select(models[pentype]).where( + DataBaseManager.and_( + models[pentype].id == int(penaltid), + models[pentype].user_id == int(member) + ) + ).with_for_update() + result = (await session.execute(stmt)).scalars().first() + await session.delete(result) + + if pentype=="punishment_perms" or pentype=="punishment_bans": + await self.member.remove_roles(self.client.ban_role) + await self.client.bt_send({"type": "unpunishment", "options": {"severity": "ban", "member": int(member)}}) + elif pentype=="punishment_mutes_text": + await self.member.remove_roles(self.client.text_mute) + await self.client.bt_send({"type": "unpunishment", "options": {"severity": "textmute", "member": int(member)}}) + elif pentype=="punishment_mutes_voice": + await self.member.remove_roles(self.client.voice_mute) + await self.client.bt_send({"type": "unpunishment", "options": {"severity": "voicemute", "member": int(member)}}) + elif pentype=="punishment_reprimands": + await self.client.bt_send({"type": "unpunishment", "options": {"severity": "reprimand", "member": int(member)}}) + elif pentype=="punishment_warns": + await self.client.bt_send({"type": "unpunishment", "options": {"severity": "warning", "member": int(member)}}) + + view = disnake.ui.View(timeout=30) + embed = disnake.Embed(title="Выберите наказание для снятия", description = f"{self.member.mention}\n({self.member.id})", colour = 0x008000) + view.add_item(await DeletePenaltiesSelect(self.member).initialization(embed)) + await interaction.response.edit_message(embed = embed, view=view) + + await interaction.followup.send(embed = disnake.Embed(description = f"Наказание {pentype} успешно аннулировано!", colour = 0x008000), ephemeral=True) + await logs_channel.send(embed = disnake.Embed(description = f"{interaction.author.mention}({interaction.author.id}) успешно аннулировал наказание {pentype}({penaltid}) у пользователя {member}", colour = 0x008000),) + + + if "textmute" in self.values[0] or "voicemute" in self.values[0] or "ban" in self.values[0] or\ + "warning" in self.values[0] or "changenick" in self.values[0] or "reprimand" in self.values[0]: + modal = ActionModal(self.values[0], self.member) + await interaction.response.send_modal(modal) + elif "deletepenalties" in self.values[0]: + view = disnake.ui.View(timeout=30) + embed = disnake.Embed(title="Выберите наказание для снятия", description = f"{self.member.mention}\n({self.member.id})", colour = 0x008000) + view.add_item(await DeletePenaltiesSelect(self.member).initialization(embed)) + await interaction.response.send_message(embed = embed, ephemeral=True, view=view) + await ctx.response.defer(ephemeral=True) + + async with self.DataBaseManager.session() as session: + + stmt = self.max_strength_role(ctx.author.id) + author_strenght = (await session.execute(stmt)).first() + if author_strenght is None: + await ctx.edit_original_message(embed = disnake.Embed(description = f"Вы должны иметь должность в персонале полей, чтобы действовать", colour = 0xff9900)) + return + + stmt = ( + self.DataBaseManager.select(staff_users_roles_model) + .options( + self.DataBaseManager.joinedload(staff_users_roles_model.role), + self.DataBaseManager.joinedload(staff_users_roles_model.branch) + ) + .where(staff_users_roles_model.user_id == member.id) + ) + member_roles = (await session.execute(stmt)).scalars().all() + + stmt = ( + self.DataBaseManager.select(staff_users_roles_model.role_id) + .where(staff_users_roles_model.user_id == ctx.author.id) + ) + author_roles = set((await session.execute(stmt)).scalars().all()) + + reprimand_branches = [] + + for role in member_roles: + stmt = ( + self.DataBaseManager.select(staff_roles_model.id) + .join(staff_branches_model, staff_roles_model.branch_id == staff_branches_model.id) + .where( + self.DataBaseManager.tuple_(staff_branches_model.layer, staff_roles_model.layer) < (role.branch.layer, role.role.layer) + ) + .order_by( + staff_branches_model.layer.asc(), + staff_roles_model.layer.asc() + ) + ) + grand_roles = set((await session.execute(stmt)).scalars().all()) + + if bool(author_roles & grand_roles): + reprimand_branches += [role.branch] + + stmt = self.max_strength_role(ctx.author.id, is_moder = True, is_admin = True) + author_is_moderator = (await session.execute(stmt)).all() + stmt = self.max_strength_role(member.id, is_moder = True, is_admin = True) + member_is_moderator = (await session.execute(stmt)).all() + + class View(disnake.ui.View): + async def on_timeout(self): + await ctx.edit_original_message(view=None) + + if (not reprimand_branches) and (not ((not member_is_moderator) and author_is_moderator)): + await ctx.edit_original_message(embed = disnake.Embed(description = f"Вы ничего не можете сделать этому пользователю", colour = 0xff9900)) + return + + embed = disnake.Embed(title="Выберите действие", description = f"{member.mention}\n({member.id})", colour = 0x008000) + embed.set_thumbnail(url=member.avatar) + view = View(timeout=30) + view.add_item(ActionSelect(member = member, reprimand_branches = reprimand_branches, other_on = (not member_is_moderator) and author_is_moderator)) + await ctx.edit_original_message(embed = embed, view=view) + + ''' + + Продвижения и т.п. + + ''' + + def max_strength_role(self, user_id: int, branch_ids: list = [], is_admin: bool = False, is_moder: bool = False): + staff_users_roles_model = self.DataBaseManager.model_classes['staff_users_roles'] + staff_users_model = self.DataBaseManager.model_classes['staff_users'] + staff_roles_model = self.DataBaseManager.model_classes['staff_roles'] + staff_branches_model = self.DataBaseManager.model_classes['staff_branches'] + + stmt = ( + self.DataBaseManager.select(staff_branches_model.layer, staff_roles_model.layer) + .select_from(staff_users_roles_model) + .join(staff_users_model, staff_users_roles_model.user_id == staff_users_model.id) + .join(staff_roles_model, staff_users_roles_model.role_id == staff_roles_model.id) + .join(staff_branches_model, staff_roles_model.branch_id == staff_branches_model.id) + .order_by( + staff_branches_model.layer.asc(), + staff_roles_model.layer.asc() + ) + ) + if is_admin or is_moder or branch_ids: + stmt = stmt.where( + self.DataBaseManager.and_( + self.DataBaseManager.or_( + staff_branches_model.id.in_(branch_ids), + staff_branches_model.is_admin == is_admin if is_admin else self.DataBaseManager.false(), + staff_branches_model.is_moder == is_moder if is_moder else self.DataBaseManager.false() + ), + staff_users_model.id == user_id + ) + ) + else: + stmt = stmt.where( + staff_users_model.id == user_id + ) + + return stmt + + async def promotions_add_remove_role(self, ctx, userid, add_roleid = None, remove_roleid = None): + krekchat = await self.client.fetch_guild(self.client.krekchat.id) + member = await krekchat.fetch_member(userid) + if not add_roleid is None: + add_role = await krekchat.fetch_role(add_roleid) + try: + await member.add_roles(add_role) + except disnake.errors.Forbidden: + await ctx.channel.send(ctx.author.mention, embed = disnake.Embed(description = f'Недостаточно прав. Пользователю {member.mention} необходимо ручное добавление роли {add_role.mention}', colour = 0xff9900)) + + if not remove_roleid is None: + remove_role = await krekchat.fetch_role(remove_roleid) + try: + await member.remove_roles(remove_role) + except disnake.errors.Forbidden: + await ctx.channel.send(ctx.author.mention, embed = disnake.Embed(description = f'Недостаточно прав. Пользователю {member.mention} необходимо ручное удаление роли {remove_role.mention}', colour = 0xff9900)) + + + @commands.slash_command(description="Позволяет повысить пользователя в указанной ветке", name="повысить", administrator=True) + async def promote(self, ctx: disnake.AppCmdInter, branchid: int = commands.Param(description="Укажите id ветви", name="ветвь", default = None), + userid: str = commands.Param(description="Укажите id пользователя (используются идентификаторы дискорда)", name="пользователь")): + + userid = int(userid) + + async with self.DataBaseManager.session() as session: + async with session.begin(): + + staff_users_roles_model = self.DataBaseManager.model_classes['staff_users_roles'] + staff_users_model = self.DataBaseManager.model_classes['staff_users'] + staff_roles_model = self.DataBaseManager.model_classes['staff_roles'] + staff_branches_model = self.DataBaseManager.model_classes['staff_branches'] + + if branchid is None: + stmt = ( + self.DataBaseManager.select(staff_users_roles_model) + .options( + self.DataBaseManager.selectinload(staff_users_roles_model.role) + .selectinload(staff_roles_model.branch) + ) + .where( + staff_users_roles_model.user_id == userid + ) + ) + user_roles = (await session.execute(stmt)).scalars().all() + + if len(user_roles) != 1: + await ctx.send(embed = disnake.Embed(description = f'Для данной операции необходимо уточнение id ветки!', colour = 0xff9900)) + return 1 + + else: + branchid = user_roles[0].role.branch.id + + stmt = self.max_strength_role(user_id = ctx.author.id, branch_ids = [branchid], is_admin = True) + author_upper_role = (await session.execute(stmt)).first() + + stmt = self.max_strength_role(user_id = userid, branch_ids = [branchid]) + member_upper_role = (await session.execute(stmt)).first() + + if author_upper_role is None or ((not member_upper_role is None) and author_upper_role >= member_upper_role): + await ctx.send(embed = disnake.Embed(description = f'Вы должны быть выше должности, на которую назначаете пользователя', colour = 0xff9900)) + return 1 + + stmt = ( + self.DataBaseManager.select(staff_roles_model) + .options( + self.DataBaseManager.selectinload(staff_roles_model.branch) + ) + .where(staff_roles_model.branch_id == branchid) + .order_by(staff_roles_model.layer.desc()) + ) + branch_roles = (await session.execute(stmt)).scalars().all() + + stmt = ( + self.DataBaseManager.select(staff_users_roles_model) + .join(staff_users_model, staff_users_roles_model.user_id == staff_users_model.id) + .join(staff_roles_model, staff_users_roles_model.role_id == staff_roles_model.id) + .join(staff_branches_model, staff_roles_model.branch_id == staff_branches_model.id) + .options( + self.DataBaseManager.contains_eager(staff_users_roles_model.user), + self.DataBaseManager.contains_eager(staff_users_roles_model.role) + .contains_eager(staff_roles_model.branch) + ) + .where( + self.DataBaseManager.and_( + staff_branches_model.id == branchid, + staff_users_model.id == userid + ) + ) + ).with_for_update() + member_role = (await session.execute(stmt)).scalars().first() + + if member_role is not None: + member_role_index = next((i for i, role in enumerate(branch_roles) if role.id == member_role.role.id), None) + + if member_role_index + 1 >= len(branch_roles): + await ctx.send(embed = disnake.Embed(description = f'Этот пользователь уже находится на самой высокой должности в данной ветви', colour = 0xff9900)) + return 1 + + target_role = branch_roles[member_role_index + 1] + if author_upper_role >= (target_role.branch.layer, target_role.layer): + await ctx.send(embed = disnake.Embed(description = f'Вы должны быть выше должности, на которую назначаете пользователя', colour = 0xff9900)) + return 1 + + previous_role_id = member_role.role_id + member_role.role_id = target_role.id + await ctx.send(embed = disnake.Embed(description = f'Пользователь <@{userid}> успешно назначен на роль <@&{target_role.id}>', colour = 0xff9900)) + await self.promotions_add_remove_role(ctx, userid, add_roleid = target_role.id, remove_roleid = previous_role_id) + return 0 + else: + member_check = await session.get(staff_users_model, userid) + if member_check is None: + await ctx.send(embed = disnake.Embed(description = f'Необходимо изначально зарегистрировать пользователя, попросите разработчика использовать \"/правка_пользователя\"', colour = 0xff9900)) + return 1 + + member_role_index = 0 + + if member_role_index >= len(branch_roles): + await ctx.send(embed = disnake.Embed(description = f'В данной ветви пока нет ролей', colour = 0xff9900)) + return 1 + + target_role = branch_roles[member_role_index] + if author_upper_role >= (target_role.branch.layer, target_role.layer): + await ctx.send(embed = disnake.Embed(description = f'Вы должны быть выше должности, на которую назначаете пользователя', colour = 0xff9900)) + return 1 + + user_role = await staff_users_roles_model.create_with_auto_branch(session, user_id = userid, role_id = target_role.id) + session.add(user_role) + await ctx.send(embed = disnake.Embed(description = f'Пользователь <@{userid}> успешно назначен на роль <@&{target_role.id}>', colour = 0xff9900)) + await self.promotions_add_remove_role(ctx, userid, add_roleid = target_role.id) + return 0 + + @commands.slash_command(description="Позволяет понизить пользователя в указанной ветке", name="понизить", administrator=True) + async def demote(self, ctx: disnake.AppCmdInter, branchid: int = commands.Param(description="Укажите id ветви", name="ветвь", default = None), + userid: str = commands.Param(description="Укажите id пользователя (используются идентификаторы дискорда)", name="пользователь")): + + userid = int(userid) + + async with self.DataBaseManager.session() as session: + async with session.begin(): + + staff_users_roles_model = self.DataBaseManager.model_classes['staff_users_roles'] + staff_users_model = self.DataBaseManager.model_classes['staff_users'] + staff_roles_model = self.DataBaseManager.model_classes['staff_roles'] + staff_branches_model = self.DataBaseManager.model_classes['staff_branches'] + + if branchid is None: + stmt = ( + self.DataBaseManager.select(staff_users_roles_model) + .options( + self.DataBaseManager.selectinload(staff_users_roles_model.role) + .selectinload(staff_roles_model.branch) + ) + .where( + staff_users_roles_model.user_id == userid + ) + ) + user_roles = (await session.execute(stmt)).scalars().all() + + if len(user_roles) != 1: + await ctx.send(embed = disnake.Embed(description = f'Для данной операции необходимо уточнение id ветки!', colour = 0xff9900)) + return 1 + + else: + branchid = user_roles[0].role.branch.id + + stmt = self.max_strength_role(user_id = ctx.author.id, branch_ids = [branchid], is_admin = True) + author_upper_role = (await session.execute(stmt)).first() + + stmt = self.max_strength_role(user_id = userid, branch_ids = [branchid]) + member_upper_role = (await session.execute(stmt)).first() + + if author_upper_role is None or ((not member_upper_role is None) and author_upper_role >= member_upper_role): + await ctx.send(embed = disnake.Embed(description = f'Вы должны быть выше пользователя в должности', colour = 0xff9900)) + return 1 + + stmt = ( + self.DataBaseManager.select(staff_roles_model) + .options( + self.DataBaseManager.selectinload(staff_roles_model.branch) + ) + .where(staff_roles_model.branch_id == branchid) + .order_by(staff_roles_model.layer.desc()) + ) + branch_roles = (await session.execute(stmt)).scalars().all() + + stmt = ( + self.DataBaseManager.select(staff_users_roles_model) + .join(staff_users_model, staff_users_roles_model.user_id == staff_users_model.id) + .join(staff_roles_model, staff_users_roles_model.role_id == staff_roles_model.id) + .join(staff_branches_model, staff_roles_model.branch_id == staff_branches_model.id) + .options( + self.DataBaseManager.contains_eager(staff_users_roles_model.user), + self.DataBaseManager.contains_eager(staff_users_roles_model.role) + .contains_eager(staff_roles_model.branch) + ) + .where( + self.DataBaseManager.and_( + staff_branches_model.id == branchid, + staff_users_model.id == userid + ) + ) + ).with_for_update() + member_role = (await session.execute(stmt)).scalars().first() + + if member_role is not None: + member_role_index = next((i for i, role in enumerate(branch_roles) if role.id == member_role.role.id), None) + + if member_role_index == 0: + stmt = ( + self.DataBaseManager.select(staff_users_roles_model) + .where(staff_users_roles_model.user_id == userid) + ) + result = (await session.execute(stmt)).scalars().all() + await self.promotions_add_remove_role(ctx, userid, remove_roleid = member_role.role_id) + if len(result) <= 1: + stmt = self.DataBaseManager.delete(staff_users_model).where(staff_users_model.id == userid) + await session.execute(stmt) + await ctx.send(embed = disnake.Embed(description = f'Пользователь <@{userid}> полностью удалён из системы', colour = 0xff9900)) + await self.promotions_add_remove_role(ctx, userid, remove_roleid = self.client.staff.id) + return 0 + else: + await session.delete(member_role) + await ctx.send(embed = disnake.Embed(description = f'Пользователь <@{userid}> снят со всех должностей в ветви {branchid}', colour = 0xff9900)) + return 0 + + target_role = branch_roles[member_role_index - 1] + previous_role_id = member_role.role_id + member_role.role_id = target_role.id + await ctx.send(embed = disnake.Embed(description = f'Пользователь <@{userid}> успешно понижен до роли <@&{target_role.id}> в ветви {branchid}', colour = 0xff9900)) + await self.promotions_add_remove_role(ctx, userid, add_roleid = target_role.id, remove_roleid = previous_role_id) + return 0 + else: + await ctx.send(embed = disnake.Embed(description = f'Данный пользователь не имеет роли в ветке {branchid}', colour = 0xff9900)) + return 1 \ No newline at end of file diff --git a/src/cogs/resetsupcommands.py b/src/cogs/resetsupcommands.py new file mode 100644 index 0000000..3b24100 --- /dev/null +++ b/src/cogs/resetsupcommands.py @@ -0,0 +1,14 @@ +import disnake +from disnake.ext import commands + +def setup(bot: commands.Bot): + bot.add_cog(ExampleCog(bot)) + print("ExampleCog загружен!") + +class ExampleCog(commands.Cog): + def __init__(self, bot: commands.Bot): + self.bot = bot + + @commands.slash_command(name="ping", description="Проверка бота на работоспособность") + async def ping(self, inter: disnake.ApplicationCommandInteraction): + await inter.response.send_message(f"Понг! {round(self.bot.latency * 1000)}мс") \ No newline at end of file diff --git a/src/cogs/users.py b/src/cogs/users.py new file mode 100644 index 0000000..e81e2fd --- /dev/null +++ b/src/cogs/users.py @@ -0,0 +1,280 @@ +import disnake +from disnake.ext import commands +from disnake.ext import tasks +import requests +import numpy as np +import asyncio +import sys +import os +import copy +import datetime +import math +import random +import json +import shutil + + +def setup(bot): + bot.add_cog(UIModule(bot)) + +class UIModule(commands.Cog): + def __init__(self, client): + self.client = client + self.DataBaseManager = self.client.DataBaseManager + + @commands.Cog.listener() + async def on_ready(self): + print(f'KrekModBot UI module activated') + + @commands.slash_command(description="Показывает действительные наказания пользователя", name="наказания") + async def penalties(self, ctx: disnake.AppCmdInter, member: disnake.Member = None): + models = self.client.DataBaseManager.model_classes + if not member: + member = ctx.author + embed = disnake.Embed(title="Наказания", description = f"{member.mention}", colour = 0x008000) + embed.set_thumbnail(url=member.avatar) + + async with self.DataBaseManager.session() as session: + async with session.begin(): + stmt = self.DataBaseManager.select(models['punishment_mutes_text']).where(models['punishment_mutes_text'].user_id == member.id) + result = (await session.execute(stmt)).scalars().all() + stmt = self.DataBaseManager.select(models['punishment_mutes_voice']).where(models['punishment_mutes_voice'].user_id == member.id) + result += (await session.execute(stmt)).scalars().all() + stmt = self.DataBaseManager.select(models['punishment_bans']).where(models['punishment_bans'].user_id == member.id) + result += (await session.execute(stmt)).scalars().all() + stmt = self.DataBaseManager.select(models['punishment_warns']).where(models['punishment_warns'].user_id == member.id) + result += (await session.execute(stmt)).scalars().all() + stmt = self.DataBaseManager.select(models['punishment_reprimands']).where(models['punishment_reprimands'].user_id == member.id) + result += (await session.execute(stmt)).scalars().all() + stmt = self.DataBaseManager.select(models['punishment_perms']).where(models['punishment_perms'].user_id == member.id) + result += (await session.execute(stmt)).scalars().all() + + result = sorted(result, key=lambda a: a.time_begin, reverse=True) + + for penalt in result: + match penalt.get_table_name(): + case 'punishment_mutes_text': + embed.add_field(name = f"Текстовый мут", value = f"Заканчивается {''.format(time_end = int(penalt.time_end)) if penalt.time_warn is None else ' (предупреждение)'.format(time_warn = int(penalt.time_warn))}\n```{penalt.reason}```", inline = False) + case 'punishment_mutes_voice': + embed.add_field(name = f"Голосовой мут", value = f"Заканчивается {''.format(time_end = int(penalt.time_end)) if penalt.time_warn is None else ' (предупреждение)'.format(time_warn = int(penalt.time_warn))}\n```{penalt.reason}```", inline = False) + case 'punishment_bans': + embed.add_field(name = f"Бан", value = f"Заканчивается {''.format(time_end = int(penalt.time_end)) if not penalt.time_end is None else 'никогда (предупреждение)'}\n```{penalt.reason}```", inline = False) + case 'punishment_warns': + embed.add_field(name = f"Предупреждение", value = f"Заканчивается \n```{penalt.reason}```", inline = False) + case 'punishment_reprimands': + embed.add_field(name = f"Выговор", value = f"Заканчивается \n```{penalt.reason}```", inline = False) + case 'punishment_perms': + embed.add_field(name = f"Вечный бан", value = f"```{penalt.reason}```", inline = False) + + if len(embed.fields)==0: + embed.add_field(name = f"Наказаний нет", value = f"", inline = False) + + await ctx.response.send_message(embed = embed) + + + @commands.slash_command(description="Подайте жалобу на нарушение правил сервера или действия модератора", name="жалоба") + async def report(self, ctx: disnake.AppCmdInter, member: disnake.Member = commands.Param(description="На кого хотите подать жалобу?", name="пользователь"), reason: str = commands.Param(description="Кратко опишите причину жалобы", name="причина")): + + async def ReportCallback(ctx, member, report_message, reason, embed, mentions, mod): + + async def AcceptCallback(report_message, member, embed, mod, call, channel): + if call.author == mod.author: + await channel.delete() + embed.set_footer(text = f"Рассмотрено в пользу исца\n{mod.author.name} ({mod.author.id})") + embed.colour=0x008000 + await report_message.edit(content = "", embed=embed, view=None) + await member.send(embed = disnake.Embed(title=f"", description = f"После разбора нарушения модератор {mod.author.mention} признал вас виновным", colour = 0xff9900)) + + await self.client.bt_send({"type": "complaint", "options": {"accepted": True, "attack_member": ctx.author.id, "defence_member": member.id, "moderator": mod.author.id}}) + else: + await call.send(embed = disnake.Embed(title=f"", description = f"Только судья может использовать эти команды", colour = 0xff9900), ephemeral=True) + + async def DenyCallback(report_message, member, embed, mod, call, channel): + if call.author == mod.author: + await channel.delete() + embed.set_footer(text = f"Рассмотрено в пользу ответчика\n{mod.author.name} ({mod.author.id})") + embed.colour=0x008000 + await report_message.edit(content = "", embed=embed, view=None) + #await member.send(embed = disnake.Embed(title=f"", description = f"После разбора нарушения модератор {mod.author.mention} признал вас невиновным", colour = 0xff9900)) + + await self.client.bt_send({"type": "complaint", "options": {"accepted": False, "attack_member": ctx.author.id, "defence_member": member.id, "moderator": mod.author.id}}) + else: + await call.send(embed = disnake.Embed(title=f"", description = f"Только судья может использовать эти команды", colour = 0xff9900), ephemeral=True) + + class AttackClass: + def __init__(self, member, channel, mod): + self.member=member + self.channel=channel + self.mod=mod + self.permatt=True + async def callback(self, mod): + if mod.author == self.mod.author: + await self.channel.set_permissions(self.member, read_messages = True, read_message_history=True, send_messages=self.permatt) #read_messages=self.permatt + await mod.send(embed = disnake.Embed(title=f"", description = f"{self.member.mention}-{'право ответа включено' if self.permatt else 'право ответа выключено'}", colour = 0x008000), ephemeral=True) + self.permatt=not self.permatt + else: + await mod.send(embed = disnake.Embed(title=f"", description = f"Только судья может использовать эти команды", colour = 0xff9900), ephemeral=True) + class DefenceClass: + def __init__(self, member, channel, mod): + self.member=member + self.channel=channel + self.mod=mod + self.permdef=True + async def callback(self, mod): + if mod.author == self.mod.author: + await self.channel.set_permissions(self.member, read_messages = True, read_message_history=True, send_messages=self.permdef) #read_messages=self.permdef + await mod.send(embed = disnake.Embed(title=f"", description = f"{self.member.mention}-{'право ответа включено' if self.permdef else 'право ответа выключено'}", colour = 0x008000), ephemeral=True) + self.permdef=not self.permdef + else: + await mod.send(embed = disnake.Embed(title=f"", description = f"Только судья может использовать эти команды", colour = 0xff9900), ephemeral=True) + + if not any(i.mention in mentions for i in mod.author.roles): + await mod.send(embed = disnake.Embed(description = f"Вы не можете принять этот репорт", colour = 0xff9900), ephemeral=True) + return + + if mod.author == ctx.author: + await mod.send(embed = disnake.Embed(description = f"Вы не можете принять свой же репорт", colour = 0xff9900), ephemeral=True) + return + + embed.set_footer(text = f"Принято\n{mod.author.name} ({mod.author.id})") + embed.colour=0x008000 + await report_message.edit(content = "", embed=embed, view=None) + + parsing_channel = await ctx.guild.create_text_channel( + name=f"⚠️Жалоба от {ctx.author.name} на {member}", + overwrites = {ctx.author: disnake.PermissionOverwrite(read_messages=True, send_messages=False, read_message_history=True, attach_files=True), + mod.author: disnake.PermissionOverwrite(read_messages=True, send_messages=True, read_message_history=True, attach_files=True), + member: disnake.PermissionOverwrite(read_messages=True, send_messages=False, read_message_history=True, attach_files=True), + self.client.everyone: disnake.PermissionOverwrite(read_messages=False, send_messages=False, read_message_history=False)}, + category=ctx.guild.get_channel(1220744958961778850) + ) + + view = disnake.ui.View(timeout=86400) + + btnatt = disnake.ui.Button(label="⚔️", style=disnake.ButtonStyle.primary) + view.add_item(btnatt) + btndef = disnake.ui.Button(label="⚰️", style=disnake.ButtonStyle.primary) + view.add_item(btndef) + btnacc = disnake.ui.Button(label="✅", style=disnake.ButtonStyle.primary) + view.add_item(btnacc) + btnden = disnake.ui.Button(label="❌", style=disnake.ButtonStyle.primary) + view.add_item(btnden) + + attack = AttackClass(ctx.author, parsing_channel, mod) + defence = DefenceClass(member, parsing_channel, mod) + + pin = await parsing_channel.send( f"{mod.author.mention}" ,embed = disnake.Embed(title=f"Жалоба", description = f"Вы вызвались судить {member.mention} по жалобе от {ctx.author.mention}\n\ + ⚔️ - Дать {ctx.author.mention} право ответа\n\ + ⚰️ - Дать {member.mention} право ответа\n\ + ✅ - Виновен\n\ + ❌ - Невиновен\n\ + Перед закрытием дела убедитесь, что сохранили все доказательства", colour = 0x008000), view=view) + btnatt.callback = lambda mod: attack.callback(mod) + btndef.callback = lambda mod: defence.callback(mod) + btnacc.callback = lambda call: AcceptCallback(report_message, member, embed, mod, call, parsing_channel) + btnden.callback = lambda call: DenyCallback(report_message, member, embed, mod, call, parsing_channel) + await pin.pin() + + if member==ctx.author: + await ctx.send(embed = disnake.Embed(description = f'Нельзя подать жалобу на самого себя!', colour = 0xFF4500), ephemeral=True) + return + mentions = [] + + report_channel = disnake.utils.get(ctx.guild.channels, id = 1219644036378394746) + + highest = [i for i in self.client.hierarchy if i in member.roles][0] + for i in range(0, self.client.hierarchy.index(highest)): + mentions.append(f"{self.client.hierarchy[i].mention}") + + + if len(mentions)==0: + mentions.append(f"{self.client.me.mention}") + + report_embed = disnake.Embed(title=f"**Жалоба**", colour = 0xDC143C) + + report_embed.add_field(name=f"Обвинитель: ", value = f"{ctx.author.mention}\n({ctx.author.id})", inline=True) + report_embed.add_field(name=f"Обвиняемый: ", value = f"{member.mention}\n({member.id})", inline=True) + report_embed.add_field(name=f"Причина: ", value = f"```{reason}```", inline=False) + view = disnake.ui.View(timeout=86400) + btn = disnake.ui.Button(label="✅", style=disnake.ButtonStyle.primary) + view.add_item(btn) + report_embed.set_thumbnail(url=member.avatar) + report_message = await report_channel.send(", ".join(mentions), embed = report_embed, view=view) + btn.callback = lambda mod: ReportCallback(ctx,member,report_message,reason,report_embed,mentions,mod) + + await ctx.send(embed = disnake.Embed(description = f'Жалоба на {member.mention} успешно подана', colour = 0x008000), ephemeral=True) + + ''' + + Иерархия + + ''' + + @commands.slash_command(description="Показывает весь персонал, по ролям и веткам", name="иерархия", administrator=True) + async def hierarchy(self, ctx: disnake.AppCmdInter, branchid: int = commands.Param(description="Укажите id ветки, в которой вам нужна иерархия", name="ветвь", default=None), + devmod: bool = commands.Param(description="Показывать подробную информацию?", name="devmode", default=False)): + + async with self.DataBaseManager.session() as session: + async with session.begin(): + + if branchid is None: + stmt = ( + self.DataBaseManager.select(self.DataBaseManager.model_classes['staff_branches']) + .options( + self.DataBaseManager.selectinload(self.DataBaseManager.model_classes['staff_branches'].roles) + ) + .order_by( + self.DataBaseManager.model_classes['staff_branches'].layer.asc() + ) + ) + branches = (await session.execute(stmt)).scalars().all() + for branch in branches: + branch.roles.sort(key=lambda role: role.layer) + + embed = disnake.Embed(title = f"", description = f"# Общая иерархия\n", colour = 0xff9900) + for branchcounter, branch in enumerate(branches, start=1): + embed.description += f"## {branchcounter}) {branch.purpose}" + if devmod: + embed.description += f" id:({branch.id}) layer:({branch.layer})" + embed.description += "\n" + + for rolecounter, role in enumerate(branch.roles, start=1): + embed.description += f"### {rolecounter}. <@&{role.id}>" + if devmod: + embed.description += f" layer:({role.layer})" + embed.description += "\n" + await ctx.send(embed = embed) + else: + branch = await session.get(self.DataBaseManager.model_classes['staff_branches'], branchid) + + stmt = ( + self.DataBaseManager.select(self.DataBaseManager.model_classes['staff_branches']) + .options( + self.DataBaseManager.selectinload(self.DataBaseManager.model_classes['staff_branches'].roles) + .selectinload(self.DataBaseManager.model_classes['staff_roles'].users) + ) + .where( + self.DataBaseManager.model_classes['staff_branches'].id == branchid + ) + ) + branch = (await session.execute(stmt)).scalars().first() + if branch is None: + await ctx.send(embed = disnake.Embed(description = f'Ветви с идентификатором {branchid} не существует', colour = 0xff9900)) + return 1 + + branch.roles.sort(key=lambda role: role.layer) + + embed = disnake.Embed(title = f"", description = f"# Иерархия по ветви {branch.purpose}\n", colour = 0xff9900) + + for rolecounter, role in enumerate(branch.roles, start=1): + embed.description += f"## {rolecounter}) <@&{role.id}>" + if devmod: + embed.description += f" layer:({role.layer})" + embed.description += "\n" + + for membercounter, user in enumerate(role.users, start=1): + embed.description += f"### - <@{user.user_id}>" + if devmod: + embed.description += f" update_time:()" + embed.description += "\n" + await ctx.send(embed = embed) \ No newline at end of file diff --git a/src/constants/global_constants.py b/src/constants/global_constants.py new file mode 100644 index 0000000..d90e9ac --- /dev/null +++ b/src/constants/global_constants.py @@ -0,0 +1,24 @@ +constants = { + "krekchat": 490445877903622144, + "sponsors": [648932777755934740, 926021742151999509, 959416622144180265, 995602360325902386, 995602360325902387, + 995602360325902388, 995602360325902389, 712751688741814343, 1244009309000437780, 1285667409365176372], + "mutes": [1219615791750709268, #text + 1219615979185770597],#voice + "ban_role": 1219276355875639407, + "me": 887696340920963072, + "moder": 490712181927837722, + "curator": 490712205445169162, + "everyone": 490445877903622144, + "staff": 1229765545310818377, + "level_roles": [None, 490707199526567946, 490707605711486985, 490707615429689375, 490707634710642700], + "moderators": [490712181927837722, 490712205445169162], + "hierarchy": [490712205445169162, 490712181927837722, 490445877903622144], + + # logs_channels + "mutelog_channel": 1219644125469474889, + "banlog_channel": 1219644103973671035, + "warnlog_channel": 1219644151184887838, + "reprimandlog_channel": 1380593220513173534, + "bots_talk_protocol_channel": 1376233239202758827, + "databases_backups_channel": 1382363252683706448, + } \ No newline at end of file diff --git a/src/database/db_classes.py b/src/database/db_classes.py new file mode 100644 index 0000000..e774973 --- /dev/null +++ b/src/database/db_classes.py @@ -0,0 +1,206 @@ +from sqlalchemy import Column, Integer, BigInteger, Text, Float, ForeignKey, UniqueConstraint, MetaData, Boolean +from sqlalchemy.orm import declarative_base, relationship, Mapped, mapped_column +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession +from sqlalchemy.orm import sessionmaker, relationship, DeclarativeBase +from sqlalchemy.schema import CreateTable +from sqlalchemy.sql import text +import datetime +from typing import Annotated + +class Base(DeclarativeBase): + + def get_table_name(self): + return self.__tablename__ + + def to_dict(self, exclude: list[str] = None): + """Конвертирует модель в словарь, исключая указанные поля.""" + if exclude is None: + exclude = [] + + return { + c.name: getattr(self, c.name) + for c in self.__table__.columns + if c.name not in exclude + } + +discord_identificator_pk = Annotated[int, mapped_column(BigInteger, primary_key=True, nullable=False, index = True)] +identificator_pk = Annotated[int, mapped_column(Integer, primary_key=True, nullable=False, autoincrement=True, index = True)] +discord_identificator = Annotated[int, mapped_column(BigInteger, nullable=False, index=True)] + +# Модели для системы наказаний +class PunishmentTextMute(Base): + __tablename__ = 'punishment_mutes_text' + id: Mapped[identificator_pk] + user_id: Mapped[discord_identificator] + reason: Mapped[str | None] = mapped_column(Text) + time_end: Mapped[float | None] = mapped_column(Float) + time_warn: Mapped[float | None] = mapped_column(Float) + + time_begin: Mapped[float] = mapped_column(Float, nullable=False, server_default=text("EXTRACT(EPOCH FROM NOW())")) + moderator_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True, index=True, default = None) + + +class PunishmentVoiceMute(Base): + __tablename__ = 'punishment_mutes_voice' + id: Mapped[identificator_pk] + user_id: Mapped[discord_identificator] + reason: Mapped[str | None] = mapped_column(Text) + time_end: Mapped[float | None] = mapped_column(Float) + time_warn: Mapped[float | None] = mapped_column(Float) + + time_begin: Mapped[float] = mapped_column(Float, nullable=False, server_default=text("EXTRACT(EPOCH FROM NOW())")) + moderator_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True, index=True, default = None) + + +class PunishmentWarn(Base): + __tablename__ = 'punishment_warns' + id: Mapped[identificator_pk] + user_id: Mapped[discord_identificator] + reason: Mapped[str | None] = mapped_column(Text) + time_warn: Mapped[float] = mapped_column(Float) + + time_begin: Mapped[float] = mapped_column(Float, nullable=False, server_default=text("EXTRACT(EPOCH FROM NOW())")) + moderator_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True, index=True, default = None) + + +class PunishmentBan(Base): + __tablename__ = 'punishment_bans' + id: Mapped[identificator_pk] + user_id: Mapped[discord_identificator] + reason: Mapped[str | None] = mapped_column(Text) + time_end: Mapped[float | None] = mapped_column(Float) + + time_begin: Mapped[float] = mapped_column(Float, nullable=False, server_default=text("EXTRACT(EPOCH FROM NOW())")) + moderator_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True, index=True, default = None) + + +class PunishmentPerm(Base): + __tablename__ = 'punishment_perms' + id: Mapped[identificator_pk] + user_id: Mapped[discord_identificator] + reason: Mapped[str | None] = mapped_column(Text) + + time_begin: Mapped[float] = mapped_column(Float, nullable=False, server_default=text("EXTRACT(EPOCH FROM NOW())")) + moderator_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True, index=True, default = None) + + +class PunishmentReprimand(Base): + __tablename__ = 'punishment_reprimands' + id: Mapped[identificator_pk] + user_id: Mapped[discord_identificator] + reason: Mapped[str | None] = mapped_column(Text) + time_warn: Mapped[float] = mapped_column(Float) + branch_id: Mapped[int] = mapped_column(ForeignKey('staff_branches.id', ondelete='CASCADE')) + + time_begin: Mapped[float] = mapped_column(Float, nullable=False, server_default=text("EXTRACT(EPOCH FROM NOW())")) + designated_user_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True, index=True, default = None) + + +# Модели для системы персонала +class StaffBranch(Base): + __tablename__ = 'staff_branches' + id: Mapped[identificator_pk] + layer: Mapped[int] = mapped_column(Integer, nullable=False) + purpose: Mapped[str] = mapped_column(Text) + is_admin: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) + is_moder: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) + + roles: Mapped[list["StaffRole"]] = relationship( + back_populates="branch", + primaryjoin="StaffBranch.id==StaffRole.branch_id" + ) + users: Mapped[list["StaffUserRole"]] = relationship( + back_populates="branch", + primaryjoin="StaffBranch.id==StaffUserRole.branch_id" + ) + curations: Mapped[list["StaffCuration"]] = relationship( + back_populates="branch", + primaryjoin="StaffBranch.id==StaffCuration.branch_id" + ) + +class StaffRole(Base): + __tablename__ = 'staff_roles' + id: Mapped[discord_identificator_pk] + layer: Mapped[int] = mapped_column(Integer, nullable=False) + staff_salary: Mapped[int] = mapped_column(Integer, nullable=False) + branch_id: Mapped[int] = mapped_column(ForeignKey('staff_branches.id', ondelete='CASCADE')) + + branch: Mapped["StaffBranch"] = relationship( + back_populates="roles", + primaryjoin="StaffRole.branch_id==StaffBranch.id" + ) + users: Mapped[list["StaffUserRole"]] = relationship( + back_populates="role", + primaryjoin="StaffRole.id==StaffUserRole.role_id" + ) + +class StaffUser(Base): + __tablename__ = 'staff_users' + id: Mapped[discord_identificator_pk] + + roles: Mapped[list["StaffUserRole"]] = relationship(back_populates="user") + curators: Mapped[list["StaffCuration"]] = relationship( + back_populates="apprentice", + primaryjoin="StaffUser.id==StaffCuration.apprentice_id" + ) + apprentices: Mapped[list["StaffCuration"]] = relationship( + back_populates="curator", + primaryjoin="StaffUser.id==StaffCuration.curator_id" + ) + + +class StaffUserRole(Base): + __tablename__ = 'staff_users_roles' + id: Mapped[identificator_pk] + user_id: Mapped[int] = mapped_column(ForeignKey('staff_users.id', ondelete='CASCADE')) + role_id: Mapped[int] = mapped_column(ForeignKey('staff_roles.id', ondelete='CASCADE')) + branch_id: Mapped[int] = mapped_column(ForeignKey('staff_branches.id', ondelete='CASCADE')) + description: Mapped[str | None] = mapped_column(Text) + update_time: Mapped[float] = mapped_column(Float, + server_default=text("EXTRACT(EPOCH FROM NOW())"), + server_onupdate=text("EXTRACT(EPOCH FROM NOW())") + ) + + user: Mapped["StaffUser"] = relationship(back_populates="roles") + branch: Mapped["StaffBranch"] = relationship(back_populates="users") + role: Mapped["StaffRole"] = relationship(back_populates="users") + + __table_args__ = ( + UniqueConstraint('user_id', 'branch_id', name='uq_user_branch'), + ) + + @classmethod + async def create_with_auto_branch(cls, session, user_id: int, role_id: int, **kwargs): + # Получаем роль + role = await session.get(StaffRole, role_id) + if not role: + raise ValueError("Роль не найдена") + + return cls(user_id=user_id, role_id=role_id, branch_id=role.branch_id, **kwargs) + + +class StaffCuration(Base): + __tablename__ = 'staff_curation' + id: Mapped[identificator_pk] + apprentice_id: Mapped[int] = mapped_column(ForeignKey('staff_users.id', ondelete='CASCADE')) + curator_id: Mapped[int] = mapped_column(ForeignKey('staff_users.id', ondelete='CASCADE')) + branch_id: Mapped[int] = mapped_column(ForeignKey('staff_branches.id', ondelete='CASCADE')) + + apprentice: Mapped["StaffUser"] = relationship( + back_populates="curators", + foreign_keys=[apprentice_id] + ) + curator: Mapped["StaffUser"] = relationship( + back_populates="apprentices", + foreign_keys=[curator_id] + ) + branch: Mapped["StaffBranch"] = relationship(back_populates="curations") + + __table_args__ = ( + UniqueConstraint('apprentice_id', 'curator_id', 'branch_id', name='uq_apprentice_curator_branch'), + ) + + +all_data = { + 'base': Base +} \ No newline at end of file diff --git a/src/database/settings/config.py b/src/database/settings/config.py new file mode 100644 index 0000000..b64e250 --- /dev/null +++ b/src/database/settings/config.py @@ -0,0 +1,17 @@ +from database.settings.db_settings import * + +class Settings: + + def __init__(self): + self.DB_HOST = DB_HOST + self.DB_PORT = DB_PORT + self.DB_USER = DB_USER + self.DB_PASS = DB_PASS + self.DB_NAME = DB_NAME + + @property + def DB_URL(self): + return f"postgresql+asyncpg://{self.DB_USER}:{self.DB_PASS}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_NAME}" + +settings = Settings() + \ No newline at end of file diff --git a/src/managers/DataBaseManager.py b/src/managers/DataBaseManager.py new file mode 100644 index 0000000..6470c4c --- /dev/null +++ b/src/managers/DataBaseManager.py @@ -0,0 +1,137 @@ +import disnake +from disnake.ext import commands +from disnake.ext import tasks +from typing import Optional, Union, List, Dict, Any, AsyncIterator, Tuple +import datetime +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker +from sqlalchemy.exc import IntegrityError +from sqlalchemy.orm import joinedload, selectinload, contains_eager, aliased +from sqlalchemy import select, delete, insert, update +from sqlalchemy import func, asc, desc, false, tuple_ +from sqlalchemy import and_, or_, not_ +import subprocess +import os +from types import SimpleNamespace + +class Model: + def __init__(self, model): + self.model = model + self.m = model + + async def __aenter__(self): + return self.model + + async def __aexit__(self, exc_type, exc_val, exc_tb): + return None + +class DatabaseManager: + + def __init__(self, engine, tables_data): + self.engine = engine + self.session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + self.metadata = tables_data['base'].metadata + + self.model_classes = {} + tables = self.metadata.tables + for table_name, table in tables.items(): + model_class = next((cls for cls in tables_data['base'].__subclasses__() if cls.__tablename__ == table_name), None) + if model_class: + #print(f"Table: {table_name}, Model Class: {model_class.__name__}") + self.model_classes[table_name] = model_class + + self.models = {table_name: Model(model) for table_name, model in self.model_classes.items()} + + self.tables_data = tables_data + + #функции sqlalchemy + self.joinedload = joinedload + self.selectinload = selectinload + self.contains_eager = contains_eager + self.aliased = aliased + self.select = select + self.delete = delete + self.insert = insert + self.update = update + self.func = func + self.desc = desc + self.asc = asc + self.false = false + self.tuple_ = tuple_ + self.and_ = and_ + self.or_ = or_ + self.not_ = not_ + + #ошибки sqlalchemy + exceptions = SimpleNamespace( + IntegrityError = IntegrityError + ) + + async def close(self): + await self.engine.dispose() + + async def pg_dump(self, echo=False, backup_file='src/backups/discord_moderation_bot_backup.sql'): + conn = await self.engine.connect() + db_name = self.engine.url.database + user = self.engine.url.username + host = self.engine.url.host + port = self.engine.url.port + password = self.engine.url.password + + os.environ['PGPASSWORD'] = password + command = [ + 'pg_dump', + '-h', host, + '-p', str(port), + '-U', user, + '-F', 'p', # <-- plain text SQL + ] + (['-v'] if echo else []) + [ + '-f', backup_file, + db_name + ] + + try: + subprocess.run(command, check=True) + if echo: + print(f"{datetime.datetime.now():%H:%M:%S %d-%m-%Y} :: SQL backup of '{db_name}' created.") + except subprocess.CalledProcessError as e: + print(f"{datetime.datetime.now():%H:%M:%S %d-%m-%Y} :: Backup failed: {e}") + finally: + await conn.close() + return backup_file + + async def pg_restore(self, echo = False, backup_file = 'backups/backup_file.backup'): + conn = await self.DataBaseManager.engine.connect() + db_name = self.DataBaseManager.engine.url.database + user = self.DataBaseManager.engine.url.username + host = self.DataBaseManager.engine.url.host + port = self.DataBaseManager.engine.url.port + password = self.DataBaseManager.engine.url.password + + os.environ['PGPASSWORD'] = password # Установка пароля для подключения + command = [ + 'pg_restore', + '-h', host, + '-p', str(port), + '-U', user, + '-d', db_name, # Имя базы данных, в которую будет восстановлено + ] + ([ + '-v' # Подробный вывод + ] if echo else []) + [ + backup_file # Путь к файлу резервной копии + ] + + try: + subprocess.run(command, check=True) + if echo: + print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Database '{db_name}' restored successfully from '{backup_file}'.") + except subprocess.CalledProcessError as e: + print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Error during restore: {e}") + finally: + await conn.close() + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + \ No newline at end of file diff --git a/src/managers/old_DataBaseManager.py b/src/managers/old_DataBaseManager.py new file mode 100644 index 0000000..7012dda --- /dev/null +++ b/src/managers/old_DataBaseManager.py @@ -0,0 +1,239 @@ +try: + import aiosqlite + import disnake + from disnake.ext import commands + from disnake.ext import tasks +except: + import pip + + pip.main(['install', 'disnake']) + pip.main(['install', 'aiosqlite']) + import disnake + from disnake.ext import commands + from disnake.ext import tasks + import aiosqlite +from typing import Optional, Union, List, Dict, Any, AsyncIterator, Tuple +from asyncio import Lock +import datetime + +class DatabaseManager: + """Расширенное соединение с БД с поддержкой транзакций и удобных методов""" + + def __init__(self, connection: aiosqlite.Connection): + self._connection = connection + self._transaction_depth = 0 + self._closed = False + self._transaction_lock = Lock() + self.last_error = None + + @classmethod + async def connect(cls, database: str, **kwargs) -> 'DatabaseManager': + """Альтернатива конструктору для подключения""" + connection = await aiosqlite.connect(database, **kwargs) + return cls(connection) + + async def UpdateBD(self, table: str, *, change: dict, where: dict, whereandor = "AND"): + request = () + + change_request = [] + for i in change.keys(): + change_request.append(f"{i} = ?") + request = request + (change[i],) + + where_request = [] + for i in where.keys(): + where_request.append(f"{i} = ?") + request = request + (where[i],) + + await self.execute('UPDATE {table} SET {change} WHERE {where}' + .format(table = table, change = ", ".join(change_request), where = f" {whereandor} ".join(where_request)), request) + return 0 + + async def SelectBD(self, table: str, *, select: list = ["*"], where: dict = None, where_ops: dict = None, whereandor = "AND", order_by: str = None, limit: int = None): + + where_combined = {} + if where: + where_combined.update({f"{k} =": v for k, v in where.items()}) + if where_ops: + where_combined.update(where_ops) + + request = () + where_clauses = "" + + for condition, value in where_combined.items(): + field_op = condition.split() + field = field_op[0] + op = "=" if len(field_op) == 1 else field_op[1] + if where_clauses == "": + where_clauses= where_clauses + f"{field} {op} ? " + else: + where_clauses= where_clauses + f"{whereandor} {field} {op} ? " + request += (value,) + + query = "SELECT {select} FROM {table}".format( + select=", ".join(select), + table=table + ) + + if where_clauses: + query += f" WHERE {where_clauses}" + + if order_by: + query += f" ORDER BY {order_by}" + + if limit: + query += f" LIMIT {limit}" + + async with await self.execute(query, request) as cursor: + return [i for i in await cursor.fetchall()] + + async def GetStaffJoins(): + query = \ + """ + SELECT sur.userid, sur.roleid, sur.description, sur.starttime, sr.staffsalary, sbr.layer as rolelayer, sbr.branchid, sb.layer as branchlayer, sb.purpose + FROM staff_users_roles AS sur + JOIN staff_roles as sr ON sr.roleid = sur.roleid + JOIN staff_branches_roles as sbr ON sbr.roleid = sur.roleid + JOIN staff_branches as sb ON sb.branchid = sbr.branchid + ORDER BY branchlayer ASC, rolelayer ASC; + """ + async with await self.execute(query, request) as cursor: + answer = [i for i in await cursor.fetchall()] + result = [{'userid': userid, 'roleid': roleid, 'description': description, 'starttime': starttime, 'staffsalary': staffsalary, 'rolelayer': rolelayer, 'branchid': branchid, 'branchlayer': branchlayer, 'purpose': purpose} for userid, roleid, description, starttime, staffsalary, rolelayer, branchid, branchlayer, purpose in answer] + return result + + async def DeleteBD(self, table: str, *, where: dict, whereandor = "AND"): + request = () + where_request = [] + for i in where.keys(): + where_request.append(f"{i} = ?") + request = request + (where[i],) + await self.execute("DELETE FROM {table} where {where}" + .format(table = table, where = f" {whereandor} ".join(where_request)), request) + return 0 + + async def InsertBD(self, table: str, *, data: dict): + request = () + keys = list(data.keys()) + qstring = [] + for i in keys: + request = request + (data[i],) + qstring.append("?") + await self.execute("INSERT INTO {table}({keys}) VALUES({values})" + .format(table = table, keys = ", ".join(keys), values = ", ".join(qstring)), request) + return 0 + + async def execute(self, sql: str, parameters: Optional[Union[Tuple, Dict]] = None, **kwargs) -> aiosqlite.Cursor: + """ + Универсальный execute, который автоматически определяет: + - Нужно ли начинать транзакцию (для INSERT/UPDATE/DELETE вне транзакции) + - Работает ли уже внутри транзакции (не создаёт вложенные транзакции) + """ + is_modifying = sql.strip().upper().startswith(("INSERT", "UPDATE", "DELETE")) + + # Если это модифицирующий запрос И мы НЕ внутри транзакции + if is_modifying and self._transaction_depth == 0: + async with self: # Автоматические begin/commit + cursor = await self._connection.execute(sql, parameters or (), **kwargs) + await cursor.close() # Важно: закрываем курсор для COMMIT + return cursor + else: + # Для SELECT или работы внутри существующей транзакции + return await self._connection.execute(sql, parameters or (), **kwargs) + + async def fetch_all(self, sql: str, parameters: Optional[Union[Tuple, Dict]] = None) -> List[Tuple]: + """Выполняет запрос и возвращает все строки""" + async with await self.execute(sql, parameters) as cursor: + return await cursor.fetchall() + + async def fetch_one(self, sql: str, parameters: Optional[Union[Tuple, Dict]] = None) -> Optional[Tuple]: + """Выполняет запрос и возвращает первую строку""" + async with await self.execute(sql, parameters) as cursor: + return await cursor.fetchone() + + async def fetch_val(self, sql: str, parameters: Optional[Union[Tuple, Dict]] = None, column: int = 0) -> Any: + """Возвращает значение из первого столбца""" + row = await self.fetch_one(sql, parameters) + return row[column] if row else None + + async def insert(self, table: str, data: Dict[str, Any], on_conflict: str = None) -> int: + """Упрощенный INSERT с поддержкой ON CONFLICT""" + keys = data.keys() + values = list(data.values()) + + sql = f""" + INSERT INTO {table} ({', '.join(keys)}) + VALUES ({', '.join(['?']*len(keys))}) + """ + + if on_conflict: + sql += f" ON CONFLICT {on_conflict}" + + await self.execute(sql, values) + return self.lastrowid + + async def update(self, table: str, where: Dict[str, Any], changes: Dict[str, Any], where_operator: str = "AND") -> int: + """Упрощенный UPDATE с автоматическим построением WHERE""" + set_clause = ", ".join([f"{k} = ?" for k in changes.keys()]) + where_clause = f" {where_operator} ".join([f"{k} = ?" for k in where.keys()]) + + sql = f""" + UPDATE {table} + SET {set_clause} + WHERE {where_clause} + """ + + result = await self.execute(sql, [*changes.values(), *where.values()]) + return result.rowcount + + async def begin(self): + """Начать транзакцию (с поддержкой вложенности)""" + async with self._transaction_lock: + if self._transaction_depth == 0: + await self._connection.execute("BEGIN IMMEDIATE") + self._transaction_depth += 1 + + async def commit(self): + """Зафиксировать транзакцию""" + async with self._transaction_lock: + if self._transaction_depth == 1: + await self._connection.commit() + self._transaction_depth = max(0, self._transaction_depth - 1) + + async def rollback(self): + """Откатить транзакцию""" + async with self._transaction_lock: + if self._transaction_depth > 0: + await self._connection.rollback() + self._transaction_depth = 0 + + async def close(self) -> None: + """Безопасное закрытие соединения с учётом транзакций""" + async with self._transaction_lock: + try: + # Откатываем активную транзакцию, если есть + if self._transaction_depth > 0: + await self._connection.rollback() + self._transaction_depth = 0 + + # Закрываем соединение + if hasattr(self._connection, '_connection'): # Проверка внутреннего состояния + await self._connection.close() + except Exception as e: + self.last_error = f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Ошибка при закрытии соединения: {e}" + print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Ошибка при закрытии соединения: {e}") + finally: + # Помечаем соединение как закрытое + self._closed = True + + async def __aenter__(self): + await self.begin() # Используем собственный метод begin + return self # Возвращаем сам менеджер, а не соединение + + async def __aexit__(self, exc_type, exc_val, exc_tb): + if exc_type is None: + await self.commit() + else: + self.last_error = f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Во время записи в бд произошла ошибка: {exc_type}({exc_val}): {exc_tb.tb_frame.f_code.co_filename}(строка {exc_tb.tb_lineno})!" + print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Во время записи в бд произошла ошибка: {exc_type}({exc_val}): {exc_tb.tb_frame.f_code.co_filename}(строка {exc_tb.tb_lineno})!") + await self.rollback() \ No newline at end of file diff --git a/src/test.py b/src/test.py new file mode 100644 index 0000000..e785194 --- /dev/null +++ b/src/test.py @@ -0,0 +1,103 @@ +try: + import disnake + from disnake.ext import commands + from disnake.ext import tasks + import requests + import numpy as np + import aiohttp + #from colorthief import ColorThief +except: + import pip + + pip.main(['install', 'disnake']) + #pip.main(['install', 'matplotlib']) + pip.main(['install', 'requests']) + #pip.main(['install', 'Pillow']) + pip.main(['install', 'numpy']) + pip.main(['install', 'aiohttp']) + #pip.main(['install', 'colorthief']) + import disnake + from disnake.ext import commands + from disnake.ext import tasks + import numpy as np + import requests + import aiohttp + #from colorthief import ColorThief +import asyncio +import sys +import os +import copy +import datetime +import math +import random +import json +import shutil +from constants.global_constants import * +from data.TOKENS import TOKENS + +import CoreMod + + +async def main(): + stop_event = asyncio.Event() + sup_bot = None + DataBase = None + all_bots = [] + + try: + DataBase = await CoreMod.init_db() + #sup_bot = CoreMod.MainBot(DataBase, stop_event) + sup_bot = CoreMod.AnyBots(DataBase) + all_bots = [sup_bot] + + #НЕ СМЕЙ РАСКОММЕНТИРОВАТЬ + #await CoreMod.db_migration(DataBase) + + ''' + users = [ + DataBase.model_classes['staff_users'](id = 78173123), + DataBase.model_classes['staff_users'](id = 6345345) + ] + + async with DataBase.session() as session: + async with session.begin(): + #session.add_all(users) + + stmt = CoreMod.select(DataBase.model_classes['staff_users']).where(DataBase.model_classes['staff_users'].id == 78173123) + result = (await session.execute(stmt)).scalars().all() + for i in result: + await session.delete(i) + print(result) + ''' + + + + # Загрузка когов + sup_bot.load_extension("cogs.resetsupcommands") + sup_bot.load_extension("cogs.moderators") + sup_bot.load_extension("cogs.users") + sup_bot.load_extension("cogs.administrators") + + # Запуск монитора остановки и ботов + monitor_task = asyncio.create_task(CoreMod.monitor_stop(stop_event, all_bots)) + bot_tasks = [ + asyncio.create_task(CoreMod.run_bot(sup_bot, TOKENS["KrekSupBot"], stop_event)), + ] + + await asyncio.gather(*bot_tasks, monitor_task) + + except KeyboardInterrupt: + print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Боты остановлены по запросу пользователя") + except Exception as e: + print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Произошла критическая ошибка: {e}") + finally: + # Остановка всех ботов + stop_event.set() + for bot in all_bots: + if not bot.is_closed(): + await bot.close() + if DataBase is not None: + await DataBase.close() + +if __name__ == "__main__": + asyncio.run(main())