import inspect
import textwrap
from typing import (
NewType,
MutableMapping,
Callable,
Protocol,
Pattern,
Sequence,
Set,
Mapping,
Union,
Optional,
)
import re
import discord
from .members import *
__all__ = ["Manager", "Mapper", "Authorize", "AuthorizeCallBack", "RE_MARKDOWN", "ZWS"]
PatternVstr = Union[Pattern, str]
MemberIdentity = Union[int, str, Pattern, Roles, Permissions]
Message = NewType("Message", discord.Message)
Author = NewType("Author", discord.Message.author)
Auth = Union[
Members,
MembersSet,
MemberIdentity,
Set[MemberIdentity],
Sequence[MemberIdentity],
Callable[[Message], bool],
]
[docs]def Authorize(members: Auth, message: Message) -> Union[MemberIdentity, None]:
"""Authorize whether member belongs to members
Args:
members : authorized members
message : message.author to be authorized
Returns:
memberIdentity used for authorization
or None if not authorized *(member not belongs to members)*
"""
member: Author = message.author
if type(members) is MembersSet or type(members) is Members:
members = members.members
elif (
type(members) is int
or type(members) is str
or type(members) is Roles
or type(members) is Permissions
or isinstance(members, Pattern)
or isinstance(members, Callable)
):
members = {members}
for memberIdentity in members:
if type(memberIdentity) is int:
if member.id == memberIdentity:
return memberIdentity
elif type(memberIdentity) is str:
if str(member) == memberIdentity:
return memberIdentity
elif type(memberIdentity) is Roles:
# DMchannel don't have roles
if not hasattr(member, "roles"):
continue
if memberIdentity.guild_id:
guild_id = memberIdentity.guild_id
if guild_id != member.guild.id:
continue
required_roles = memberIdentity.roles
member_roles_id = [role.id for role in member.roles]
member_roles_name = [role.name for role in member.roles]
for role in required_roles:
if type(role) is int:
if role not in member_roles_id:
break
elif type(role) is str:
if role not in member_roles_name:
break
else:
return memberIdentity
elif type(memberIdentity) is Permissions:
# DMchannel don't have permissions
if not hasattr(member, "guild_permissions"):
continue
if memberIdentity.guild_id:
guild_id = memberIdentity.guild_id
if guild_id != member.guild.id:
continue
if getattr(member, "guild_permissions") >= memberIdentity:
return memberIdentity
elif isinstance(memberIdentity, Pattern):
if memberIdentity.match(str(member)):
"str(message.author) -> 'user_name#user_discriminator'"
return memberIdentity
else:
if memberIdentity(message):
return memberIdentity
return None
# fmt: off
class CallBack(Protocol):
def __call__(self, msg: Message, /, *args: str) -> Optional[str]: ...
auth: Optional[Auth] = None
has_roles: Optional[Union[Roles, Set[Roles], Sequence[Roles]]] = None
has_permissions: Optional[Union[Permissions, Set[Permissions], Sequence[Permissions]]] = None
# fmt: on
[docs]def AuthorizeCallBack(callback: CallBack, message: Message) -> bool:
"""Checks whether member is authorized to access the callback
Args:
callback : callback for which authorizing member
message : message.author to be authorized
Returns:
True if authorized else False
"""
if hasattr(callback, "auth"):
auth = getattr(callback, "auth")
if auth:
if not Authorize(auth, message):
return False
if hasattr(callback, "has_roles"):
has_roles = getattr(callback, "has_roles")
if has_roles:
if not Authorize(has_roles, message):
return False
if hasattr(callback, "has_permissions"):
has_permissions = getattr(callback, "has_permissions")
if has_permissions:
if not Authorize(has_permissions, message):
return False
return True
[docs]class Mapper:
"""To create P2F map easily using decorator
Example::
key = Mapper()
@key(re.compile(r'^! ([\\s\\S]*)$'))
def cmd1(msg, arg):
#code#
@key('cmd2','command2') # 'command2' and 'cmd2' both mapped to cmd2 callback
def cmd2(msg):
#code#
client.event(Manager(key).on_message)
"""
def __init__(self, P2F: Optional[MutableMapping[PatternVstr, CallBack]] = None):
"""
Args:
P2F : dict if you want to continue with previous P2F
or None to create new one
"""
self.P2F = dict() if P2F is None else P2F
def __call__(self, *match_with: PatternVstr) -> Callable[[CallBack], CallBack]:
"""For mapping decorated function
"""
def updateP2F(callback: CallBack) -> CallBack:
for key in match_with: # for multiple commands to a callback
self.P2F[key] = callback
return callback
return updateP2F # decorator
def __getattr__(self, attribute):
return getattr(self.P2F, attribute)
def __getitem__(self, key):
return self.P2F.__getitem__(key)
def __repr__(self):
return "Mapper(" + repr(self.P2F) + ")"
def __str__(self):
return str(self.P2F)
RE_MARKDOWN = re.compile(r"([*_~`|>])")
ZWS = "\u200B" # Unicode Character 'ZERO WIDTH SPACE' (U+200B)
[docs]class Manager:
"""To manage all commands with their respective authorized authors
"""
ContentFieldLimits = 2000
def __init__(
self,
P2F: Union[Mapping[PatternVstr, CallBack], Mapper] = {
re.compile(r"^!echo (.*)$"): lambda msg, x: x
},
authorize: bool = True,
escMD: bool = False,
listenBot: bool = False,
filter: Optional[Callable[[Message], bool]] = None,
):
"""
Args:
P2F : Pattern or Str to Function *(callback)* Map (or) :class:`Mapper` object
| if Pattern matchs with message.content
then callback is called with message, captured groups
| if Str is equal to message.content then callback is called with message
| In both the return string by callback is the reply message.
No reply if return value is None.
authorize : allow only Authorized members to execute command *(to call callback)*
| if False all callbacks *(P2F.values())* is callable by all members
| else for each command only members authorized
by :func:`AuthorizeCallBack` is allowded.
escMD : escape MarkDown while reply
| if true reply *(returned by callback)* is filtered to escape MarkDown
| else no filtering to escape MarkDown.
listenBot : allow reply to bot's message (this may leads to infinte loop of messages)
| if true bot's message can't be skiped in on_message
filter : function to filter messages
| if filter(message) is True then proceed on_message
| else skip that message and continue
"""
self.P2F = P2F
self.authorize = authorize
self.escMD = escMD
self.listenBot = listenBot
self.filter = filter
[docs] async def on_message(self, message: Message):
"""It is the on_message event listner.
set it in **client.event** like ``client.event(Manager().on_message)``
"""
if self.filter:
if not self.filter(message):
return
# Don't reply to bot's reply if listenBot disabled
if not self.listenBot and message.author.bot:
return
for cmd, callback in self.P2F.items():
if type(cmd) is str:
if cmd == message.content:
await self.execute(callback, message)
else:
match = cmd.match(message.content)
if match:
await self.execute(callback, message, *match.groups())
[docs] async def execute(self, callback: CallBack, message: Message, *args: str):
"""To execute the matched function, if the author is authorized
by :func:`AuthorizeCallBack`
Example::
def cmd(msg, *args):
#code#
cmd.auth = None
Args:
callback : function to be called if message.author is Authorized
by :func:`discordRebot.manager.AuthorizeCallBack`
message : message object from discord, having all details about a message
*args (str): arguments to pass into callback
"""
if self.authorize:
if not AuthorizeCallBack(callback, message):
denial_message = await self.on_denied(callback, message)
await self.reply(message, denial_message)
return
if inspect.iscoroutinefunction(callback):
response = await callback(message, *args)
await self.reply(message, response)
elif inspect.isasyncgenfunction(callback):
async for response in callback(message, *args):
await self.reply(message, response)
elif inspect.isgeneratorfunction(callback):
for response in callback(message, *args):
await self.reply(message, response)
else:
response = callback(message, *args)
await self.reply(message, response)
[docs] async def reply(self, message: Message, response: Union[str, None]):
"""To send the response from callback in :meth:`execute`
Args:
message : message object from discord, having all details about a message
response : response from callback
"""
if not response:
return
elif type(response) is not str:
response = str(response)
if self.escMD:
response = RE_MARKDOWN.sub(r"\\\1", response)
if len(response) > self.ContentFieldLimits:
for response in textwrap.wrap(response, self.ContentFieldLimits):
await message.channel.send(response)
else:
await message.channel.send(response)
[docs] async def on_denied(self, callback: CallBack, message: Message) -> Optional[str]:
"""Called when unauthorized and returns the denial message
Args:
callback : command which is skiped due to unauthorized
message : message object from discord, having all details about a message
Returns:
denial message
"""
return ":{ Not Permitted"
# The above Type Hints are not for type checking
# Don't do mypy since this code is more dynamic and duck-type