first commit

This commit is contained in:
2025-07-07 00:28:19 +03:00
commit 942365efb2
16 changed files with 2640 additions and 0 deletions

11
.gitignore vendored Normal file
View File

@@ -0,0 +1,11 @@
venv/
__pycache__/
*.pyc
.env
.secrets
db_settings.py
src/data/secrets/
src/backups/

0
README.md Normal file
View File

6
exclude.txt Normal file
View File

@@ -0,0 +1,6 @@
venv/
__pycache__/
*.pyc
.env
.secrets
backups/

8
filter.txt Normal file
View File

@@ -0,0 +1,8 @@
+ /ModBot/**
- /ModBot/venv/**
- /ModBot/__pycache__/**
- /ModBot/backups/**
- /ModBot/.env
- /ModBot/.secrets
- /ModBot/*.pyc
- *

20
requirements.txt Normal file
View File

@@ -0,0 +1,20 @@
aiohappyeyeballs==2.6.1
aiohttp==3.12.12
aiosignal==1.3.2
async-timeout==5.0.1
asyncpg==0.30.0
attrs==25.3.0
certifi==2025.4.26
charset-normalizer==3.4.2
disnake==2.10.1
frozenlist==1.7.0
greenlet==3.2.3
idna==3.10
multidict==6.4.4
numpy==2.2.6
propcache==0.3.2
requests==2.32.4
SQLAlchemy==2.0.41
typing_extensions==4.14.0
urllib3==2.4.0
yarl==1.20.1

679
src/CoreMod.py Normal file
View File

@@ -0,0 +1,679 @@
import inspect
import disnake
from disnake.ext import commands
from disnake.ext import tasks
import asyncio
import sys
import os
import shutil
import datetime
from collections import Counter
from fnmatch import fnmatch
import traceback
import json
import re
from constants.global_constants import *
from data.TOKENS import TOKENS
from database.db_classes import all_data as DataBaseClasses
from managers.DataBaseManager import DatabaseManager
from managers.old_DataBaseManager import DatabaseManager as old_DatabaseManager
from database.settings import config
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import CreateTable
class AnyBots(commands.Bot):
'''
Any bot class
'''
def __init__(self, DataBaseManager):
super().__init__(
command_prefix="=",
intents=disnake.Intents.all()
)
self.DataBaseManager = DataBaseManager
self.constants = constants
async def on_ready(self):
self.krekchat = await self.fetch_guild(constants["krekchat"])
print(self.krekchat.name)
self.sponsors = [disnake.utils.get(self.krekchat.roles, id=i) for i in constants["sponsors"]]
self.text_mute = disnake.utils.get(self.krekchat.roles, id=constants["mutes"][0])
self.voice_mute = disnake.utils.get(self.krekchat.roles, id=constants["mutes"][1])
self.ban_role = disnake.utils.get(self.krekchat.roles, id=constants["ban_role"])
self.me = disnake.utils.get(self.krekchat.roles, id=constants["me"])
self.moder = disnake.utils.get(self.krekchat.roles, id=constants["moder"])
self.curator = disnake.utils.get(self.krekchat.roles, id=constants["curator"])
self.everyone = disnake.utils.get(self.krekchat.roles, id=constants["everyone"])
self.staff = disnake.utils.get(self.krekchat.roles, id=constants["staff"])
self.level_roles = [disnake.utils.get(self.krekchat.roles, id=i) for i in constants["level_roles"]]
self.bots_talk_protocol_channel_id = constants["bots_talk_protocol_channel"]
self.databases_backups_channel_id = constants["databases_backups_channel"]
# lists
self.moderators = [disnake.utils.get(self.krekchat.roles, id=i) for i in constants["moderators"]]
self.hierarchy = [disnake.utils.get(self.krekchat.roles, id=i) for i in constants["hierarchy"]]
# /lists
await self.change_presence(status=disnake.Status.online, activity=disnake.Game("Работаю"))
print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: KrekModBot activated")
def TimeFormater(self, time_str: str = "", *,
years: float = 0, months: float = 0, weeks: float = 0, days: float = 0, hours: float = 0, minutes: float = 0, seconds: float = 0,
now_timestamp = None):
"""
Форматирует строку времени в timestamp и разложенное время
Поддерживает форматы: 1d2h30m, 1д2ч30мин, 1.5d, 1 день 2 часа 30 минут и т.п.
Возвращает объект класса FormatedTime
"""
class FormatedTime:
def __init__(self, time_units):
self.translator = {'years': 'лет', 'months': 'месяцев', 'weeks': 'недель', 'days': 'дней', 'hours': 'часов', 'minutes': 'минут', 'seconds': 'секунд'}
delta = datetime.timedelta(
weeks=time_units['weeks'],
days=time_units['days'] + time_units['years'] * 365 + time_units['months'] * 30,
hours=time_units['hours'],
minutes=time_units['minutes'],
seconds=time_units['seconds']
)
total_seconds = delta.total_seconds()
minutes = total_seconds // 60
seconds = total_seconds % 60
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
months = days // 30
days = days % 30
years = months // 12
months = months % 12
self.time_units = {
'years': years,
'months': months,
'days': days,
'hours': hours,
'minutes': minutes,
'seconds': seconds
}
self.future_time = 0
if now_timestamp is None:
self.future_time = datetime.datetime.now() + delta
else:
self.future_time = datetime.datetime.fromtimestamp(now_timestamp) + delta
self.timestamp = self.future_time.timestamp()
def __float__(self):
return self.timestamp
def __int__(self):
return int(self.timestamp)
def __repr__(self):
return self.__str__() + f" [{self.timestamp}]"
def __str__(self):
if self.time_is_null():
return "вечность"
else:
result = []
for key, value in self.time_units.items():
if value > 0:
result.append(f"{int(value) if key != 'seconds' else round(value, 2)} {self.translator[key]}")
return ", ".join(result)
def time_is_null(self):
return not any([i for i in self.time_units.values()])
def to_dict(self):
return self.time_units
time_units = {'years': 0, 'months': 0, 'weeks': 0, 'days': 0, 'hours': 0, 'minutes': 0, 'seconds': 0}
if any([years, months, weeks, days, hours, minutes, seconds]):
time_units = {'years': years, 'months': months, 'weeks': weeks, 'days': days, 'hours': hours, 'minutes': minutes, 'seconds': seconds}
return FormatedTime(time_units)
else:
time_str = time_str.lower().replace(' ', '').replace(',', '.')
replacements = {
# Русские
'лет': 'years',
'год': 'years',
'мес': 'months',
'нед': 'weeks',
'дн': 'days',
'д': 'days',
'день': 'days',
'дней': 'days',
'дня': 'days',
'ч': 'hours',
'час': 'hours',
'часов': 'hours',
'часа': 'hours',
'м': 'minutes',
'мин': 'minutes',
'минут': 'minutes',
'минуты': 'minutes',
'с': 'seconds',
'сек': 'seconds',
'секунд': 'seconds',
'секунды': 'seconds',
# Английские
'y': 'years',
'w': 'weeks',
'd': 'days',
'h': 'hours',
'm': 'minutes',
's': 'seconds',
'c': 'seconds'
}
pattern = re.compile(r'(\d+(?:\.\d+)?)([a-zа-я]+)', re.IGNORECASE)
def replacer(match):
number, unit = match.groups()
replacement = replacements.get(unit, unit)
return f"{number}{replacement}"
time_str = pattern.sub(replacer, time_str)
pattern = r'(\d+(\.\d+)?)(years|months|weeks|days|hours|minutes|seconds)'
matches = re.findall(pattern, time_str)
for value, _, unit in matches:
time_units[unit] += float(value)
return FormatedTime(time_units)
async def bt_send(self, info: dict = {}):
def get_all_keys(dct, keys_list=None):
if keys_list is None:
keys_list = []
for key, value in dct.items():
keys_list.append(key)
if isinstance(value, dict):
get_all_keys(value, keys_list)
return keys_list
krekchat = await self.fetch_guild(self.krekchat.id)
bt_channel = await krekchat.fetch_channel(self.bots_talk_protocol_channel_id)
punishment_keys = ['type', 'options', 'severity', 'member', 'moderator']
complaint_keys = ['type', 'options', 'accepted', 'attack_member', 'defence_member', 'moderator']
unpunishment_keys = ['type', 'options', 'severity', 'member']
if not 'type' in info:
await bt_channel.send(f"<@479210801891115009> Передан запрос без типа :: bt_send\n {traceback.extract_stack()[-2]}")
return 1
if info['type'] == "punishment":
if get_all_keys(info) != punishment_keys:
if len(get_all_keys(info)) != punishment_keys:
await bt_channel.send(f"<@479210801891115009> Требуется {len(punishment_keys)}, а принято {len(get_all_keys(info))} ключей для punishment :: bt_send\n {traceback.extract_stack()[-2]}")
else:
await bt_channel.send(f"<@479210801891115009> Требуются ключи {punishment_keys}, а приняты {get_all_keys(info)} для punishment :: bt_send\n {traceback.extract_stack()[-2]}")
return 1
elif info['type'] == "complaint":
if get_all_keys(info) != complaint_keys:
if len(get_all_keys(info)) != complaint_keys:
await bt_channel.send(f"<@479210801891115009> Требуется {len(complaint_keys)}, а принято {len(get_all_keys(info))} ключей для complaint :: bt_send\n {traceback.extract_stack()[-2]}")
else:
await bt_channel.send(f"<@479210801891115009> Требуются ключи {complaint_keys}, а приняты {get_all_keys(info)} для complaint :: bt_send\n {traceback.extract_stack()[-2]}")
return 1
elif info['type'] == "unpunishment":
if get_all_keys(info) != unpunishment_keys:
if len(get_all_keys(info)) != unpunishment_keys:
await bt_channel.send(f"<@479210801891115009> Требуется {len(unpunishment_keys)}, а принято {len(get_all_keys(info))} ключей для unpunishment :: bt_send\n {traceback.extract_stack()[-2]}")
else:
await bt_channel.send(f"<@479210801891115009> Требуются ключи {unpunishment_keys}, а приняты {get_all_keys(info)} для unpunishment :: bt_send\n {traceback.extract_stack()[-2]}")
return 1
else:
await bt_channel.send(f"<@479210801891115009> Передан неизвестный тип запроса {info['type']} :: bt_send\n {traceback.extract_stack()[-2]}")
return 1
info["sender"] = "ModBot"
await bt_channel.send(json.dumps(info))
return 0
class ErrorOutHelper:
def __init__(self, send_function, err_name: str = "", err_description: str = "", ephemeral: bool = False, echo: bool = False, thumbnail = None):
self.err_name = err_name
self.err_description = err_description
self.send_function = send_function
self.ephemeral = ephemeral
self.echo = echo
self.thumbnail = thumbnail
self.colour = 0xff0000
async def out(self, err_description: str = "", err_name: str = "", d: str = "", n: str = ""):
if d:
err_description = d
if n:
err_name = n
embed = disnake.Embed(title="", description="", colour = self.colour)
if err_name:
embed.title = err_name
else:
embed.title = self.err_name
if err_description:
embed.description = err_description
else:
embed.description = self.err_description
if not self.thumbnail is None:
embed.set_thumbnail(url = self.thumbnail)
if self.echo:
print(f"{embed.title}: {embed.description}")
if 'ephemeral' in inspect.signature(self.send_function).parameters:
await self.send_function(embed = embed, ephemeral = self.ephemeral)
else:
await self.send_function(embed = embed)
class MainBot(AnyBots):
'''
Main bot class
'''
def __init__(self, DataBase, stop_event):
super().__init__(DataBase)
self.stop_event = stop_event
async def on_ready(self):
await super().on_ready()
self.CheckDataBases.cancel()
self.MakeBackups.cancel()
self.MakeBackups.start()
self.CheckDataBases.start()
async def BotOff(self):
self.CheckDataBases.cancel()
self.MakeBackups.cancel()
self.stop_event.set()
async def on_disconnect(self):
if self.stop_event.is_set():
pass
else:
print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Соединение с дискордом разорвано")
await self.BotOff()
@tasks.loop(seconds=60)
async def CheckDataBases(self):
try:
await self.CheckDataBasesRun()
except Exception as error:
print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: err CheckDataBasesRun: {error}")
@tasks.loop(seconds=3600)
async def MakeBackups(self):
backup_file = await self.DataBaseManager.pg_dump()
krekchat = await self.fetch_guild(self.krekchat.id)
backups_channel = await krekchat.fetch_channel(self.databases_backups_channel_id)
await backups_channel.send(content=f"Бэкап бд за {datetime.datetime.now()}:", file=disnake.File(backup_file))
async def CheckDataBasesRun(self):
self.krekchat = await self.fetch_guild(self.krekchat.id)
members = [i async for i in self.krekchat.fetch_members(limit=None)]
textmute = {'mute': [], 'unmute': list(filter(lambda m: self.text_mute in m.roles, members))}
voicemute = {'mute': [], 'unmute': list(filter(lambda m: self.voice_mute in m.roles, members))}
ban = {'ban': [], 'unban': list(filter(lambda m: self.ban_role in m.roles, members))}
#муты
async with self.DataBaseManager.session() as session:
async with session.begin():
stmt = self.DataBaseManager.delete(self.DataBaseManager.model_classes['punishment_mutes_text']).where(
self.DataBaseManager.and_(
self.DataBaseManager.model_classes['punishment_mutes_text'].time_warn - datetime.datetime.now().timestamp() <= 0,
self.DataBaseManager.model_classes['punishment_mutes_text'].time_warn != None
)
)
await session.execute(stmt)
stmt = self.DataBaseManager.select(self.DataBaseManager.model_classes['punishment_mutes_text']).where(
self.DataBaseManager.or_(
self.DataBaseManager.and_(
self.DataBaseManager.model_classes['punishment_mutes_text'].time_end - datetime.datetime.now().timestamp() <= 0,
self.DataBaseManager.model_classes['punishment_mutes_text'].time_warn == None
),
self.DataBaseManager.and_(
self.DataBaseManager.model_classes['punishment_mutes_text'].time_end != None
)
)
).with_for_update()
result = (await session.execute(stmt)).scalars().all()
for penalt in result:
member = disnake.utils.get(members, id=penalt.user_id)
if not member:
continue
if penalt.time_warn is None and penalt.time_end-datetime.datetime.now().timestamp()<=0:
penalt.time_end = None
penalt.time_warn = (self.TimeFormater("30d"))[0]
if (not penalt.time_warn is None) and (not penalt.time_end is None):
penalt.time_end = None
if not penalt.time_end is None:
if member in textmute['unmute']:
stmt = self.DataBaseManager.select(self.DataBaseManager.model_classes['punishment_mutes_text']).where(
self.DataBaseManager.and_(
self.DataBaseManager.model_classes['punishment_mutes_text'].user_id == member.id,
self.DataBaseManager.model_classes['punishment_mutes_text'].time_end != None
)
)
member_pens = (await session.execute(stmt)).scalars().all()
if len(member_pens)>0:
textmute['unmute'].remove(member)
if not member in textmute['mute']:
textmute['mute'].append(member)
for member in textmute['mute']:
await member.add_roles(self.text_mute)
for member in textmute['unmute']:
await member.remove_roles(self.text_mute)
async with session.begin():
stmt = self.DataBaseManager.delete(self.DataBaseManager.model_classes['punishment_mutes_voice']).where(
self.DataBaseManager.and_(
self.DataBaseManager.model_classes['punishment_mutes_voice'].time_warn - datetime.datetime.now().timestamp() <= 0,
self.DataBaseManager.model_classes['punishment_mutes_voice'].time_warn != None
)
)
await session.execute(stmt)
stmt = self.DataBaseManager.select(self.DataBaseManager.model_classes['punishment_mutes_voice']).where(
self.DataBaseManager.or_(
self.DataBaseManager.and_(
self.DataBaseManager.model_classes['punishment_mutes_voice'].time_end - datetime.datetime.now().timestamp() <= 0,
self.DataBaseManager.model_classes['punishment_mutes_voice'].time_warn == None
),
self.DataBaseManager.and_(
self.DataBaseManager.model_classes['punishment_mutes_voice'].time_end != None
)
)
).with_for_update()
result = (await session.execute(stmt)).scalars().all()
for penalt in result:
member = disnake.utils.get(members, id=penalt.user_id)
if not member:
continue
if penalt.time_warn is None and penalt.time_end - datetime.datetime.now().timestamp() <= 0:
penalt.time_end = None
penalt.time_warn = (self.TimeFormater("30d"))[0]
if (not penalt.time_warn is None) and (not penalt.time_end is None):
penalt.time_end = None
if not penalt.time_end is None:
if member in voicemute['unmute']:
stmt = self.DataBaseManager.select(self.DataBaseManager.model_classes['punishment_mutes_voice']).where(
self.DataBaseManager.and_(
self.DataBaseManager.model_classes['punishment_mutes_voice'].user_id == member.id,
self.DataBaseManager.model_classes['punishment_mutes_voice'].time_end != None
)
)
member_pens = (await session.execute(stmt)).scalars().all()
if len(member_pens)>0:
voicemute['unmute'].remove(member)
if not member in voicemute['mute']:
voicemute['mute'].append(member)
for member in voicemute['mute']:
await member.add_roles(self.voice_mute)
await member.move_to(None)
for member in voicemute['unmute']:
await member.remove_roles(self.voice_mute)
#/муты
#баны
async with session.begin():
stmt = self.DataBaseManager.select(self.DataBaseManager.model_classes['punishment_bans']).where(self.DataBaseManager.model_classes['punishment_bans'].time_end != None).with_for_update()
result = (await session.execute(stmt)).scalars().all()
for penalt in result:
member = disnake.utils.get(members, id=penalt.user_id)
if not member:
continue
if penalt.time_end - datetime.datetime.now().timestamp() <= 0:
penalt.time_end = None
if penalt.time_end != None:
if member in ban['unban']:
stmt = self.DataBaseManager.select(self.DataBaseManager.model_classes['punishment_bans']).where(
self.DataBaseManager.and_(
self.DataBaseManager.model_classes['punishment_bans'].user_id == member.id,
self.DataBaseManager.model_classes['punishment_bans'].time_end != None
)
)
member_pens = (await session.execute(stmt)).scalars().all()
if len(member_pens)>0:
ban['unban'].remove(member)
if not member in ban['ban']:
ban['ban'].append(member)
async with session.begin():
stmt = self.DataBaseManager.select(self.DataBaseManager.model_classes['punishment_perms'])
result = (await session.execute(stmt)).scalars().all()
for penalt in result:
member = disnake.utils.get(members, id=penalt.user_id)
if not member:
continue
if member in ban['unban']:
stmt = self.DataBaseManager.select(self.DataBaseManager.model_classes['punishment_perms']).where(
self.DataBaseManager.model_classes['punishment_perms'].user_id == member.id
)
member_perms = (await session.execute(stmt)).scalars().all()
if len(member_perms):
ban['unban'].remove(member)
if not member in ban['ban']:
ban['ban'].append(member)
for member in ban['ban']:
await member.add_roles(self.ban_role)
await member.move_to(None)
for member in ban['unban']:
await member.remove_roles(self.ban_role)
#/баны
#преды
async with session.begin():
stmt = self.DataBaseManager.delete(self.DataBaseManager.model_classes['punishment_warns']).where(self.DataBaseManager.model_classes['punishment_warns'].time_warn - datetime.datetime.now().timestamp() <= 0)
await session.execute(stmt)
stmt = self.DataBaseManager.delete(self.DataBaseManager.model_classes['punishment_reprimands']).where(self.DataBaseManager.model_classes['punishment_reprimands'].time_warn - datetime.datetime.now().timestamp() <= 0)
await session.execute(stmt)
#/преды
async def on_message(self, msg):
if msg.author.id == 479210801891115009 and msg.content == "botsoff":
await msg.reply(embed=disnake.Embed(description=f'Бот отключён', colour=0xff9900))
await self.BotOff()
if type(msg.channel).__name__!="DMChannel" and fnmatch(msg.channel.name, "⚠жалоба-от-*-на-*"):
log_reports = disnake.utils.get(msg.guild.channels, id=1242373230384386068)
files=[]
for att in msg.attachments:
files = files + [await att.to_file()]
log_mess = await log_reports.send(f"Чат: `{msg.channel.name}`({msg.channel.id}).\n"
f"Автор: `{msg.author.name} ({msg.author.id})`\n" +
(f"Сообщение: ```{msg.content}```\n" if msg.content else ""),
files = files)
message_words = msg.content.replace("/", " ").split(" ")
if "discord.gg" in message_words:
for i in range(len(message_words)):
if message_words[i]=="discord.gg" and not msg.author.bot:
log = disnake.utils.get(msg.guild.channels, id=893065482263994378)
try:
inv = await self.fetch_invite(url = "https://discord.gg/"+message_words[i+1])
if inv.guild.id != 490445877903622144:
await log.send(f"{msg.author.mention}({msg.author.id}) отправил в чат {msg.channel.mention} сомнительную ссылку на сервер '{inv.guild.name}':```{msg.content}```")
mess = await msg.reply(embed=disnake.Embed(description=f'Ссылки-приглашения запрещены!', colour=0xff9900))
await msg.delete()
await asyncio.sleep(20)
await mess.delete()
break
except disnake.errors.NotFound:
await log.send(f"{msg.author.mention}({msg.author.id}) отправил в чат {msg.channel.mention} [сомнительную ссылку]({msg.jump_url}) на неизвестный сервер:```{msg.content}```")
async def init_db():
DataBaseEngine = create_async_engine(
config.Settings().DB_URL,
pool_size=20,
max_overflow=10,
pool_recycle=300,
pool_pre_ping=True,
#echo=True,
)
async with DataBaseEngine.begin() as conn:
await conn.run_sync(DataBaseClasses['base'].metadata.create_all)
return DatabaseManager(DataBaseEngine, DataBaseClasses)
async def db_migration(DB_MANAGER):
new_DataBase = DB_MANAGER
DataBase = await old_DatabaseManager.connect("data/penalties.db")
await DataBase.execute("PRAGMA journal_mode=WAL")
await DataBase.execute("PRAGMA synchronous=NORMAL")
await DataBase.execute("PRAGMA foreign_keys = ON")
try:
async with new_DataBase.engine.begin() as conn:
await conn.run_sync(new_DataBase.metadata.drop_all)
await conn.run_sync(new_DataBase.metadata.create_all)
async with new_DataBase.session() as session:
for penaltid, userid, reason, timend, timewarn in await DataBase.SelectBD('punishment_text_mutes'):
penault = DB_MANAGER.model_classes['punishment_mutes_text'](user_id = userid, reason = reason, time_end = timend if timend else None, time_warn = timewarn if timewarn else None)
async with session.begin():
session.add(penault)
for penaltid, userid, reason, timend, timewarn in await DataBase.SelectBD('punishment_voice_mutes'):
penault = DB_MANAGER.model_classes['punishment_mutes_voice'](user_id = userid, reason = reason, time_end = timend if timend else None, time_warn = timewarn if timewarn else None)
async with session.begin():
session.add(penault)
for penaltid, userid, reason, timend in await DataBase.SelectBD('punishment_bans'):
penault = DB_MANAGER.model_classes['punishment_bans'](user_id = userid, reason = reason, time_end = timend if timend else None)
async with session.begin():
session.add(penault)
for penaltid, userid, reason in await DataBase.SelectBD('punishment_perms'):
penault = DB_MANAGER.model_classes['punishment_perms'](user_id = userid, reason = reason)
async with session.begin():
session.add(penault)
for penaltid, userid, reason, timend in await DataBase.SelectBD('punishment_warns'):
penault = DB_MANAGER.model_classes['punishment_warns'](user_id = userid, reason = reason, time_warn = timend)
async with session.begin():
session.add(penault)
for penaltid, userid, reason, timend in await DataBase.SelectBD('punishment_reprimands'):
penault = DB_MANAGER.model_classes['punishment_reprimands'](user_id = userid, reason = reason, time_warn = timend)
async with session.begin():
session.add(penault)
finally:
await DataBase.close()
raise Exception("Миграция БД завершена. требуется переименовывание файлов")
async def run_bot(bot, token, stop_event):
try:
await bot.start(token)
except Exception as e:
print(f"Бот {bot.user.name if hasattr(bot, 'user') else 'Unknown'} упал с ошибкой: {e}")
stop_event.set() # Сигнализируем об остановке
async def monitor_stop(stop_event, bots):
await stop_event.wait()
print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Получен сигнал остановки, завершаю всех ботов...")
for bot in bots:
if not bot.is_closed():
try:
await bot.close()
except Exception as e:
print(f"Ошибка при закрытии бота: {e}")
await asyncio.sleep(0.1)
async def main():
stop_event = asyncio.Event()
DataBase = None
all_bots = []
bot = None
try:
DataBase = await init_db()
# Инициализация ботов
bot = MainBot(DataBase, stop_event)
all_bots = [bot]
# Загрузка когов
bot.load_extension("cogs.users")
bot.load_extension("cogs.moderators")
bot.load_extension("cogs.administrators")
# Запуск монитора остановки и ботов
monitor_task = asyncio.create_task(monitor_stop(stop_event, all_bots))
bot_tasks = [
asyncio.create_task(run_bot(bot, TOKENS["KrekModBot"], stop_event))
]
await asyncio.gather(*bot_tasks, monitor_task)
except KeyboardInterrupt:
print("Боты остановлены по запросу пользователя")
except Exception as e:
print(f"Произошла критическая ошибка: {e}")
finally:
await bot.BotOff()
for bot in all_bots:
if not bot.is_closed():
await bot.close()
await DataBase.close()
current_task = asyncio.current_task()
pending = [t for t in asyncio.all_tasks() if t is not current_task and not t.done()]
for task in pending:
task.cancel()
await asyncio.gather(*pending, return_exceptions=True)
await asyncio.sleep(0.1)
if __name__ == "__main__":
asyncio.run(main())

210
src/cogs/administrators.py Normal file
View File

@@ -0,0 +1,210 @@
import disnake
from disnake.ext import commands
from disnake.ext import tasks
import requests
import numpy as np
import asyncio
import sys
import os
import copy
import datetime
import math
import random
import json
import shutil
def setup(bot):
bot.add_cog(AdminModule(bot))
class AdminModule(commands.Cog):
def __init__(self, client):
self.client = client
self.DataBaseManager = client.DataBaseManager
@commands.Cog.listener()
async def on_ready(self):
print(f'KrekModBot admin module activated')
@commands.slash_command(name="bot_mod_off")
async def BotModOff(self, ctx: disnake.ApplicationCommandInteraction):
if self.client.me in ctx.author.roles:
await ctx.send(embed=disnake.Embed(description=f'Бот отключён', colour=0xff9900), ephemeral=True)
await self.client.BotOff()
else:
await ctx.send(embed=disnake.Embed(description=f'Не допустимо', colour=0xff9900), ephemeral=True)
@commands.slash_command(name="очистка", administrator=True)
async def clear(self, ctx: disnake.AppCmdInter, count: int):
if self.client.me in ctx.author.roles:
await ctx.channel.purge(limit=count)
await ctx.send(embed = disnake.Embed(description = f'очищено {count} сообщений!', colour = 0xff9900), ephemeral=True)
else:
await ctx.send(embed = disnake.Embed(description = f'Ты чего удумал?', colour = 0xff9900), ephemeral=True)
@commands.slash_command(description="Позволяет менять/создавать/удалять ветви", name="правкаетви", administrator=True)
async def edit_branch(self, ctx: disnake.AppCmdInter, purpose: str = commands.Param(description="Укажите цель этой ветки (например, \"модерация\" или \"администрация\")", name="цель", default=None),
layer: int = commands.Param(description="Укажите слой этой ветки (у ролей нижних слоёв есть власть над верхними)", name="слой", default=None),
branchid: int = commands.Param(description="Необходимо для изменения уже существующей ветви", name="id", default=None),
is_admin: bool = commands.Param(description="Имеют ли пользователи в этой роли права администратора? *(только при создании)", name="администратор", default=False),
is_moder: bool = commands.Param(description="Имеют ли пользователи в этой роли права модератора? *(только при создании)", name="модератор", default=False),
delete_branch: str = commands.Param(description="Вы уверены, что хотите удалить ветвь? Для подтверждения впишите \"уверен\"", name="удаление", default=None)):
if not self.client.me in ctx.author.roles:
await ctx.send(embed = disnake.Embed(description = f'Недостаточно прав', colour = 0xff9900), ephemeral=True)
return 1
async with self.DataBaseManager.session() as session:
if delete_branch == "уверен":
if branchid is None:
await ctx.send(embed = disnake.Embed(description = f'Для удаления ветки необходимо указать её id', colour = 0xff9900))
return 1
async with session.begin():
stmt = self.DataBaseManager.delete(self.DataBaseManager.models['staff_branches'].m).where(self.DataBaseManager.models['staff_branches'].m.id == branchid)
await session.execute(stmt)
await ctx.send(embed = disnake.Embed(description = f'Ветка {branchid} успешно удалена', colour = 0xff9900))
return 0
if branchid is None:
if layer is None or purpose is None:
await ctx.send(embed = disnake.Embed(description = f'Для создания роли необходимо ввести все данные', colour = 0xff9900))
return 1
async with session.begin():
new_branch = self.DataBaseManager.models['staff_branches'].m(layer = layer, purpose = purpose, is_admin = is_admin, is_moder = is_moder)
session.add(new_branch)
await ctx.send(embed = disnake.Embed(description = f'Ветвь \"{purpose}\" успешно создана. Её новый id: {new_branch.id}', colour = 0xff9900))
return 0
else:
async with session.begin():
branch = await session.get(self.DataBaseManager.models['staff_branches'].m, branchid, with_for_update = True)
if branch is None:
await ctx.send(embed = disnake.Embed(description = f'Ветви с таким идентификатором пока не существует', colour = 0xff9900))
return 1
else:
if purpose is None and layer is None:
await ctx.send(embed = disnake.Embed(description = f'Для изменения ветви необходимы новые значения', colour = 0xff9900))
return 1
if not purpose is None:
branch.purpose = purpose
if not layer is None:
branch.layer = layer
await ctx.send(embed = disnake.Embed(description = f'Ветвь \"{branch.purpose}\"({branch.id}) успешно изменена', colour = 0xff9900))
return 0
@commands.slash_command(description="Позволяет менять/создавать/удалять роли в системе персонала", name="правка_роли", administrator=True)
async def edit_role(self, ctx: disnake.AppCmdInter, roleid: str = commands.Param(description="Укажите id роли (используются идентификаторы дискорда)", name="id"),
staffsalary: int = commands.Param(description="Укажите зарплату этой роли", name="зарплата", default=0),
branchid: int = commands.Param(description="Укажите id ветви для этой роли *(только при создании)", name="ветвь", default=None),
layer: int = commands.Param(description="Укажите слой этой роли в ветке (у ролей нижних слоёв есть власть над верхними)", name="слой", default=None),
delete_role: str = commands.Param(description="Вы уверены, что хотите удалить роль из системы? Для подтверждения впишите \"уверен\"", name="удаление", default=None),):
if not self.client.me in ctx.author.roles:
await ctx.send(embed = disnake.Embed(description = f'Недостаточно прав', colour = 0xff9900), ephemeral=True)
return 1
roleid = int(roleid)
staff_roles_model = self.DataBaseManager.models['staff_roles'].m
async with self.DataBaseManager.session() as session:
if delete_role == "уверен":
async with session.begin():
stmt = self.DataBaseManager.delete(staff_roles_model).where(staff_roles_model.id == roleid)
await session.execute(stmt)
await ctx.send(embed = disnake.Embed(description = f'Роль <@&{roleid}> успешно удалена из системы', colour = 0xff9900))
return 0
async with session.begin():
role = await session.get(staff_roles_model, roleid, with_for_update = True)
if not role is None:
if staffsalary != 0:
role.staff_salary = staffsalary
await ctx.send(embed = disnake.Embed(description = f'Зарплата роли <@&{roleid}> успешно изменена на {staffsalary}', colour = 0xff9900))
return 0
elif not layer is None:
role.layer = layer
await ctx.send(embed = disnake.Embed(description = f'Слой роли <@&{roleid}> успешно изменён на {layer}', colour = 0xff9900))
return 0
else:
await ctx.send(embed = disnake.Embed(description = f'Для изменения существующей роли необходимо ввести новые параметры: layer или staffsalary', colour = 0xff9900))
return 1
else:
if branchid is None:
await ctx.send(embed = disnake.Embed(description = f'Для внесения этой роли в систему необходимо указать id ветви', colour = 0xff9900))
return 1
if layer is None:
await ctx.send(embed = disnake.Embed(description = f'Для внесения этой роли в систему необходимо указать слой роли', colour = 0xff9900))
return 1
branch = await session.get(self.DataBaseManager.models['staff_branches'].m, branchid)
if branch is None:
await ctx.send(embed = disnake.Embed(description = f'Ветви с таким id пока не существует', colour = 0xff9900))
return 1
role = staff_roles_model(id = roleid, staff_salary = staffsalary, branch_id = branchid, layer = layer)
session.add(role)
await ctx.send(embed = disnake.Embed(description = f'Роль <@&{roleid}> успешно добавлена в ветвь {branchid}', colour = 0xff9900))
return 0
@commands.slash_command(description="Позволяет создавать/удалять пользователей в системе персонала", name="правка_пользователя", administrator=True)
async def edit_member(self, ctx: disnake.AppCmdInter, userid: str = commands.Param(description="Укажите id пользователя (используются идентификаторы дискорда)", name="id"),
delete_user: str = commands.Param(description="Вы уверены, что хотите удалить пользователя из системы? Для подтверждения впишите \"уверен\"", name="удаление", default=None)):
if not self.client.me in ctx.author.roles:
await ctx.send(embed = disnake.Embed(description = f'Недостаточно прав', colour = 0xff9900), ephemeral=True)
return 1
userid = int(userid)
staff_users_model = self.DataBaseManager.models['staff_users'].m
async with self.DataBaseManager.session() as session:
if delete_user == "уверен":
async with session.begin():
stmt = self.DataBaseManager.delete(staff_users_model).where(staff_users_model.id == userid)
await session.execute(stmt)
await ctx.send(embed = disnake.Embed(description = f'Пользователь <@{userid}> успешно удален из системы', colour = 0xff9900))
member = await self.client.krekchat.fetch_member(userid)
await member.remove_roles(self.client.staff)
return 0
async with session.begin():
user = await session.get(staff_users_model, userid, with_for_update = True)
if user is None:
member = await self.client.krekchat.fetch_member(userid)
await member.add_roles(self.client.staff)
user = staff_users_model(id = userid)
session.add(user)
await ctx.send(embed = disnake.Embed(description = f'Пользователь <@{userid}> успешно добавлен в систему', colour = 0xff9900))
return 0
else:
await ctx.send(embed = disnake.Embed(description = f'Пользователь <@{userid}> уже есть в системе', colour = 0xff9900))
return 1
@commands.slash_command(description="!!ВАЖНО!! ИСПОЛЬЗОВАНИЕ ТОЛЬКО В ЭКСТРЕННЫХ СЛУЧАЯХ! Назначает пользователей на роль", name="назначить_пользователя", administrator=True)
async def appoint_member(self, ctx: disnake.AppCmdInter, userid: str = commands.Param(description="Укажите id пользователя (используются идентификаторы дискорда)", name="пользователь"),
roleid: str = commands.Param(description="Укажите id роли (используются идентификаторы дискорда)", name="роль"),
description: str = commands.Param(description="Описание", name="описание", default=None)):
if not self.client.me in ctx.author.roles:
await ctx.send(embed = disnake.Embed(description = f'Недостаточно прав', colour = 0xff9900), ephemeral=True)
return 1
userid = int(userid)
roleid = int(roleid)
async with self.DataBaseManager.session() as session:
async with session.begin():
user = await session.get(self.DataBaseManager.model_classes['staff_users'], userid)
if user is None:
user = self.DataBaseManager.model_classes['staff_users'](id = userid)
session.add(user)
await session.flush()
user_role = await self.DataBaseManager.model_classes['staff_users_roles'].create_with_auto_branch(session, user_id = userid, role_id = roleid, description = description)
session.add(user_role)
await ctx.send(embed = disnake.Embed(description = f'Пользователю <@{userid}> успешно назначена роль <@&{roleid}>. \n```diff\n- Эта функция не выдаёт роли автоматически, поэтому требуется выдача вручную.```', colour = 0xff9900))
return 0

686
src/cogs/moderators.py Normal file
View File

@@ -0,0 +1,686 @@
import disnake
from disnake.ext import commands
from disnake.ext import tasks
import asyncio
import sys
import os
import copy
import datetime
import math
import random
import json
import shutil
translate = {"textmute":"Текстовый мут", "voicemute":"Голосовой мут", "ban":"Бан", "warning":"Предупреждение",\
"time":"Время", "reason":"Причина", "changenick":"Сменить ник", "reprimand":"Выговор", "newnick":"Новый ник"}
form_to_send = {'punishment_mutes_voice': 'voice_mutes', 'punishment_mutes_text': 'text_mutes', 'punishment_bans': 'bans', 'punishment_warns': 'warns', 'punishment_reprimands': 'reprimand', 'punishment_perms': 'perm'}
count_translate = {"textmute":3, "voicemute":3, "ban":9, "warning":1, "changenick":0, "punishment_mutes_text":3, "punishment_mutes_voice":3, "punishment_bans":9, "punishment_warns":1}
def setup(bot):
bot.add_cog(ModerModule(bot))
class ModerModule(commands.Cog):
def __init__(self, client):
self.client = client
self.DataBaseManager = self.client.DataBaseManager
@commands.Cog.listener()
async def on_ready(self):
print(f'KrekModBot moderation module activated')
@commands.slash_command(name="действие")
async def action_slash(self, ctx: disnake.AppCmdInter, member: disnake.Member):
models = self.client.DataBaseManager.model_classes
DataBaseManager = self.client.DataBaseManager
staff_users_roles_model = self.DataBaseManager.model_classes['staff_users_roles']
staff_users_model = self.DataBaseManager.model_classes['staff_users']
staff_roles_model = self.DataBaseManager.model_classes['staff_roles']
staff_branches_model = self.DataBaseManager.model_classes['staff_branches']
class ActionSelect(disnake.ui.Select):
client = self.client
max_strength_role = self.max_strength_role
def __init__(self, member, reprimand_branches: list, other_on: bool):
self.member=member
placeholder = "Выберите действие"
options = []
for branch in reprimand_branches:
options += [disnake.SelectOption(label=f"Выговор в ветке {branch.purpose}", value=f"reprimand:{branch.id}")]
if other_on:
options += [
disnake.SelectOption(label="Предупреждение", value="warning"),
disnake.SelectOption(label="Войс мут", value="voicemute"),
disnake.SelectOption(label="Текстовый мут", value="textmute"),
disnake.SelectOption(label="Бан", value="ban"),
disnake.SelectOption(label="Сменить ник", value="changenick"),
]
options += [disnake.SelectOption(label="Удалить наказание", value="deletepenalties")]
super().__init__(placeholder = placeholder, min_values = 1, max_values = 1, options = options)
async def callback(self, interaction:disnake.MessageInteraction):
class ActionModal(disnake.ui.Modal):
client = self.client
def __init__(self, title, member):
self.member = member
self.title = title
if title.split(":")[0] == "textmute" or title.split(":")[0] == "voicemute" or title.split(":")[0] == "ban":
components = [
disnake.ui.TextInput(label="Время", placeholder="Например: 1д15ч9мин", custom_id="time", style=disnake.TextInputStyle.short, max_length=100),
disnake.ui.TextInput(label = "Причина", placeholder="Например: правило 1.3" , custom_id="reason", style=disnake.TextInputStyle.paragraph, max_length=100)
]
if title.split(":")[0] == "warning":
components = [
disnake.ui.TextInput(label = "Причина", placeholder="Например: правило 1.3" , custom_id="reason", style=disnake.TextInputStyle.paragraph, max_length=100)
]
if title.split(":")[0] == "reprimand":
components = [
disnake.ui.TextInput(label = "Причина", placeholder="Например: неправомерный бан" , custom_id="reason", style=disnake.TextInputStyle.paragraph, max_length=100)
]
if title.split(":")[0] == "changenick":
components = [
disnake.ui.TextInput(label="Новый ник", placeholder="Например: Humanoid", custom_id="newnick", style=disnake.TextInputStyle.short, max_length=32)
]
super().__init__(title = title, components = components, timeout=300)
async def callback(self, interaction: disnake.Interaction):
async def voicemute(interaction: disnake.MessageInteraction, member: disnake.Member, time, reason):
role = self.client.voice_mute
await member.add_roles(role)
await member.move_to(None)
async with DataBaseManager.session() as session:
async with session.begin():
new_punishment = models['punishment_mutes_voice'](user_id = member.id, reason = reason, time_end = time, time_warn = None, moderator_id = interaction.author.id)
session.add(new_punishment)
async def textmute(interaction: disnake.MessageInteraction, member: disnake.Member, time, reason):
role = self.client.text_mute
await member.add_roles(role)
async with DataBaseManager.session() as session:
async with session.begin():
new_punishment = models['punishment_mutes_text'](user_id = member.id, reason = reason, time_end = time, time_warn = None, moderator_id = interaction.author.id)
session.add(new_punishment)
async def ban(interaction: disnake.MessageInteraction, member: disnake.Member, time, reason):
role = self.client.ban_role
await member.move_to(None)
if time-datetime.datetime.timestamp(datetime.datetime.now())>0:
await member.add_roles(role)
async with DataBaseManager.session() as session:
async with session.begin():
new_punishment = models['punishment_bans'](user_id = member.id, reason = reason, time_end = time, moderator_id = interaction.author.id)
session.add(new_punishment)
return
await member.add_roles(role)
async with DataBaseManager.session() as session:
async with session.begin():
new_punishment = models['punishment_perms'](user_id = member.id, reason = reason, moderator_id = interaction.author.id)
session.add(new_punishment)
for channel in interaction.guild.channels:
if isinstance(channel, disnake.TextChannel):
await channel.purge(limit=10, check=lambda m: m.author==member)
async def warning(interaction: disnake.MessageInteraction, member: disnake.Member, reason):
async with DataBaseManager.session() as session:
async with session.begin():
new_punishment = models['punishment_warns'](user_id = member.id, reason = reason, time_warn = float(self.client.TimeFormater(days = 30)), moderator_id = interaction.author.id)
session.add(new_punishment)
async def reprimand(interaction: disnake.MessageInteraction, member: disnake.Member, branchid, reason):
async with DataBaseManager.session() as session:
async with session.begin():
new_punishment = models['punishment_reprimands'](user_id = member.id, branch_id = branchid, reason = reason, time_warn = float(self.client.TimeFormater(days = 80)), designated_user_id = interaction.author.id)
session.add(new_punishment)
async def changenick(interaction: disnake.MessageInteraction, member: disnake.Member, newnick):
await member.edit(nick=newnick)
await interaction.response.defer(ephemeral=True)
embed = disnake.Embed(title=f"{translate[self.title.split(':')[0]]}", description = f"{self.member.mention} ({self.member.id})")
bans_channel = disnake.utils.get(interaction.guild.channels, id = 1219644103973671035)
warns_channel = disnake.utils.get(interaction.guild.channels, id = 1219644151184887838)
muts_channel = disnake.utils.get(interaction.guild.channels, id = 1219644125469474889)
reprimands_channel = disnake.utils.get(interaction.guild.channels, id = self.client.constants['reprimandlog_channel'])
logs_channel = disnake.utils.get(interaction.guild.channels, id = 490730651629387776)
formated_time = None
reason = ""
newnick = ""
if self.title.split(':')[0] == "reprimand":
embed.add_field(name = 'Ветка', value = self.title.split(':')[1], inline = False)
for key, value in interaction.text_values.items():
if key == "time":
formated_time = self.client.TimeFormater(value)
embed.add_field(name = translate[key], value = str(formated_time), inline = True)
elif key == "reason":
reason = value
embed.add_field(name = translate[key], value = f"```{value}```", inline = False)
elif key == "newnick":
newnick = value
embed.add_field(name = "", value=f"`{self.member.nick!r} ==> {newnick!r}`", inline = False)
else:
embed.add_field(name = translate[key], value = value, inline = False)
embed.set_footer(text = f"{interaction.author.name}\n{interaction.author.id}")
embed.set_thumbnail(url=self.member.avatar)
await self.client.bt_send({"type": "punishment", "options": {"severity": self.title.split(':')[0], "member": self.member.id, "moderator": interaction.author.id}})
if self.title.split(":")[0] == "reprimand":
async with DataBaseManager.session() as session:
async with session.begin():
stmt = (
DataBaseManager.select(models['punishment_reprimands'])
.where(
models['punishment_reprimands'].user_id == self.member.id,
models['punishment_reprimands'].branch_id == int(self.title.split(':')[1])
)
)
result = (await session.execute(stmt)).scalars().all()
await reprimand(interaction, self.member, int(self.title.split(':')[1]), reason)
embed.add_field(name = "Всего выговоров в этой ветке", value = str(len(result)+1), inline = False)
await reprimands_channel.send(embed=embed)
else:
if self.title == "textmute":
await textmute(interaction, self.member, float(formated_time), reason)
await muts_channel.send(embed=embed)
await interaction.author.send("Скопируй и вставь следующее сообщение с прикреплением доказательств в канал \"[mutlog](https://discord.com/channels/1219110919536115802/1219114151566114847)\"!!!")
await interaction.author.send(f"```{self.member.id}/{str(formated_time)}/{reason}```")
elif self.title == "voicemute":
await voicemute(interaction, self.member, float(formated_time), reason)
await muts_channel.send(embed=embed)
await interaction.author.send("Скопируй и вставь следующее сообщение с прикреплением доказательств в канал \"[mutlog](https://discord.com/channels/1219110919536115802/1219114151566114847)\"!!!")
await interaction.author.send(f"```{self.member.id}/{str(formated_time)}/{reason}```")
elif self.title == "ban":
await ban(interaction, self.member, float(formated_time), reason)
await bans_channel.send(embed=embed)
await interaction.author.send("Скопируй и вставь следующее сообщение с прикреплением доказательств в канал \"[banlog](https://discord.com/channels/1219110919536115802/1219110920060407850)\"!!!")
await interaction.author.send(f"```{self.member.id}/{str(formated_time)}/{reason}```")
elif self.title == "warning":
await warning(interaction, self.member, reason)
await warns_channel.send(embed=embed)
await interaction.author.send("Скопируй и вставь следующее сообщение с прикреплением доказательств в канал \"[warnlog](https://discord.com/channels/1219110919536115802/1219129098257956947)\"!!!")
await interaction.author.send(f"```{self.member.id}/{reason}```")
elif self.title == "changenick":
await changenick(interaction, self.member, newnick)
await logs_channel.send(embed=embed)
else:
await logs_channel.send(embed=embed)
counter = 0
async with DataBaseManager.session() as session:
async with session.begin():
for i in ["punishment_mutes_text", "punishment_mutes_voice", "punishment_bans", "punishment_warns"]:
stmt = DataBaseManager.select(models[i]).where(models[i].user_id == self.member.id)
result = (await session.execute(stmt)).scalars().all()
counter += count_translate[i] * len(result)
if counter>=18:
await ban(interaction, self.member, 0, "Набрал больше 18 'очков наказаний'")
await bans_channel.send(embed=disnake.Embed(title=f"Достойное достижение!", description = f"Поздравляем! {self.member.mention}({self.member.id}) наконец набрал целых {counter} очков наказания и получил свою награду: вечный бан!"))
await interaction.edit_original_response(embed=embed)
class DeletePenaltiesSelect(disnake.ui.Select):
client = self.client
max_strength_role = self.max_strength_role
def __init__(self, member):
self.member=member
async def initialization(self, embed):
member = self.member
placeholder = "Выберите наказание для снятия"
options=[]
async with DataBaseManager.session() as session:
async with session.begin():
result = []
stmt = self.max_strength_role(user_id = ctx.author.id, is_moder = True, is_admin = True)
author_is_moderator = (await session.execute(stmt)).first() is not None
stmt = self.max_strength_role(user_id = ctx.author.id, is_admin = True)
author_is_admin = (await session.execute(stmt)).first()
if author_is_admin:
stmt = DataBaseManager.select(models['punishment_reprimands']).where(models['punishment_reprimands'].user_id == member.id)
result += (await session.execute(stmt)).scalars().all()
if author_is_moderator:
stmt = DataBaseManager.select(models['punishment_mutes_text']).where(models['punishment_mutes_text'].user_id == member.id)
result += (await session.execute(stmt)).scalars().all()
stmt = DataBaseManager.select(models['punishment_mutes_voice']).where(models['punishment_mutes_voice'].user_id == member.id)
result += (await session.execute(stmt)).scalars().all()
stmt = DataBaseManager.select(models['punishment_bans']).where(models['punishment_bans'].user_id == member.id)
result += (await session.execute(stmt)).scalars().all()
stmt = DataBaseManager.select(models['punishment_warns']).where(models['punishment_warns'].user_id == member.id)
result += (await session.execute(stmt)).scalars().all()
stmt = DataBaseManager.select(models['punishment_perms']).where(models['punishment_perms'].user_id == member.id)
result += (await session.execute(stmt)).scalars().all()
result = sorted(result, key=lambda a: a.time_begin, reverse=True)
for penalt in result:
match penalt.get_table_name():
case 'punishment_mutes_text':
embed.add_field(name = f"{penalt.id} : Текстовый мут", value = f"Заканчивается {'<t:{time_end}:f>'.format(time_end = int(penalt.time_end)) if penalt.time_warn is None else '<t:{time_warn}:f> (предупреждение)'.format(time_warn = int(penalt.time_warn))}\n```{penalt.reason}```", inline = False)
options.append(disnake.SelectOption(label=f"{penalt.id}) Текстовый мут", value=f"{penalt.get_table_name()}:{member.id}:{penalt.id}"))
case 'punishment_mutes_voice':
embed.add_field(name = f"{penalt.id} : Голосовой мут", value = f"Заканчивается {'<t:{time_end}:f>'.format(time_end = int(penalt.time_end)) if penalt.time_warn is None else '<t:{time_warn}:f> (предупреждение)'.format(time_warn = int(penalt.time_warn))}\n```{penalt.reason}```", inline = False)
options.append(disnake.SelectOption(label=f"{penalt.id}) Голосовой мут", value=f"{penalt.get_table_name()}:{member.id}:{penalt.id}"))
case 'punishment_bans':
embed.add_field(name = f"{penalt.id} : Бан", value = f"Заканчивается {'<t:{time_end}:f>'.format(time_end = int(penalt.time_end)) if not penalt.time_end is None else 'никогда (предупреждение)'}\n```{penalt.reason}```", inline = False)
options.append(disnake.SelectOption(label=f"{penalt.id}) Бан", value=f"{penalt.get_table_name()}:{member.id}:{penalt.id}"))
case 'punishment_warns':
embed.add_field(name = f"{penalt.id} : Предупреждение", value = f"Заканчивается <t:{int(penalt.time_warn)}:f>\n```{penalt.reason}```", inline = False)
options.append(disnake.SelectOption(label=f"{penalt.id}) Предупреждение", value=f"{penalt.get_table_name()}:{member.id}:{penalt.id}"))
case 'punishment_reprimands':
embed.add_field(name = f"{penalt.id} : Выговор", value = f"Заканчивается <t:{int(penalt.time_warn)}:f>\n```{penalt.reason}```", inline = False)
options.append(disnake.SelectOption(label=f"{penalt.id}) Выговор", value=f"{penalt.get_table_name()}:{member.id}:{penalt.id}"))
case 'punishment_perms':
embed.add_field(name = f"{penalt.id} : Вечный бан", value = f"```{penalt.reason}```", inline = False)
options.append(disnake.SelectOption(label=f"{penalt.id}) Вечный бан", value=f"{penalt.get_table_name()}:{member.id}:{penalt.id}"))
if len(options)==0:
embed.add_field(name = f"Наказаний нет", value = f" ", inline = False)
options.append(disnake.SelectOption(label="Наказаний нет"))
super().__init__(placeholder = placeholder, min_values = 1, max_values = 1, options = options)
return self
async def callback(self, interaction:disnake.MessageInteraction):
values = interaction.values[0]
try:
pentype, member, penaltid = values.split(":")
except ValueError:
return
if values:
logs_channel = disnake.utils.get(interaction.guild.channels, id = 490730651629387776)
async with DataBaseManager.session() as session:
async with session.begin():
stmt = DataBaseManager.select(models[pentype]).where(
DataBaseManager.and_(
models[pentype].id == int(penaltid),
models[pentype].user_id == int(member)
)
).with_for_update()
result = (await session.execute(stmt)).scalars().first()
await session.delete(result)
if pentype=="punishment_perms" or pentype=="punishment_bans":
await self.member.remove_roles(self.client.ban_role)
await self.client.bt_send({"type": "unpunishment", "options": {"severity": "ban", "member": int(member)}})
elif pentype=="punishment_mutes_text":
await self.member.remove_roles(self.client.text_mute)
await self.client.bt_send({"type": "unpunishment", "options": {"severity": "textmute", "member": int(member)}})
elif pentype=="punishment_mutes_voice":
await self.member.remove_roles(self.client.voice_mute)
await self.client.bt_send({"type": "unpunishment", "options": {"severity": "voicemute", "member": int(member)}})
elif pentype=="punishment_reprimands":
await self.client.bt_send({"type": "unpunishment", "options": {"severity": "reprimand", "member": int(member)}})
elif pentype=="punishment_warns":
await self.client.bt_send({"type": "unpunishment", "options": {"severity": "warning", "member": int(member)}})
view = disnake.ui.View(timeout=30)
embed = disnake.Embed(title="Выберите наказание для снятия", description = f"{self.member.mention}\n({self.member.id})", colour = 0x008000)
view.add_item(await DeletePenaltiesSelect(self.member).initialization(embed))
await interaction.response.edit_message(embed = embed, view=view)
await interaction.followup.send(embed = disnake.Embed(description = f"Наказание {pentype} успешно аннулировано!", colour = 0x008000), ephemeral=True)
await logs_channel.send(embed = disnake.Embed(description = f"{interaction.author.mention}({interaction.author.id}) успешно аннулировал наказание {pentype}({penaltid}) у пользователя {member}", colour = 0x008000),)
if "textmute" in self.values[0] or "voicemute" in self.values[0] or "ban" in self.values[0] or\
"warning" in self.values[0] or "changenick" in self.values[0] or "reprimand" in self.values[0]:
modal = ActionModal(self.values[0], self.member)
await interaction.response.send_modal(modal)
elif "deletepenalties" in self.values[0]:
view = disnake.ui.View(timeout=30)
embed = disnake.Embed(title="Выберите наказание для снятия", description = f"{self.member.mention}\n({self.member.id})", colour = 0x008000)
view.add_item(await DeletePenaltiesSelect(self.member).initialization(embed))
await interaction.response.send_message(embed = embed, ephemeral=True, view=view)
await ctx.response.defer(ephemeral=True)
async with self.DataBaseManager.session() as session:
stmt = self.max_strength_role(ctx.author.id)
author_strenght = (await session.execute(stmt)).first()
if author_strenght is None:
await ctx.edit_original_message(embed = disnake.Embed(description = f"Вы должны иметь должность в персонале полей, чтобы действовать", colour = 0xff9900))
return
stmt = (
self.DataBaseManager.select(staff_users_roles_model)
.options(
self.DataBaseManager.joinedload(staff_users_roles_model.role),
self.DataBaseManager.joinedload(staff_users_roles_model.branch)
)
.where(staff_users_roles_model.user_id == member.id)
)
member_roles = (await session.execute(stmt)).scalars().all()
stmt = (
self.DataBaseManager.select(staff_users_roles_model.role_id)
.where(staff_users_roles_model.user_id == ctx.author.id)
)
author_roles = set((await session.execute(stmt)).scalars().all())
reprimand_branches = []
for role in member_roles:
stmt = (
self.DataBaseManager.select(staff_roles_model.id)
.join(staff_branches_model, staff_roles_model.branch_id == staff_branches_model.id)
.where(
self.DataBaseManager.tuple_(staff_branches_model.layer, staff_roles_model.layer) < (role.branch.layer, role.role.layer)
)
.order_by(
staff_branches_model.layer.asc(),
staff_roles_model.layer.asc()
)
)
grand_roles = set((await session.execute(stmt)).scalars().all())
if bool(author_roles & grand_roles):
reprimand_branches += [role.branch]
stmt = self.max_strength_role(ctx.author.id, is_moder = True, is_admin = True)
author_is_moderator = (await session.execute(stmt)).all()
stmt = self.max_strength_role(member.id, is_moder = True, is_admin = True)
member_is_moderator = (await session.execute(stmt)).all()
class View(disnake.ui.View):
async def on_timeout(self):
await ctx.edit_original_message(view=None)
if (not reprimand_branches) and (not ((not member_is_moderator) and author_is_moderator)):
await ctx.edit_original_message(embed = disnake.Embed(description = f"Вы ничего не можете сделать этому пользователю", colour = 0xff9900))
return
embed = disnake.Embed(title="Выберите действие", description = f"{member.mention}\n({member.id})", colour = 0x008000)
embed.set_thumbnail(url=member.avatar)
view = View(timeout=30)
view.add_item(ActionSelect(member = member, reprimand_branches = reprimand_branches, other_on = (not member_is_moderator) and author_is_moderator))
await ctx.edit_original_message(embed = embed, view=view)
'''
Продвижения и т.п.
'''
def max_strength_role(self, user_id: int, branch_ids: list = [], is_admin: bool = False, is_moder: bool = False):
staff_users_roles_model = self.DataBaseManager.model_classes['staff_users_roles']
staff_users_model = self.DataBaseManager.model_classes['staff_users']
staff_roles_model = self.DataBaseManager.model_classes['staff_roles']
staff_branches_model = self.DataBaseManager.model_classes['staff_branches']
stmt = (
self.DataBaseManager.select(staff_branches_model.layer, staff_roles_model.layer)
.select_from(staff_users_roles_model)
.join(staff_users_model, staff_users_roles_model.user_id == staff_users_model.id)
.join(staff_roles_model, staff_users_roles_model.role_id == staff_roles_model.id)
.join(staff_branches_model, staff_roles_model.branch_id == staff_branches_model.id)
.order_by(
staff_branches_model.layer.asc(),
staff_roles_model.layer.asc()
)
)
if is_admin or is_moder or branch_ids:
stmt = stmt.where(
self.DataBaseManager.and_(
self.DataBaseManager.or_(
staff_branches_model.id.in_(branch_ids),
staff_branches_model.is_admin == is_admin if is_admin else self.DataBaseManager.false(),
staff_branches_model.is_moder == is_moder if is_moder else self.DataBaseManager.false()
),
staff_users_model.id == user_id
)
)
else:
stmt = stmt.where(
staff_users_model.id == user_id
)
return stmt
async def promotions_add_remove_role(self, ctx, userid, add_roleid = None, remove_roleid = None):
krekchat = await self.client.fetch_guild(self.client.krekchat.id)
member = await krekchat.fetch_member(userid)
if not add_roleid is None:
add_role = await krekchat.fetch_role(add_roleid)
try:
await member.add_roles(add_role)
except disnake.errors.Forbidden:
await ctx.channel.send(ctx.author.mention, embed = disnake.Embed(description = f'Недостаточно прав. Пользователю {member.mention} необходимо ручное добавление роли {add_role.mention}', colour = 0xff9900))
if not remove_roleid is None:
remove_role = await krekchat.fetch_role(remove_roleid)
try:
await member.remove_roles(remove_role)
except disnake.errors.Forbidden:
await ctx.channel.send(ctx.author.mention, embed = disnake.Embed(description = f'Недостаточно прав. Пользователю {member.mention} необходимо ручное удаление роли {remove_role.mention}', colour = 0xff9900))
@commands.slash_command(description="Позволяет повысить пользователя в указанной ветке", name="повысить", administrator=True)
async def promote(self, ctx: disnake.AppCmdInter, branchid: int = commands.Param(description="Укажите id ветви", name="ветвь", default = None),
userid: str = commands.Param(description="Укажите id пользователя (используются идентификаторы дискорда)", name="пользователь")):
userid = int(userid)
async with self.DataBaseManager.session() as session:
async with session.begin():
staff_users_roles_model = self.DataBaseManager.model_classes['staff_users_roles']
staff_users_model = self.DataBaseManager.model_classes['staff_users']
staff_roles_model = self.DataBaseManager.model_classes['staff_roles']
staff_branches_model = self.DataBaseManager.model_classes['staff_branches']
if branchid is None:
stmt = (
self.DataBaseManager.select(staff_users_roles_model)
.options(
self.DataBaseManager.selectinload(staff_users_roles_model.role)
.selectinload(staff_roles_model.branch)
)
.where(
staff_users_roles_model.user_id == userid
)
)
user_roles = (await session.execute(stmt)).scalars().all()
if len(user_roles) != 1:
await ctx.send(embed = disnake.Embed(description = f'Для данной операции необходимо уточнение id ветки!', colour = 0xff9900))
return 1
else:
branchid = user_roles[0].role.branch.id
stmt = self.max_strength_role(user_id = ctx.author.id, branch_ids = [branchid], is_admin = True)
author_upper_role = (await session.execute(stmt)).first()
stmt = self.max_strength_role(user_id = userid, branch_ids = [branchid])
member_upper_role = (await session.execute(stmt)).first()
if author_upper_role is None or ((not member_upper_role is None) and author_upper_role >= member_upper_role):
await ctx.send(embed = disnake.Embed(description = f'Вы должны быть выше должности, на которую назначаете пользователя', colour = 0xff9900))
return 1
stmt = (
self.DataBaseManager.select(staff_roles_model)
.options(
self.DataBaseManager.selectinload(staff_roles_model.branch)
)
.where(staff_roles_model.branch_id == branchid)
.order_by(staff_roles_model.layer.desc())
)
branch_roles = (await session.execute(stmt)).scalars().all()
stmt = (
self.DataBaseManager.select(staff_users_roles_model)
.join(staff_users_model, staff_users_roles_model.user_id == staff_users_model.id)
.join(staff_roles_model, staff_users_roles_model.role_id == staff_roles_model.id)
.join(staff_branches_model, staff_roles_model.branch_id == staff_branches_model.id)
.options(
self.DataBaseManager.contains_eager(staff_users_roles_model.user),
self.DataBaseManager.contains_eager(staff_users_roles_model.role)
.contains_eager(staff_roles_model.branch)
)
.where(
self.DataBaseManager.and_(
staff_branches_model.id == branchid,
staff_users_model.id == userid
)
)
).with_for_update()
member_role = (await session.execute(stmt)).scalars().first()
if member_role is not None:
member_role_index = next((i for i, role in enumerate(branch_roles) if role.id == member_role.role.id), None)
if member_role_index + 1 >= len(branch_roles):
await ctx.send(embed = disnake.Embed(description = f'Этот пользователь уже находится на самой высокой должности в данной ветви', colour = 0xff9900))
return 1
target_role = branch_roles[member_role_index + 1]
if author_upper_role >= (target_role.branch.layer, target_role.layer):
await ctx.send(embed = disnake.Embed(description = f'Вы должны быть выше должности, на которую назначаете пользователя', colour = 0xff9900))
return 1
previous_role_id = member_role.role_id
member_role.role_id = target_role.id
await ctx.send(embed = disnake.Embed(description = f'Пользователь <@{userid}> успешно назначен на роль <@&{target_role.id}>', colour = 0xff9900))
await self.promotions_add_remove_role(ctx, userid, add_roleid = target_role.id, remove_roleid = previous_role_id)
return 0
else:
member_check = await session.get(staff_users_model, userid)
if member_check is None:
await ctx.send(embed = disnake.Embed(description = f'Необходимо изначально зарегистрировать пользователя, попросите разработчика использовать \"/правка_пользователя\"', colour = 0xff9900))
return 1
member_role_index = 0
if member_role_index >= len(branch_roles):
await ctx.send(embed = disnake.Embed(description = f'В данной ветви пока нет ролей', colour = 0xff9900))
return 1
target_role = branch_roles[member_role_index]
if author_upper_role >= (target_role.branch.layer, target_role.layer):
await ctx.send(embed = disnake.Embed(description = f'Вы должны быть выше должности, на которую назначаете пользователя', colour = 0xff9900))
return 1
user_role = await staff_users_roles_model.create_with_auto_branch(session, user_id = userid, role_id = target_role.id)
session.add(user_role)
await ctx.send(embed = disnake.Embed(description = f'Пользователь <@{userid}> успешно назначен на роль <@&{target_role.id}>', colour = 0xff9900))
await self.promotions_add_remove_role(ctx, userid, add_roleid = target_role.id)
return 0
@commands.slash_command(description="Позволяет понизить пользователя в указанной ветке", name="понизить", administrator=True)
async def demote(self, ctx: disnake.AppCmdInter, branchid: int = commands.Param(description="Укажите id ветви", name="ветвь", default = None),
userid: str = commands.Param(description="Укажите id пользователя (используются идентификаторы дискорда)", name="пользователь")):
userid = int(userid)
async with self.DataBaseManager.session() as session:
async with session.begin():
staff_users_roles_model = self.DataBaseManager.model_classes['staff_users_roles']
staff_users_model = self.DataBaseManager.model_classes['staff_users']
staff_roles_model = self.DataBaseManager.model_classes['staff_roles']
staff_branches_model = self.DataBaseManager.model_classes['staff_branches']
if branchid is None:
stmt = (
self.DataBaseManager.select(staff_users_roles_model)
.options(
self.DataBaseManager.selectinload(staff_users_roles_model.role)
.selectinload(staff_roles_model.branch)
)
.where(
staff_users_roles_model.user_id == userid
)
)
user_roles = (await session.execute(stmt)).scalars().all()
if len(user_roles) != 1:
await ctx.send(embed = disnake.Embed(description = f'Для данной операции необходимо уточнение id ветки!', colour = 0xff9900))
return 1
else:
branchid = user_roles[0].role.branch.id
stmt = self.max_strength_role(user_id = ctx.author.id, branch_ids = [branchid], is_admin = True)
author_upper_role = (await session.execute(stmt)).first()
stmt = self.max_strength_role(user_id = userid, branch_ids = [branchid])
member_upper_role = (await session.execute(stmt)).first()
if author_upper_role is None or ((not member_upper_role is None) and author_upper_role >= member_upper_role):
await ctx.send(embed = disnake.Embed(description = f'Вы должны быть выше пользователя в должности', colour = 0xff9900))
return 1
stmt = (
self.DataBaseManager.select(staff_roles_model)
.options(
self.DataBaseManager.selectinload(staff_roles_model.branch)
)
.where(staff_roles_model.branch_id == branchid)
.order_by(staff_roles_model.layer.desc())
)
branch_roles = (await session.execute(stmt)).scalars().all()
stmt = (
self.DataBaseManager.select(staff_users_roles_model)
.join(staff_users_model, staff_users_roles_model.user_id == staff_users_model.id)
.join(staff_roles_model, staff_users_roles_model.role_id == staff_roles_model.id)
.join(staff_branches_model, staff_roles_model.branch_id == staff_branches_model.id)
.options(
self.DataBaseManager.contains_eager(staff_users_roles_model.user),
self.DataBaseManager.contains_eager(staff_users_roles_model.role)
.contains_eager(staff_roles_model.branch)
)
.where(
self.DataBaseManager.and_(
staff_branches_model.id == branchid,
staff_users_model.id == userid
)
)
).with_for_update()
member_role = (await session.execute(stmt)).scalars().first()
if member_role is not None:
member_role_index = next((i for i, role in enumerate(branch_roles) if role.id == member_role.role.id), None)
if member_role_index == 0:
stmt = (
self.DataBaseManager.select(staff_users_roles_model)
.where(staff_users_roles_model.user_id == userid)
)
result = (await session.execute(stmt)).scalars().all()
await self.promotions_add_remove_role(ctx, userid, remove_roleid = member_role.role_id)
if len(result) <= 1:
stmt = self.DataBaseManager.delete(staff_users_model).where(staff_users_model.id == userid)
await session.execute(stmt)
await ctx.send(embed = disnake.Embed(description = f'Пользователь <@{userid}> полностью удалён из системы', colour = 0xff9900))
await self.promotions_add_remove_role(ctx, userid, remove_roleid = self.client.staff.id)
return 0
else:
await session.delete(member_role)
await ctx.send(embed = disnake.Embed(description = f'Пользователь <@{userid}> снят со всех должностей в ветви {branchid}', colour = 0xff9900))
return 0
target_role = branch_roles[member_role_index - 1]
previous_role_id = member_role.role_id
member_role.role_id = target_role.id
await ctx.send(embed = disnake.Embed(description = f'Пользователь <@{userid}> успешно понижен до роли <@&{target_role.id}> в ветви {branchid}', colour = 0xff9900))
await self.promotions_add_remove_role(ctx, userid, add_roleid = target_role.id, remove_roleid = previous_role_id)
return 0
else:
await ctx.send(embed = disnake.Embed(description = f'Данный пользователь не имеет роли в ветке {branchid}', colour = 0xff9900))
return 1

View File

@@ -0,0 +1,14 @@
import disnake
from disnake.ext import commands
def setup(bot: commands.Bot):
bot.add_cog(ExampleCog(bot))
print("ExampleCog загружен!")
class ExampleCog(commands.Cog):
def __init__(self, bot: commands.Bot):
self.bot = bot
@commands.slash_command(name="ping", description="Проверка бота на работоспособность")
async def ping(self, inter: disnake.ApplicationCommandInteraction):
await inter.response.send_message(f"Понг! {round(self.bot.latency * 1000)}мс")

280
src/cogs/users.py Normal file
View File

@@ -0,0 +1,280 @@
import disnake
from disnake.ext import commands
from disnake.ext import tasks
import requests
import numpy as np
import asyncio
import sys
import os
import copy
import datetime
import math
import random
import json
import shutil
def setup(bot):
bot.add_cog(UIModule(bot))
class UIModule(commands.Cog):
def __init__(self, client):
self.client = client
self.DataBaseManager = self.client.DataBaseManager
@commands.Cog.listener()
async def on_ready(self):
print(f'KrekModBot UI module activated')
@commands.slash_command(description="Показывает действительные наказания пользователя", name="наказания")
async def penalties(self, ctx: disnake.AppCmdInter, member: disnake.Member = None):
models = self.client.DataBaseManager.model_classes
if not member:
member = ctx.author
embed = disnake.Embed(title="Наказания", description = f"{member.mention}", colour = 0x008000)
embed.set_thumbnail(url=member.avatar)
async with self.DataBaseManager.session() as session:
async with session.begin():
stmt = self.DataBaseManager.select(models['punishment_mutes_text']).where(models['punishment_mutes_text'].user_id == member.id)
result = (await session.execute(stmt)).scalars().all()
stmt = self.DataBaseManager.select(models['punishment_mutes_voice']).where(models['punishment_mutes_voice'].user_id == member.id)
result += (await session.execute(stmt)).scalars().all()
stmt = self.DataBaseManager.select(models['punishment_bans']).where(models['punishment_bans'].user_id == member.id)
result += (await session.execute(stmt)).scalars().all()
stmt = self.DataBaseManager.select(models['punishment_warns']).where(models['punishment_warns'].user_id == member.id)
result += (await session.execute(stmt)).scalars().all()
stmt = self.DataBaseManager.select(models['punishment_reprimands']).where(models['punishment_reprimands'].user_id == member.id)
result += (await session.execute(stmt)).scalars().all()
stmt = self.DataBaseManager.select(models['punishment_perms']).where(models['punishment_perms'].user_id == member.id)
result += (await session.execute(stmt)).scalars().all()
result = sorted(result, key=lambda a: a.time_begin, reverse=True)
for penalt in result:
match penalt.get_table_name():
case 'punishment_mutes_text':
embed.add_field(name = f"Текстовый мут", value = f"Заканчивается {'<t:{time_end}:f>'.format(time_end = int(penalt.time_end)) if penalt.time_warn is None else '<t:{time_warn}:f> (предупреждение)'.format(time_warn = int(penalt.time_warn))}\n```{penalt.reason}```", inline = False)
case 'punishment_mutes_voice':
embed.add_field(name = f"Голосовой мут", value = f"Заканчивается {'<t:{time_end}:f>'.format(time_end = int(penalt.time_end)) if penalt.time_warn is None else '<t:{time_warn}:f> (предупреждение)'.format(time_warn = int(penalt.time_warn))}\n```{penalt.reason}```", inline = False)
case 'punishment_bans':
embed.add_field(name = f"Бан", value = f"Заканчивается {'<t:{time_end}:f>'.format(time_end = int(penalt.time_end)) if not penalt.time_end is None else 'никогда (предупреждение)'}\n```{penalt.reason}```", inline = False)
case 'punishment_warns':
embed.add_field(name = f"Предупреждение", value = f"Заканчивается <t:{int(penalt.time_warn)}:f>\n```{penalt.reason}```", inline = False)
case 'punishment_reprimands':
embed.add_field(name = f"Выговор", value = f"Заканчивается <t:{int(penalt.time_warn)}:f>\n```{penalt.reason}```", inline = False)
case 'punishment_perms':
embed.add_field(name = f"Вечный бан", value = f"```{penalt.reason}```", inline = False)
if len(embed.fields)==0:
embed.add_field(name = f"Наказаний нет", value = f"", inline = False)
await ctx.response.send_message(embed = embed)
@commands.slash_command(description="Подайте жалобу на нарушение правил сервера или действия модератора", name="жалоба")
async def report(self, ctx: disnake.AppCmdInter, member: disnake.Member = commands.Param(description="На кого хотите подать жалобу?", name="пользователь"), reason: str = commands.Param(description="Кратко опишите причину жалобы", name="причина")):
async def ReportCallback(ctx, member, report_message, reason, embed, mentions, mod):
async def AcceptCallback(report_message, member, embed, mod, call, channel):
if call.author == mod.author:
await channel.delete()
embed.set_footer(text = f"Рассмотрено в пользу исца\n{mod.author.name} ({mod.author.id})")
embed.colour=0x008000
await report_message.edit(content = "", embed=embed, view=None)
await member.send(embed = disnake.Embed(title=f"", description = f"После разбора нарушения модератор {mod.author.mention} признал вас виновным", colour = 0xff9900))
await self.client.bt_send({"type": "complaint", "options": {"accepted": True, "attack_member": ctx.author.id, "defence_member": member.id, "moderator": mod.author.id}})
else:
await call.send(embed = disnake.Embed(title=f"", description = f"Только судья может использовать эти команды", colour = 0xff9900), ephemeral=True)
async def DenyCallback(report_message, member, embed, mod, call, channel):
if call.author == mod.author:
await channel.delete()
embed.set_footer(text = f"Рассмотрено в пользу ответчика\n{mod.author.name} ({mod.author.id})")
embed.colour=0x008000
await report_message.edit(content = "", embed=embed, view=None)
#await member.send(embed = disnake.Embed(title=f"", description = f"После разбора нарушения модератор {mod.author.mention} признал вас невиновным", colour = 0xff9900))
await self.client.bt_send({"type": "complaint", "options": {"accepted": False, "attack_member": ctx.author.id, "defence_member": member.id, "moderator": mod.author.id}})
else:
await call.send(embed = disnake.Embed(title=f"", description = f"Только судья может использовать эти команды", colour = 0xff9900), ephemeral=True)
class AttackClass:
def __init__(self, member, channel, mod):
self.member=member
self.channel=channel
self.mod=mod
self.permatt=True
async def callback(self, mod):
if mod.author == self.mod.author:
await self.channel.set_permissions(self.member, read_messages = True, read_message_history=True, send_messages=self.permatt) #read_messages=self.permatt
await mod.send(embed = disnake.Embed(title=f"", description = f"{self.member.mention}-{'право ответа включено' if self.permatt else 'право ответа выключено'}", colour = 0x008000), ephemeral=True)
self.permatt=not self.permatt
else:
await mod.send(embed = disnake.Embed(title=f"", description = f"Только судья может использовать эти команды", colour = 0xff9900), ephemeral=True)
class DefenceClass:
def __init__(self, member, channel, mod):
self.member=member
self.channel=channel
self.mod=mod
self.permdef=True
async def callback(self, mod):
if mod.author == self.mod.author:
await self.channel.set_permissions(self.member, read_messages = True, read_message_history=True, send_messages=self.permdef) #read_messages=self.permdef
await mod.send(embed = disnake.Embed(title=f"", description = f"{self.member.mention}-{'право ответа включено' if self.permdef else 'право ответа выключено'}", colour = 0x008000), ephemeral=True)
self.permdef=not self.permdef
else:
await mod.send(embed = disnake.Embed(title=f"", description = f"Только судья может использовать эти команды", colour = 0xff9900), ephemeral=True)
if not any(i.mention in mentions for i in mod.author.roles):
await mod.send(embed = disnake.Embed(description = f"Вы не можете принять этот репорт", colour = 0xff9900), ephemeral=True)
return
if mod.author == ctx.author:
await mod.send(embed = disnake.Embed(description = f"Вы не можете принять свой же репорт", colour = 0xff9900), ephemeral=True)
return
embed.set_footer(text = f"Принято\n{mod.author.name} ({mod.author.id})")
embed.colour=0x008000
await report_message.edit(content = "", embed=embed, view=None)
parsing_channel = await ctx.guild.create_text_channel(
name=f"⚠️Жалоба от {ctx.author.name} на {member}",
overwrites = {ctx.author: disnake.PermissionOverwrite(read_messages=True, send_messages=False, read_message_history=True, attach_files=True),
mod.author: disnake.PermissionOverwrite(read_messages=True, send_messages=True, read_message_history=True, attach_files=True),
member: disnake.PermissionOverwrite(read_messages=True, send_messages=False, read_message_history=True, attach_files=True),
self.client.everyone: disnake.PermissionOverwrite(read_messages=False, send_messages=False, read_message_history=False)},
category=ctx.guild.get_channel(1220744958961778850)
)
view = disnake.ui.View(timeout=86400)
btnatt = disnake.ui.Button(label="⚔️", style=disnake.ButtonStyle.primary)
view.add_item(btnatt)
btndef = disnake.ui.Button(label="⚰️", style=disnake.ButtonStyle.primary)
view.add_item(btndef)
btnacc = disnake.ui.Button(label="", style=disnake.ButtonStyle.primary)
view.add_item(btnacc)
btnden = disnake.ui.Button(label="", style=disnake.ButtonStyle.primary)
view.add_item(btnden)
attack = AttackClass(ctx.author, parsing_channel, mod)
defence = DefenceClass(member, parsing_channel, mod)
pin = await parsing_channel.send( f"{mod.author.mention}" ,embed = disnake.Embed(title=f"Жалоба", description = f"Вы вызвались судить {member.mention} по жалобе от {ctx.author.mention}\n\
⚔️ - Дать {ctx.author.mention} право ответа\n\
⚰️ - Дать {member.mention} право ответа\n\
✅ - Виновен\n\
❌ - Невиновен\n\
Перед закрытием дела убедитесь, что сохранили все доказательства", colour = 0x008000), view=view)
btnatt.callback = lambda mod: attack.callback(mod)
btndef.callback = lambda mod: defence.callback(mod)
btnacc.callback = lambda call: AcceptCallback(report_message, member, embed, mod, call, parsing_channel)
btnden.callback = lambda call: DenyCallback(report_message, member, embed, mod, call, parsing_channel)
await pin.pin()
if member==ctx.author:
await ctx.send(embed = disnake.Embed(description = f'Нельзя подать жалобу на самого себя!', colour = 0xFF4500), ephemeral=True)
return
mentions = []
report_channel = disnake.utils.get(ctx.guild.channels, id = 1219644036378394746)
highest = [i for i in self.client.hierarchy if i in member.roles][0]
for i in range(0, self.client.hierarchy.index(highest)):
mentions.append(f"{self.client.hierarchy[i].mention}")
if len(mentions)==0:
mentions.append(f"{self.client.me.mention}")
report_embed = disnake.Embed(title=f"**Жалоба**", colour = 0xDC143C)
report_embed.add_field(name=f"Обвинитель: ", value = f"{ctx.author.mention}\n({ctx.author.id})", inline=True)
report_embed.add_field(name=f"Обвиняемый: ", value = f"{member.mention}\n({member.id})", inline=True)
report_embed.add_field(name=f"Причина: ", value = f"```{reason}```", inline=False)
view = disnake.ui.View(timeout=86400)
btn = disnake.ui.Button(label="", style=disnake.ButtonStyle.primary)
view.add_item(btn)
report_embed.set_thumbnail(url=member.avatar)
report_message = await report_channel.send(", ".join(mentions), embed = report_embed, view=view)
btn.callback = lambda mod: ReportCallback(ctx,member,report_message,reason,report_embed,mentions,mod)
await ctx.send(embed = disnake.Embed(description = f'Жалоба на {member.mention} успешно подана', colour = 0x008000), ephemeral=True)
'''
Иерархия
'''
@commands.slash_command(description="Показывает весь персонал, по ролям и веткам", name="иерархия", administrator=True)
async def hierarchy(self, ctx: disnake.AppCmdInter, branchid: int = commands.Param(description="Укажите id ветки, в которой вам нужна иерархия", name="ветвь", default=None),
devmod: bool = commands.Param(description="Показывать подробную информацию?", name="devmode", default=False)):
async with self.DataBaseManager.session() as session:
async with session.begin():
if branchid is None:
stmt = (
self.DataBaseManager.select(self.DataBaseManager.model_classes['staff_branches'])
.options(
self.DataBaseManager.selectinload(self.DataBaseManager.model_classes['staff_branches'].roles)
)
.order_by(
self.DataBaseManager.model_classes['staff_branches'].layer.asc()
)
)
branches = (await session.execute(stmt)).scalars().all()
for branch in branches:
branch.roles.sort(key=lambda role: role.layer)
embed = disnake.Embed(title = f"", description = f"# Общая иерархия\n", colour = 0xff9900)
for branchcounter, branch in enumerate(branches, start=1):
embed.description += f"## {branchcounter}) {branch.purpose}"
if devmod:
embed.description += f" id:({branch.id}) layer:({branch.layer})"
embed.description += "\n"
for rolecounter, role in enumerate(branch.roles, start=1):
embed.description += f"### {rolecounter}. <@&{role.id}>"
if devmod:
embed.description += f" layer:({role.layer})"
embed.description += "\n"
await ctx.send(embed = embed)
else:
branch = await session.get(self.DataBaseManager.model_classes['staff_branches'], branchid)
stmt = (
self.DataBaseManager.select(self.DataBaseManager.model_classes['staff_branches'])
.options(
self.DataBaseManager.selectinload(self.DataBaseManager.model_classes['staff_branches'].roles)
.selectinload(self.DataBaseManager.model_classes['staff_roles'].users)
)
.where(
self.DataBaseManager.model_classes['staff_branches'].id == branchid
)
)
branch = (await session.execute(stmt)).scalars().first()
if branch is None:
await ctx.send(embed = disnake.Embed(description = f'Ветви с идентификатором {branchid} не существует', colour = 0xff9900))
return 1
branch.roles.sort(key=lambda role: role.layer)
embed = disnake.Embed(title = f"", description = f"# Иерархия по ветви {branch.purpose}\n", colour = 0xff9900)
for rolecounter, role in enumerate(branch.roles, start=1):
embed.description += f"## {rolecounter}) <@&{role.id}>"
if devmod:
embed.description += f" layer:({role.layer})"
embed.description += "\n"
for membercounter, user in enumerate(role.users, start=1):
embed.description += f"### - <@{user.user_id}>"
if devmod:
embed.description += f" update_time:(<t:{int(user.update_time)}:R>)"
embed.description += "\n"
await ctx.send(embed = embed)

View File

@@ -0,0 +1,24 @@
constants = {
"krekchat": 490445877903622144,
"sponsors": [648932777755934740, 926021742151999509, 959416622144180265, 995602360325902386, 995602360325902387,
995602360325902388, 995602360325902389, 712751688741814343, 1244009309000437780, 1285667409365176372],
"mutes": [1219615791750709268, #text
1219615979185770597],#voice
"ban_role": 1219276355875639407,
"me": 887696340920963072,
"moder": 490712181927837722,
"curator": 490712205445169162,
"everyone": 490445877903622144,
"staff": 1229765545310818377,
"level_roles": [None, 490707199526567946, 490707605711486985, 490707615429689375, 490707634710642700],
"moderators": [490712181927837722, 490712205445169162],
"hierarchy": [490712205445169162, 490712181927837722, 490445877903622144],
# logs_channels
"mutelog_channel": 1219644125469474889,
"banlog_channel": 1219644103973671035,
"warnlog_channel": 1219644151184887838,
"reprimandlog_channel": 1380593220513173534,
"bots_talk_protocol_channel": 1376233239202758827,
"databases_backups_channel": 1382363252683706448,
}

206
src/database/db_classes.py Normal file
View File

@@ -0,0 +1,206 @@
from sqlalchemy import Column, Integer, BigInteger, Text, Float, ForeignKey, UniqueConstraint, MetaData, Boolean
from sqlalchemy.orm import declarative_base, relationship, Mapped, mapped_column
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, relationship, DeclarativeBase
from sqlalchemy.schema import CreateTable
from sqlalchemy.sql import text
import datetime
from typing import Annotated
class Base(DeclarativeBase):
def get_table_name(self):
return self.__tablename__
def to_dict(self, exclude: list[str] = None):
"""Конвертирует модель в словарь, исключая указанные поля."""
if exclude is None:
exclude = []
return {
c.name: getattr(self, c.name)
for c in self.__table__.columns
if c.name not in exclude
}
discord_identificator_pk = Annotated[int, mapped_column(BigInteger, primary_key=True, nullable=False, index = True)]
identificator_pk = Annotated[int, mapped_column(Integer, primary_key=True, nullable=False, autoincrement=True, index = True)]
discord_identificator = Annotated[int, mapped_column(BigInteger, nullable=False, index=True)]
# Модели для системы наказаний
class PunishmentTextMute(Base):
__tablename__ = 'punishment_mutes_text'
id: Mapped[identificator_pk]
user_id: Mapped[discord_identificator]
reason: Mapped[str | None] = mapped_column(Text)
time_end: Mapped[float | None] = mapped_column(Float)
time_warn: Mapped[float | None] = mapped_column(Float)
time_begin: Mapped[float] = mapped_column(Float, nullable=False, server_default=text("EXTRACT(EPOCH FROM NOW())"))
moderator_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True, index=True, default = None)
class PunishmentVoiceMute(Base):
__tablename__ = 'punishment_mutes_voice'
id: Mapped[identificator_pk]
user_id: Mapped[discord_identificator]
reason: Mapped[str | None] = mapped_column(Text)
time_end: Mapped[float | None] = mapped_column(Float)
time_warn: Mapped[float | None] = mapped_column(Float)
time_begin: Mapped[float] = mapped_column(Float, nullable=False, server_default=text("EXTRACT(EPOCH FROM NOW())"))
moderator_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True, index=True, default = None)
class PunishmentWarn(Base):
__tablename__ = 'punishment_warns'
id: Mapped[identificator_pk]
user_id: Mapped[discord_identificator]
reason: Mapped[str | None] = mapped_column(Text)
time_warn: Mapped[float] = mapped_column(Float)
time_begin: Mapped[float] = mapped_column(Float, nullable=False, server_default=text("EXTRACT(EPOCH FROM NOW())"))
moderator_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True, index=True, default = None)
class PunishmentBan(Base):
__tablename__ = 'punishment_bans'
id: Mapped[identificator_pk]
user_id: Mapped[discord_identificator]
reason: Mapped[str | None] = mapped_column(Text)
time_end: Mapped[float | None] = mapped_column(Float)
time_begin: Mapped[float] = mapped_column(Float, nullable=False, server_default=text("EXTRACT(EPOCH FROM NOW())"))
moderator_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True, index=True, default = None)
class PunishmentPerm(Base):
__tablename__ = 'punishment_perms'
id: Mapped[identificator_pk]
user_id: Mapped[discord_identificator]
reason: Mapped[str | None] = mapped_column(Text)
time_begin: Mapped[float] = mapped_column(Float, nullable=False, server_default=text("EXTRACT(EPOCH FROM NOW())"))
moderator_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True, index=True, default = None)
class PunishmentReprimand(Base):
__tablename__ = 'punishment_reprimands'
id: Mapped[identificator_pk]
user_id: Mapped[discord_identificator]
reason: Mapped[str | None] = mapped_column(Text)
time_warn: Mapped[float] = mapped_column(Float)
branch_id: Mapped[int] = mapped_column(ForeignKey('staff_branches.id', ondelete='CASCADE'))
time_begin: Mapped[float] = mapped_column(Float, nullable=False, server_default=text("EXTRACT(EPOCH FROM NOW())"))
designated_user_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True, index=True, default = None)
# Модели для системы персонала
class StaffBranch(Base):
__tablename__ = 'staff_branches'
id: Mapped[identificator_pk]
layer: Mapped[int] = mapped_column(Integer, nullable=False)
purpose: Mapped[str] = mapped_column(Text)
is_admin: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
is_moder: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
roles: Mapped[list["StaffRole"]] = relationship(
back_populates="branch",
primaryjoin="StaffBranch.id==StaffRole.branch_id"
)
users: Mapped[list["StaffUserRole"]] = relationship(
back_populates="branch",
primaryjoin="StaffBranch.id==StaffUserRole.branch_id"
)
curations: Mapped[list["StaffCuration"]] = relationship(
back_populates="branch",
primaryjoin="StaffBranch.id==StaffCuration.branch_id"
)
class StaffRole(Base):
__tablename__ = 'staff_roles'
id: Mapped[discord_identificator_pk]
layer: Mapped[int] = mapped_column(Integer, nullable=False)
staff_salary: Mapped[int] = mapped_column(Integer, nullable=False)
branch_id: Mapped[int] = mapped_column(ForeignKey('staff_branches.id', ondelete='CASCADE'))
branch: Mapped["StaffBranch"] = relationship(
back_populates="roles",
primaryjoin="StaffRole.branch_id==StaffBranch.id"
)
users: Mapped[list["StaffUserRole"]] = relationship(
back_populates="role",
primaryjoin="StaffRole.id==StaffUserRole.role_id"
)
class StaffUser(Base):
__tablename__ = 'staff_users'
id: Mapped[discord_identificator_pk]
roles: Mapped[list["StaffUserRole"]] = relationship(back_populates="user")
curators: Mapped[list["StaffCuration"]] = relationship(
back_populates="apprentice",
primaryjoin="StaffUser.id==StaffCuration.apprentice_id"
)
apprentices: Mapped[list["StaffCuration"]] = relationship(
back_populates="curator",
primaryjoin="StaffUser.id==StaffCuration.curator_id"
)
class StaffUserRole(Base):
__tablename__ = 'staff_users_roles'
id: Mapped[identificator_pk]
user_id: Mapped[int] = mapped_column(ForeignKey('staff_users.id', ondelete='CASCADE'))
role_id: Mapped[int] = mapped_column(ForeignKey('staff_roles.id', ondelete='CASCADE'))
branch_id: Mapped[int] = mapped_column(ForeignKey('staff_branches.id', ondelete='CASCADE'))
description: Mapped[str | None] = mapped_column(Text)
update_time: Mapped[float] = mapped_column(Float,
server_default=text("EXTRACT(EPOCH FROM NOW())"),
server_onupdate=text("EXTRACT(EPOCH FROM NOW())")
)
user: Mapped["StaffUser"] = relationship(back_populates="roles")
branch: Mapped["StaffBranch"] = relationship(back_populates="users")
role: Mapped["StaffRole"] = relationship(back_populates="users")
__table_args__ = (
UniqueConstraint('user_id', 'branch_id', name='uq_user_branch'),
)
@classmethod
async def create_with_auto_branch(cls, session, user_id: int, role_id: int, **kwargs):
# Получаем роль
role = await session.get(StaffRole, role_id)
if not role:
raise ValueError("Роль не найдена")
return cls(user_id=user_id, role_id=role_id, branch_id=role.branch_id, **kwargs)
class StaffCuration(Base):
__tablename__ = 'staff_curation'
id: Mapped[identificator_pk]
apprentice_id: Mapped[int] = mapped_column(ForeignKey('staff_users.id', ondelete='CASCADE'))
curator_id: Mapped[int] = mapped_column(ForeignKey('staff_users.id', ondelete='CASCADE'))
branch_id: Mapped[int] = mapped_column(ForeignKey('staff_branches.id', ondelete='CASCADE'))
apprentice: Mapped["StaffUser"] = relationship(
back_populates="curators",
foreign_keys=[apprentice_id]
)
curator: Mapped["StaffUser"] = relationship(
back_populates="apprentices",
foreign_keys=[curator_id]
)
branch: Mapped["StaffBranch"] = relationship(back_populates="curations")
__table_args__ = (
UniqueConstraint('apprentice_id', 'curator_id', 'branch_id', name='uq_apprentice_curator_branch'),
)
all_data = {
'base': Base
}

View File

@@ -0,0 +1,17 @@
from database.settings.db_settings import *
class Settings:
def __init__(self):
self.DB_HOST = DB_HOST
self.DB_PORT = DB_PORT
self.DB_USER = DB_USER
self.DB_PASS = DB_PASS
self.DB_NAME = DB_NAME
@property
def DB_URL(self):
return f"postgresql+asyncpg://{self.DB_USER}:{self.DB_PASS}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_NAME}"
settings = Settings()

View File

@@ -0,0 +1,137 @@
import disnake
from disnake.ext import commands
from disnake.ext import tasks
from typing import Optional, Union, List, Dict, Any, AsyncIterator, Tuple
import datetime
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import joinedload, selectinload, contains_eager, aliased
from sqlalchemy import select, delete, insert, update
from sqlalchemy import func, asc, desc, false, tuple_
from sqlalchemy import and_, or_, not_
import subprocess
import os
from types import SimpleNamespace
class Model:
def __init__(self, model):
self.model = model
self.m = model
async def __aenter__(self):
return self.model
async def __aexit__(self, exc_type, exc_val, exc_tb):
return None
class DatabaseManager:
def __init__(self, engine, tables_data):
self.engine = engine
self.session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
self.metadata = tables_data['base'].metadata
self.model_classes = {}
tables = self.metadata.tables
for table_name, table in tables.items():
model_class = next((cls for cls in tables_data['base'].__subclasses__() if cls.__tablename__ == table_name), None)
if model_class:
#print(f"Table: {table_name}, Model Class: {model_class.__name__}")
self.model_classes[table_name] = model_class
self.models = {table_name: Model(model) for table_name, model in self.model_classes.items()}
self.tables_data = tables_data
#функции sqlalchemy
self.joinedload = joinedload
self.selectinload = selectinload
self.contains_eager = contains_eager
self.aliased = aliased
self.select = select
self.delete = delete
self.insert = insert
self.update = update
self.func = func
self.desc = desc
self.asc = asc
self.false = false
self.tuple_ = tuple_
self.and_ = and_
self.or_ = or_
self.not_ = not_
#ошибки sqlalchemy
exceptions = SimpleNamespace(
IntegrityError = IntegrityError
)
async def close(self):
await self.engine.dispose()
async def pg_dump(self, echo=False, backup_file='src/backups/discord_moderation_bot_backup.sql'):
conn = await self.engine.connect()
db_name = self.engine.url.database
user = self.engine.url.username
host = self.engine.url.host
port = self.engine.url.port
password = self.engine.url.password
os.environ['PGPASSWORD'] = password
command = [
'pg_dump',
'-h', host,
'-p', str(port),
'-U', user,
'-F', 'p', # <-- plain text SQL
] + (['-v'] if echo else []) + [
'-f', backup_file,
db_name
]
try:
subprocess.run(command, check=True)
if echo:
print(f"{datetime.datetime.now():%H:%M:%S %d-%m-%Y} :: SQL backup of '{db_name}' created.")
except subprocess.CalledProcessError as e:
print(f"{datetime.datetime.now():%H:%M:%S %d-%m-%Y} :: Backup failed: {e}")
finally:
await conn.close()
return backup_file
async def pg_restore(self, echo = False, backup_file = 'backups/backup_file.backup'):
conn = await self.DataBaseManager.engine.connect()
db_name = self.DataBaseManager.engine.url.database
user = self.DataBaseManager.engine.url.username
host = self.DataBaseManager.engine.url.host
port = self.DataBaseManager.engine.url.port
password = self.DataBaseManager.engine.url.password
os.environ['PGPASSWORD'] = password # Установка пароля для подключения
command = [
'pg_restore',
'-h', host,
'-p', str(port),
'-U', user,
'-d', db_name, # Имя базы данных, в которую будет восстановлено
] + ([
'-v' # Подробный вывод
] if echo else []) + [
backup_file # Путь к файлу резервной копии
]
try:
subprocess.run(command, check=True)
if echo:
print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Database '{db_name}' restored successfully from '{backup_file}'.")
except subprocess.CalledProcessError as e:
print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Error during restore: {e}")
finally:
await conn.close()
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
pass

View File

@@ -0,0 +1,239 @@
try:
import aiosqlite
import disnake
from disnake.ext import commands
from disnake.ext import tasks
except:
import pip
pip.main(['install', 'disnake'])
pip.main(['install', 'aiosqlite'])
import disnake
from disnake.ext import commands
from disnake.ext import tasks
import aiosqlite
from typing import Optional, Union, List, Dict, Any, AsyncIterator, Tuple
from asyncio import Lock
import datetime
class DatabaseManager:
"""Расширенное соединение с БД с поддержкой транзакций и удобных методов"""
def __init__(self, connection: aiosqlite.Connection):
self._connection = connection
self._transaction_depth = 0
self._closed = False
self._transaction_lock = Lock()
self.last_error = None
@classmethod
async def connect(cls, database: str, **kwargs) -> 'DatabaseManager':
"""Альтернатива конструктору для подключения"""
connection = await aiosqlite.connect(database, **kwargs)
return cls(connection)
async def UpdateBD(self, table: str, *, change: dict, where: dict, whereandor = "AND"):
request = ()
change_request = []
for i in change.keys():
change_request.append(f"{i} = ?")
request = request + (change[i],)
where_request = []
for i in where.keys():
where_request.append(f"{i} = ?")
request = request + (where[i],)
await self.execute('UPDATE {table} SET {change} WHERE {where}'
.format(table = table, change = ", ".join(change_request), where = f" {whereandor} ".join(where_request)), request)
return 0
async def SelectBD(self, table: str, *, select: list = ["*"], where: dict = None, where_ops: dict = None, whereandor = "AND", order_by: str = None, limit: int = None):
where_combined = {}
if where:
where_combined.update({f"{k} =": v for k, v in where.items()})
if where_ops:
where_combined.update(where_ops)
request = ()
where_clauses = ""
for condition, value in where_combined.items():
field_op = condition.split()
field = field_op[0]
op = "=" if len(field_op) == 1 else field_op[1]
if where_clauses == "":
where_clauses= where_clauses + f"{field} {op} ? "
else:
where_clauses= where_clauses + f"{whereandor} {field} {op} ? "
request += (value,)
query = "SELECT {select} FROM {table}".format(
select=", ".join(select),
table=table
)
if where_clauses:
query += f" WHERE {where_clauses}"
if order_by:
query += f" ORDER BY {order_by}"
if limit:
query += f" LIMIT {limit}"
async with await self.execute(query, request) as cursor:
return [i for i in await cursor.fetchall()]
async def GetStaffJoins():
query = \
"""
SELECT sur.userid, sur.roleid, sur.description, sur.starttime, sr.staffsalary, sbr.layer as rolelayer, sbr.branchid, sb.layer as branchlayer, sb.purpose
FROM staff_users_roles AS sur
JOIN staff_roles as sr ON sr.roleid = sur.roleid
JOIN staff_branches_roles as sbr ON sbr.roleid = sur.roleid
JOIN staff_branches as sb ON sb.branchid = sbr.branchid
ORDER BY branchlayer ASC, rolelayer ASC;
"""
async with await self.execute(query, request) as cursor:
answer = [i for i in await cursor.fetchall()]
result = [{'userid': userid, 'roleid': roleid, 'description': description, 'starttime': starttime, 'staffsalary': staffsalary, 'rolelayer': rolelayer, 'branchid': branchid, 'branchlayer': branchlayer, 'purpose': purpose} for userid, roleid, description, starttime, staffsalary, rolelayer, branchid, branchlayer, purpose in answer]
return result
async def DeleteBD(self, table: str, *, where: dict, whereandor = "AND"):
request = ()
where_request = []
for i in where.keys():
where_request.append(f"{i} = ?")
request = request + (where[i],)
await self.execute("DELETE FROM {table} where {where}"
.format(table = table, where = f" {whereandor} ".join(where_request)), request)
return 0
async def InsertBD(self, table: str, *, data: dict):
request = ()
keys = list(data.keys())
qstring = []
for i in keys:
request = request + (data[i],)
qstring.append("?")
await self.execute("INSERT INTO {table}({keys}) VALUES({values})"
.format(table = table, keys = ", ".join(keys), values = ", ".join(qstring)), request)
return 0
async def execute(self, sql: str, parameters: Optional[Union[Tuple, Dict]] = None, **kwargs) -> aiosqlite.Cursor:
"""
Универсальный execute, который автоматически определяет:
- Нужно ли начинать транзакцию (для INSERT/UPDATE/DELETE вне транзакции)
- Работает ли уже внутри транзакции (не создаёт вложенные транзакции)
"""
is_modifying = sql.strip().upper().startswith(("INSERT", "UPDATE", "DELETE"))
# Если это модифицирующий запрос И мы НЕ внутри транзакции
if is_modifying and self._transaction_depth == 0:
async with self: # Автоматические begin/commit
cursor = await self._connection.execute(sql, parameters or (), **kwargs)
await cursor.close() # Важно: закрываем курсор для COMMIT
return cursor
else:
# Для SELECT или работы внутри существующей транзакции
return await self._connection.execute(sql, parameters or (), **kwargs)
async def fetch_all(self, sql: str, parameters: Optional[Union[Tuple, Dict]] = None) -> List[Tuple]:
"""Выполняет запрос и возвращает все строки"""
async with await self.execute(sql, parameters) as cursor:
return await cursor.fetchall()
async def fetch_one(self, sql: str, parameters: Optional[Union[Tuple, Dict]] = None) -> Optional[Tuple]:
"""Выполняет запрос и возвращает первую строку"""
async with await self.execute(sql, parameters) as cursor:
return await cursor.fetchone()
async def fetch_val(self, sql: str, parameters: Optional[Union[Tuple, Dict]] = None, column: int = 0) -> Any:
"""Возвращает значение из первого столбца"""
row = await self.fetch_one(sql, parameters)
return row[column] if row else None
async def insert(self, table: str, data: Dict[str, Any], on_conflict: str = None) -> int:
"""Упрощенный INSERT с поддержкой ON CONFLICT"""
keys = data.keys()
values = list(data.values())
sql = f"""
INSERT INTO {table} ({', '.join(keys)})
VALUES ({', '.join(['?']*len(keys))})
"""
if on_conflict:
sql += f" ON CONFLICT {on_conflict}"
await self.execute(sql, values)
return self.lastrowid
async def update(self, table: str, where: Dict[str, Any], changes: Dict[str, Any], where_operator: str = "AND") -> int:
"""Упрощенный UPDATE с автоматическим построением WHERE"""
set_clause = ", ".join([f"{k} = ?" for k in changes.keys()])
where_clause = f" {where_operator} ".join([f"{k} = ?" for k in where.keys()])
sql = f"""
UPDATE {table}
SET {set_clause}
WHERE {where_clause}
"""
result = await self.execute(sql, [*changes.values(), *where.values()])
return result.rowcount
async def begin(self):
"""Начать транзакцию (с поддержкой вложенности)"""
async with self._transaction_lock:
if self._transaction_depth == 0:
await self._connection.execute("BEGIN IMMEDIATE")
self._transaction_depth += 1
async def commit(self):
"""Зафиксировать транзакцию"""
async with self._transaction_lock:
if self._transaction_depth == 1:
await self._connection.commit()
self._transaction_depth = max(0, self._transaction_depth - 1)
async def rollback(self):
"""Откатить транзакцию"""
async with self._transaction_lock:
if self._transaction_depth > 0:
await self._connection.rollback()
self._transaction_depth = 0
async def close(self) -> None:
"""Безопасное закрытие соединения с учётом транзакций"""
async with self._transaction_lock:
try:
# Откатываем активную транзакцию, если есть
if self._transaction_depth > 0:
await self._connection.rollback()
self._transaction_depth = 0
# Закрываем соединение
if hasattr(self._connection, '_connection'): # Проверка внутреннего состояния
await self._connection.close()
except Exception as e:
self.last_error = f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Ошибка при закрытии соединения: {e}"
print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Ошибка при закрытии соединения: {e}")
finally:
# Помечаем соединение как закрытое
self._closed = True
async def __aenter__(self):
await self.begin() # Используем собственный метод begin
return self # Возвращаем сам менеджер, а не соединение
async def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
await self.commit()
else:
self.last_error = f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Во время записи в бд произошла ошибка: {exc_type}({exc_val}): {exc_tb.tb_frame.f_code.co_filename}(строка {exc_tb.tb_lineno})!"
print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Во время записи в бд произошла ошибка: {exc_type}({exc_val}): {exc_tb.tb_frame.f_code.co_filename}(строка {exc_tb.tb_lineno})!")
await self.rollback()

103
src/test.py Normal file
View File

@@ -0,0 +1,103 @@
try:
import disnake
from disnake.ext import commands
from disnake.ext import tasks
import requests
import numpy as np
import aiohttp
#from colorthief import ColorThief
except:
import pip
pip.main(['install', 'disnake'])
#pip.main(['install', 'matplotlib'])
pip.main(['install', 'requests'])
#pip.main(['install', 'Pillow'])
pip.main(['install', 'numpy'])
pip.main(['install', 'aiohttp'])
#pip.main(['install', 'colorthief'])
import disnake
from disnake.ext import commands
from disnake.ext import tasks
import numpy as np
import requests
import aiohttp
#from colorthief import ColorThief
import asyncio
import sys
import os
import copy
import datetime
import math
import random
import json
import shutil
from constants.global_constants import *
from data.TOKENS import TOKENS
import CoreMod
async def main():
stop_event = asyncio.Event()
sup_bot = None
DataBase = None
all_bots = []
try:
DataBase = await CoreMod.init_db()
#sup_bot = CoreMod.MainBot(DataBase, stop_event)
sup_bot = CoreMod.AnyBots(DataBase)
all_bots = [sup_bot]
#НЕ СМЕЙ РАСКОММЕНТИРОВАТЬ
#await CoreMod.db_migration(DataBase)
'''
users = [
DataBase.model_classes['staff_users'](id = 78173123),
DataBase.model_classes['staff_users'](id = 6345345)
]
async with DataBase.session() as session:
async with session.begin():
#session.add_all(users)
stmt = CoreMod.select(DataBase.model_classes['staff_users']).where(DataBase.model_classes['staff_users'].id == 78173123)
result = (await session.execute(stmt)).scalars().all()
for i in result:
await session.delete(i)
print(result)
'''
# Загрузка когов
sup_bot.load_extension("cogs.resetsupcommands")
sup_bot.load_extension("cogs.moderators")
sup_bot.load_extension("cogs.users")
sup_bot.load_extension("cogs.administrators")
# Запуск монитора остановки и ботов
monitor_task = asyncio.create_task(CoreMod.monitor_stop(stop_event, all_bots))
bot_tasks = [
asyncio.create_task(CoreMod.run_bot(sup_bot, TOKENS["KrekSupBot"], stop_event)),
]
await asyncio.gather(*bot_tasks, monitor_task)
except KeyboardInterrupt:
print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Боты остановлены по запросу пользователя")
except Exception as e:
print(f"{datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')}:: Произошла критическая ошибка: {e}")
finally:
# Остановка всех ботов
stop_event.set()
for bot in all_bots:
if not bot.is_closed():
await bot.close()
if DataBase is not None:
await DataBase.close()
if __name__ == "__main__":
asyncio.run(main())