Реализована проверка на права суперпользователя is_admin_or_moder; Добавлен белый список людей, которым разрешены все ссылки

This commit is contained in:
2025-07-31 23:53:56 +03:00
parent 539bcba4e0
commit 3f8a240771
3 changed files with 49 additions and 47 deletions

View File

@@ -520,6 +520,10 @@ class MainBot(AnyBots):
files = files) files = files)
return 0 return 0
async with self.DataBaseManager.session() as session:
if (await self.DataBaseManager.model_classes['staff_users'].is_admin_or_moder_by_id(msg.author.id, self.DataBaseManager, session)):
return 0
def extract_root_domain(url): def extract_root_domain(url):
ext = tldextract.extract(url) ext = tldextract.extract(url)
if not ext.domain or not ext.suffix: if not ext.domain or not ext.suffix:
@@ -538,7 +542,6 @@ class MainBot(AnyBots):
link_in_wl = (await session.execute(stmt)).scalars().first() link_in_wl = (await session.execute(stmt)).scalars().first()
if link_in_wl is None: if link_in_wl is None:
print("Нарушение!!!")
await log.send(f"{msg.author.mention}({msg.author.id}) отправил в чат {msg.channel.mention} сомнительную ссылку, которой нет в вайлисте:```{msg.content}```") await log.send(f"{msg.author.mention}({msg.author.id}) отправил в чат {msg.channel.mention} сомнительную ссылку, которой нет в вайлисте:```{msg.content}```")
mess = await msg.reply(embed=self.ErrEmbed(description=f'Этой ссылки нет в белом списке. Чтобы её туда добавили, свяжитесь с разработчиком или модераторами.', colour=0xff9900)) mess = await msg.reply(embed=self.ErrEmbed(description=f'Этой ссылки нет в белом списке. Чтобы её туда добавили, свяжитесь с разработчиком или модераторами.', colour=0xff9900))
await msg.delete() await msg.delete()

View File

@@ -689,60 +689,34 @@ class ModerModule(commands.Cog):
@commands.slash_command(description="Позволяет добавить домен в белый список", name="добавить_ссылку", administrator=True) @commands.slash_command(description="Позволяет добавить домен в белый список", name="добавить_ссылку", administrator=True)
async def add_domain(self, ctx: disnake.AppCmdInter, link: str = commands.Param(description="Укажите ссылку или домен", name="ссылка")): async def add_domain(self, ctx: disnake.AppCmdInter, link: str = commands.Param(description="Укажите ссылку или домен", name="ссылка")):
async with self.DataBaseManager.session() as session: async with self.DataBaseManager.session() as session:
async with session.begin(): if not (await self.DataBaseManager.model_classes['staff_users'].is_admin_or_moder_by_id(ctx.author.id, self.DataBaseManager, session)):
staff_branches_model = self.DataBaseManager.model_classes['staff_branches'] await ctx.send(embed = self.client.ErrEmbed(description = f'У вас недостаточно полномочий, чтобы добавлять ссылку в белый лист. Обратитесь к любому модератору или разработчику.'))
аllowed_domains_model = self.DataBaseManager.model_classes['аllowed_domains'] return 1
admin_flag = False else:
stmt = ( def extract_root_domain(url):
self.DataBaseManager.select(staff_branches_model) ext = tldextract.extract(url)
.options( if not ext.domain or not ext.suffix:
self.DataBaseManager.selectinload(staff_branches_model.users) return None
) return f"{ext.domain}.{ext.suffix}".lower()
.where( new_link = extract_root_domain(link)
self.DataBaseManager.or_( if not new_link:
staff_branches_model.is_admin == True, await ctx.send(embed = self.client.ErrEmbed(description = f'Некорректная ссылка!'))
staff_branches_model.is_moder == True
)
)
)
branches = (await session.execute(stmt)).scalars().all()
for branch in branches:
for user in branch.users:
if ctx.author.id == user.user_id:
admin_flag = True
break
if admin_flag:
break
if not admin_flag:
await ctx.send(embed = self.client.ErrEmbed(description = f'У вас недостаточно полномочий, чтобы добавлять ссылку в белый лист. Обратитесь к любому модератору или разработчику.'))
return 1 return 1
else: аllowed_domains_model = self.DataBaseManager.model_classes['аllowed_domains']
def extract_root_domain(url): stmt = self.DataBaseManager.select(аllowed_domains_model).where(аllowed_domains_model.domain == new_link)
ext = tldextract.extract(url) link_in_wl = (await session.execute(stmt)).scalars().first()
if not ext.domain or not ext.suffix:
return None
return f"{ext.domain}.{ext.suffix}".lower()
new_link = extract_root_domain(link)
if not new_link:
await ctx.send(embed = self.client.ErrEmbed(description = f'Некорректная ссылка!'))
return 1
stmt = self.DataBaseManager.select(аllowed_domains_model).where(аllowed_domains_model.domain == new_link) if link_in_wl is not None:
link_in_wl = (await session.execute(stmt)).scalars().first() await ctx.send(embed = self.client.ErrEmbed(description = f'Этот домен уже есть в белом листе!'))
return 1
if link_in_wl is not None:
await ctx.send(embed = self.client.ErrEmbed(description = f'Этот домен уже есть в белом листе!'))
return 1
async with session.begin():
domain = аllowed_domains_model(domain = new_link, initiator_id = ctx.author.id) domain = аllowed_domains_model(domain = new_link, initiator_id = ctx.author.id)
session.add(domain) session.add(domain)
await ctx.send(embed = self.client.AnswEmbed(description = f'Домен {new_link} успешно добавлен в белый список')) await ctx.send(embed = self.client.AnswEmbed(description = f'Домен {new_link} успешно добавлен в белый список'))
return 1 return 0

View File

@@ -148,6 +148,31 @@ class StaffUser(Base):
primaryjoin="StaffUser.id==StaffCuration.curator_id" primaryjoin="StaffUser.id==StaffCuration.curator_id"
) )
async def is_admin_or_moder(self, DBManager, session):
return await self.__class__.is_admin_or_moder_by_id(self.id, DBManager, session)
@staticmethod
async def is_admin_or_moder_by_id(member_id, DBManager, session):
staff_branches_model = DBManager.model_classes['staff_branches']
staff_users_roles_model = DBManager.model_classes['staff_users_roles']
stmt = (
DBManager.select(staff_users_roles_model)
.join(staff_branches_model, staff_branches_model.id == staff_users_roles_model.branch_id)
.where(
DBManager.and_(
DBManager.or_(
staff_branches_model.is_admin == True,
staff_branches_model.is_moder == True,
),
staff_users_roles_model.user_id == member_id
)
)
)
result = await session.execute(stmt)
return result.scalars().first() is not None
class StaffUserRole(Base): class StaffUserRole(Base):
__tablename__ = 'staff_users_roles' __tablename__ = 'staff_users_roles'