Module graia.broadcast
Expand source code
import asyncio
import sys
import traceback
from typing import (
Callable,
Dict,
Generator,
Iterable,
List,
Set,
Type,
Union,
)
from iterwrapper import IterWrapper as iw
from graia.broadcast.entities.track_log import TrackLog, TrackLogType
from .typing import T_Dispatcher
from .entities.exectarget import ExecTarget
from .interfaces.dispatcher import DispatcherInterface
from .entities.decorator import Decorator
from .entities.dispatcher import BaseDispatcher
from .entities.event import BaseEvent
from .entities.listener import Listener
from .entities.namespace import Namespace
from .entities.signatures import Force, RemoveMe
from .exceptions import (
DisabledNamespace,
ExecutionStop,
ExistedNamespace,
InvaildEventName,
PropagationCancelled,
RegisteredEventListener,
RequirementCrashed,
UnexistedNamespace,
)
from .interfaces.decorator import DecoratorInterface
from .utilles import (
argument_signature,
dispatcher_mixin_handler,
group_dict,
printer,
run_always_await_safely,
cached_isinstance,
)
from .typing import T_Dispatcher
# Event hub: stores namespaces and listeners and runs listeners for posted events.
class Broadcast:
# Event loop used by postEvent() to create scheduling tasks.
loop: asyncio.AbstractEventLoop
# Namespace given to listeners registered without an explicit namespace.
default_namespace: Namespace
# Namespaces registered through createNamespace().
namespaces: List[Namespace]
# Registered listeners; receiver() appends to this list.
listeners: List[Listener]
# Interface used by Executor() to resolve the parameters of a target.
dispatcher_interface: DispatcherInterface
# Stored from the constructor argument; not read anywhere in this listing.
debug_flag: bool
def __init__(
    self,
    *,
    loop: asyncio.AbstractEventLoop = None,
    debug_flag: bool = False,
):
    """Set up an empty broadcast bound to an event loop.

    Args:
        loop: Event loop to schedule on; when omitted (or falsy) the result
            of ``asyncio.get_event_loop()`` is used.
        debug_flag: Value stored on ``self.debug_flag``.
    """
    self.debug_flag = debug_flag
    self.listeners = []
    self.namespaces = []
    self.default_namespace = Namespace(name="default", default=True)
    self.loop = loop if loop else asyncio.get_event_loop()

    # Built last: the interface receives this instance, so every attribute
    # above is already set when it is constructed.
    root_interface = DispatcherInterface(self)
    self.dispatcher_interface = root_interface
    # DecoratorInterface goes first in the first execution context.
    root_interface.execution_contexts[0].dispatchers.insert(
        0, DecoratorInterface(root_interface)
    )

    @root_interface.inject_global_raw
    async def _(interface: DispatcherInterface):
        # Built-in injectables, matched by annotation identity.
        wanted = interface.annotation
        if wanted is interface.event.__class__:
            return interface.event
        if wanted is Broadcast:
            return interface.broadcast
        if wanted is DispatcherInterface:
            return interface
        return None
def default_listener_generator(self, event_class) -> Iterable[Listener]:
    """Select the listeners that should receive ``event_class``.

    A listener is kept when its namespace is neither hidden nor disabled and
    ``event_class`` is among its ``listening_events``.  The filtered
    ``IterWrapper`` is returned as-is, without being collected into a list.
    """

    def visible(listener):
        return not listener.namespace.hide

    def enabled(listener):
        return not listener.namespace.disabled

    def subscribed(listener):
        return event_class in listener.listening_events

    candidates = iw(self.listeners)
    return candidates.filter(visible).filter(enabled).filter(subscribed)
async def layered_scheduler(
    self, listener_generator: Generator[Listener, None, None], event: BaseEvent
):
    """Run the listeners for ``event`` one priority group at a time.

    Listeners are grouped by ``priority`` and the groups run in ascending
    order; listeners sharing a priority run concurrently.  When a listener
    of a group raises ``PropagationCancelled`` the remaining groups are
    skipped.

    Args:
        listener_generator: Iterable of listeners to execute.
        event: Event instance handed to every listener.
    """
    grouped: Dict[int, List[Listener]] = group_dict(
        listener_generator, lambda x: x.priority
    )
    for _priority, current_group in sorted(grouped.items(), key=lambda x: x[0]):
        # asyncio.wait() needs tasks/futures: bare coroutines are deprecated
        # since Python 3.8 and rejected since 3.11.
        tasks = [
            asyncio.ensure_future(self.Executor(target=i, event=event))
            for i in current_group
        ]
        done_tasks, _pending = await asyncio.wait(tasks)

        propagation_cancelled = False
        # Inspect every task (no early exit) so each exception is retrieved.
        for task in done_tasks:
            if task.cancelled():
                # exception() itself raises on a cancelled task.
                continue
            if task.exception().__class__ is PropagationCancelled:
                propagation_cancelled = True
        if propagation_cancelled:
            # Leave the scheduler entirely; a ``break`` in the task loop only
            # left that loop and lower-priority groups still ran.
            return
async def Executor(
    self,
    target: Union[Callable, ExecTarget],
    event: BaseEvent,
    dispatchers: List[
        Union[
            Type[BaseDispatcher],
            Callable,
            BaseDispatcher,
        ]
    ] = None,
    post_exception_event: bool = False,
    print_exception: bool = True,
    enableInternalAccess: bool = False,
):
    """Resolve the parameters of ``target`` for ``event`` and call it.

    Args:
        target: A plain callable, or an ``ExecTarget`` (a ``Listener``
            included) whose ``callable`` is run.
        event: Event being handled; ``event.Dispatcher`` supplies dispatchers.
        dispatchers: Extra dispatchers, placed ahead of the namespace, inline
            and event dispatchers.
        post_exception_event: Post an ``ExceptionThrowed`` event when the run
            fails with an exception not handled by an earlier clause.
        print_exception: Print the traceback of such an exception.
        enableInternalAccess: Allow injecting the target itself (and a
            listener's namespace) by annotation.

    Returns:
        The target's result; a ``Force`` result is unwrapped to its
        ``content``.  A ``RemoveMe`` result first unregisters a listener
        target and is then returned unchanged.

    Raises:
        DisabledNamespace: If ``target`` is a listener in a disabled namespace.
        Exception: Whatever parameter lookup or the target raises, re-raised
            once the ``after*`` lifecycle hooks have run.
    """
    from .builtin.event import ExceptionThrowed

    is_exectarget = cached_isinstance(target, ExecTarget)
    is_listener = cached_isinstance(target, Listener)

    if is_listener and target.namespace.disabled:
        raise DisabledNamespace(
            "catched a disabled namespace: {0}".format(target.namespace.name)
        )

    target_callable = target.callable if is_exectarget else target
    parameter_compile_result = {}
    # Pre-bound so the result checks at the end never hit an unbound name
    # should the context manager swallow an exception.
    result = None
    track_logs: TrackLog = TrackLog()

    async with self.dispatcher_interface.start_execution(
        event,
        [
            *(dispatchers or []),
            *(target.namespace.injected_dispatchers if is_listener else []),
            *(target.inline_dispatchers if is_exectarget else []),
            *dispatcher_mixin_handler(event.Dispatcher),
        ],
        track_logs,
    ) as dii:
        if enableInternalAccess or (
            is_exectarget and target.enable_internal_access
        ):

            @dii.inject_execution_raw
            async def _(interface: DispatcherInterface):
                if interface.annotation is target.__class__:
                    return target
                elif interface.annotation is Namespace and is_listener:
                    return target.namespace

        await dii.exec_lifecycle("beforeExecution")
        try:
            await dii.exec_lifecycle("beforeDispatch")
            if is_exectarget:
                if target.maybe_failure:
                    # Logged lookups: seed a path (and its "$set" companion)
                    # for every parameter that has none yet.
                    initial_path = dii.init_dispatch_path()
                    for name, annotation, default in argument_signature(
                        target_callable
                    ):
                        if (
                            target.param_paths.setdefault(name, initial_path)
                            is initial_path
                        ):
                            target.param_paths[name + "$set"] = set()
                        parameter_compile_result[name] = await dii.lookup_param(
                            name, annotation, default, target.param_paths[name]
                        )
                else:
                    for name, annotation, default in argument_signature(
                        target_callable
                    ):
                        parameter_compile_result[
                            name
                        ] = await dii.lookup_param_without_log(
                            name, annotation, default, target.param_paths[name]
                        )
                # NOTE(review): assumed to run for both branches above; the
                # published listing has lost its indentation - confirm.
                for hl_d in target.headless_decorators:
                    await dii.lookup_param_without_log(None, None, hl_d)
            else:
                # A plain callable carries no ``param_paths``; reading it here
                # raised AttributeError.  Use the three-argument call form,
                # as the headless-decorator lookup above does.
                for name, annotation, default in argument_signature(
                    target_callable
                ):
                    parameter_compile_result[
                        name
                    ] = await dii.lookup_param_without_log(
                        name, annotation, default
                    )
            result = await run_always_await_safely(
                target_callable, **parameter_compile_result
            )
        except (ExecutionStop, PropagationCancelled):
            raise
        except RequirementCrashed:
            traceback.print_exc()
            raise
        except Exception as e:
            if print_exception:
                traceback.print_exc()
            if post_exception_event:
                self.postEvent(ExceptionThrowed(exception=e, event=event))
            raise
        finally:
            # exc_info() is (None, None, None) on success, so the hooks get
            # ``None`` for both the exception and the traceback.
            _, exception, tb = sys.exc_info()
            await dii.exec_lifecycle("afterDispatch", exception, tb)
            await dii.exec_lifecycle("afterTargetExec", exception, tb)
            await dii.exec_lifecycle("afterExecution", exception, tb)

            # NOTE(review): this bookkeeping is assumed to belong to the
            # ``finally`` clause; the flattened listing cannot confirm it.
            if is_exectarget and not track_logs.fluent_success:
                current_paths = target.param_paths
                current_path: List[List["T_Dispatcher"]] = None
                current_path_set: Set["T_Dispatcher"] = None
                has_failures: set = set()
                # Replay the log: between LookupStart and LookupEnd every
                # Result entry not yet recorded is appended to that
                # parameter's path; Continue entries are collected.
                for log in track_logs.log:
                    if log[0] is TrackLogType.LookupStart:
                        current_path = current_paths[log[1]]
                        current_path_set = current_paths[log[1] + "$set"]
                    elif log[0] is TrackLogType.LookupEnd:
                        current_path = None
                    elif (
                        current_path is not None
                        and log[0] is TrackLogType.Result
                        and log[2] not in current_path_set
                    ):
                        current_path[0].append(log[2])
                        current_path_set.add(log[2])
                    elif log[0] is TrackLogType.Continue:
                        has_failures.add(log[1])
                target.maybe_failure.symmetric_difference_update(has_failures)

    if result.__class__ is Force:
        return result.content
    if result.__class__ is RemoveMe and is_listener:
        if target in self.listeners:
            self.listeners.remove(target)
    return result
def postEvent(self, event: BaseEvent):
    """Schedule the listeners of ``event`` on ``self.loop``.

    Args:
        event: Event instance to deliver; listeners are selected by its class.

    Returns:
        The task created for the scheduling coroutine.  Earlier the task was
        discarded; returning it lets callers keep a reference or await it.
    """
    scheduling = self.layered_scheduler(
        listener_generator=self.default_listener_generator(event.__class__),
        event=event,
    )
    return self.loop.create_task(scheduling)
@staticmethod
def event_class_generator(target=BaseEvent):
    """Yield every direct and indirect subclass of ``target``, depth-first.

    Each class is yielded before its own subclasses, following the order
    returned by ``__subclasses__()``.
    """
    # Explicit stack; children are pushed reversed so they pop in order.
    pending = target.__subclasses__()[::-1]
    while pending:
        current = pending.pop()
        yield current
        pending.extend(current.__subclasses__()[::-1])
@staticmethod
def findEvent(name: str):
    """Return the first event class whose ``__name__`` equals ``name``.

    Classes are searched in ``event_class_generator`` order; ``None`` is
    returned when nothing matches.
    """
    matches = (
        candidate
        for candidate in Broadcast.event_class_generator()
        if candidate.__name__ == name
    )
    return next(matches, None)
# Return the namespace stored in ``self.default_namespace``.
def getDefaultNamespace(self):
return self.default_namespace
def createNamespace(
    self, name, *, priority: int = 0, hide: bool = False, disabled: bool = False
):
    """Create, register and return a namespace called ``name``.

    Raises:
        ExistedNamespace: If a namespace with that name is already registered.
    """
    if self.containNamespace(name):
        raise ExistedNamespace(name, "has been created!")
    created = Namespace(name=name, priority=priority, hide=hide, disabled=disabled)
    self.namespaces.append(created)
    return created
def removeNamespace(self, name):
    """Unregister the first namespace called ``name``.

    Raises:
        UnexistedNamespace: If no registered namespace has that name.
    """
    # One pass finds and removes; no separate existence scan is needed.
    for index, namespace in enumerate(self.namespaces):
        if namespace.name == name:
            del self.namespaces[index]
            return
    raise UnexistedNamespace(name)
def containNamespace(self, name):
    """Tell whether a registered namespace is called ``name``."""
    return any(namespace.name == name for namespace in self.namespaces)
def getNamespace(self, name):
    """Return the first registered namespace called ``name``.

    Raises:
        UnexistedNamespace: If no registered namespace has that name.
    """
    # One pass is enough; there is no need to check existence first.
    for namespace in self.namespaces:
        if namespace.name == name:
            return namespace
    raise UnexistedNamespace(name)
def hideNamespace(self, name):
    """Set ``hide`` to ``True`` on the namespace returned by ``getNamespace(name)``."""
    self.getNamespace(name).hide = True
def unhideNamespace(self, name):
    """Set ``hide`` to ``False`` on the namespace returned by ``getNamespace(name)``."""
    self.getNamespace(name).hide = False
def disableNamespace(self, name):
    """Set ``disabled`` to ``True`` on the namespace returned by ``getNamespace(name)``."""
    self.getNamespace(name).disabled = True
def enableNamespace(self, name):
    """Set ``disabled`` to ``False`` on the namespace returned by ``getNamespace(name)``."""
    self.getNamespace(name).disabled = False
def containListener(self, target):
    """Tell whether some registered listener's ``callable`` equals ``target``."""
    return any(listener.callable == target for listener in self.listeners)
def getListener(self, target):
    """Return the first listener whose ``callable`` equals ``target``, else ``None``."""
    return next(
        (listener for listener in self.listeners if listener.callable == target),
        None,
    )
# Remove ``target`` from ``self.listeners`` with ``list.remove`` (which raises
# ValueError when the value is absent).
# NOTE(review): containListener/getListener match on ``listener.callable``,
# while this passes ``target`` straight to ``list.remove`` - presumably callers
# hand in the Listener object itself; confirm.
def removeListener(self, target):
self.listeners.remove(target)
def receiver(
    self,
    event: Union[str, Type[BaseEvent]],
    priority: int = 16,
    dispatchers: List[Type[BaseDispatcher]] = None,
    namespace: Namespace = None,
    headless_decorators: List[Decorator] = None,
    enable_internal_access: bool = False,
):
    """Build a decorator that registers a callable as a listener for ``event``.

    Args:
        event: Event class, or the class name of one (resolved by ``findEvent``).
        priority: Listener priority; converted with ``int()``.
        dispatchers: Inline dispatchers for a newly created listener.
        namespace: Namespace for a newly created listener; the default
            namespace is used when omitted.
        headless_decorators: Headless decorators for a newly created listener.
        enable_internal_access: Forwarded to a newly created listener.

    Returns:
        A decorator that registers its argument and returns it unchanged.
        If the callable is already registered, ``event`` is only added to
        the existing listener's ``listening_events``.

    Raises:
        InvaildEventName: If ``event`` is a name no event class carries.
        RegisteredEventListener: Raised by the decorator when the callable
            already listens to ``event``.
    """
    if cached_isinstance(event, str):
        _name = event
        event = self.findEvent(event)
        if not event:
            raise InvaildEventName(_name, "is not vaild!")
    # Type conversion; int() returns an exact int unchanged, so this equals
    # the former ``and/or`` expression.
    priority = int(priority)
    # Fresh lists per call: the former ``[]`` defaults were single objects
    # shared by every listener registered without explicit arguments.
    dispatchers = [] if dispatchers is None else dispatchers
    headless_decorators = (
        [] if headless_decorators is None else headless_decorators
    )

    def receiver_wrapper(callable_target):
        may_listener = self.getListener(callable_target)
        if not may_listener:
            self.listeners.append(
                Listener(
                    callable=callable_target,
                    namespace=namespace or self.getDefaultNamespace(),
                    inline_dispatchers=dispatchers,
                    priority=priority,
                    listening_events=[event],
                    headless_decorators=headless_decorators,
                    enable_internal_access=enable_internal_access,
                )
            )
        elif event not in may_listener.listening_events:
            may_listener.listening_events.append(event)
        else:
            raise RegisteredEventListener(event.__name__, "has been registered!")
        return callable_target

    return receiver_wrapper
Sub-modules
graia.broadcast.builtin
graia.broadcast.entities
graia.broadcast.exceptions
graia.broadcast.interfaces
graia.broadcast.interrupt
graia.broadcast.priority
graia.broadcast.typing
graia.broadcast.utilles
Classes
class Broadcast (*, loop: asyncio.events.AbstractEventLoop = None, debug_flag: bool = False)
-
Expand source code
class Broadcast:
    """Event hub: stores namespaces and listeners and runs listeners for posted events."""

    # Event loop used by postEvent() to create scheduling tasks.
    loop: asyncio.AbstractEventLoop
    # Namespace given to listeners registered without an explicit namespace.
    default_namespace: Namespace
    # Namespaces registered through createNamespace().
    namespaces: List[Namespace]
    # Registered listeners; receiver() appends to this list.
    listeners: List[Listener]
    # Interface used by Executor() to resolve the parameters of a target.
    dispatcher_interface: DispatcherInterface
    # Stored from the constructor argument.
    debug_flag: bool

    def __init__(
        self,
        *,
        loop: asyncio.AbstractEventLoop = None,
        debug_flag: bool = False,
    ):
        """Set up an empty broadcast bound to an event loop.

        Args:
            loop: Event loop to schedule on; when omitted (or falsy) the
                result of ``asyncio.get_event_loop()`` is used.
            debug_flag: Value stored on ``self.debug_flag``.
        """
        self.loop = loop or asyncio.get_event_loop()
        self.default_namespace = Namespace(name="default", default=True)
        self.debug_flag = debug_flag
        self.namespaces = []
        self.listeners = []
        self.dispatcher_interface = DispatcherInterface(self)
        # DecoratorInterface goes first in the first execution context.
        self.dispatcher_interface.execution_contexts[0].dispatchers.insert(
            0, DecoratorInterface(self.dispatcher_interface)
        )

        @self.dispatcher_interface.inject_global_raw
        async def _(interface: DispatcherInterface):
            # Built-in injectables, matched by annotation identity.
            if interface.annotation is interface.event.__class__:
                return interface.event
            elif interface.annotation is Broadcast:
                return interface.broadcast
            elif interface.annotation is DispatcherInterface:
                return interface

    def default_listener_generator(self, event_class) -> Iterable[Listener]:
        """Select the listeners whose namespace is neither hidden nor disabled
        and that listen to ``event_class``."""
        return (
            iw(self.listeners)
            .filter(lambda x: not x.namespace.hide)  # filter for hide
            .filter(lambda x: not x.namespace.disabled)  # filter for disabled
            .filter(lambda x: event_class in x.listening_events)
        )

    async def layered_scheduler(
        self, listener_generator: Generator[Listener, None, None], event: BaseEvent
    ):
        """Run the listeners for ``event`` one priority group at a time.

        Groups run in ascending ``priority`` order; listeners sharing a
        priority run concurrently.  When a listener of a group raises
        ``PropagationCancelled`` the remaining groups are skipped.
        """
        grouped: Dict[int, List[Listener]] = group_dict(
            listener_generator, lambda x: x.priority
        )
        for _priority, current_group in sorted(grouped.items(), key=lambda x: x[0]):
            # asyncio.wait() needs tasks/futures: bare coroutines are
            # deprecated since Python 3.8 and rejected since 3.11.
            tasks = [
                asyncio.ensure_future(self.Executor(target=i, event=event))
                for i in current_group
            ]
            done_tasks, _pending = await asyncio.wait(tasks)

            propagation_cancelled = False
            # Inspect every task (no early exit) so each exception is retrieved.
            for task in done_tasks:
                if task.cancelled():
                    # exception() itself raises on a cancelled task.
                    continue
                if task.exception().__class__ is PropagationCancelled:
                    propagation_cancelled = True
            if propagation_cancelled:
                # Leave the scheduler entirely; a ``break`` in the task loop
                # only left that loop and lower-priority groups still ran.
                return

    async def Executor(
        self,
        target: Union[Callable, ExecTarget],
        event: BaseEvent,
        dispatchers: List[
            Union[
                Type[BaseDispatcher],
                Callable,
                BaseDispatcher,
            ]
        ] = None,
        post_exception_event: bool = False,
        print_exception: bool = True,
        enableInternalAccess: bool = False,
    ):
        """Resolve the parameters of ``target`` for ``event`` and call it.

        Args:
            target: A plain callable, or an ``ExecTarget`` (a ``Listener``
                included) whose ``callable`` is run.
            event: Event being handled; ``event.Dispatcher`` supplies
                dispatchers.
            dispatchers: Extra dispatchers, placed ahead of the namespace,
                inline and event dispatchers.
            post_exception_event: Post an ``ExceptionThrowed`` event when the
                run fails with an exception not handled by an earlier clause.
            print_exception: Print the traceback of such an exception.
            enableInternalAccess: Allow injecting the target itself (and a
                listener's namespace) by annotation.

        Returns:
            The target's result; a ``Force`` result is unwrapped to its
            ``content``.  A ``RemoveMe`` result first unregisters a listener
            target and is then returned unchanged.

        Raises:
            DisabledNamespace: If ``target`` is a listener in a disabled
                namespace.
            Exception: Whatever parameter lookup or the target raises,
                re-raised once the ``after*`` lifecycle hooks have run.
        """
        from .builtin.event import ExceptionThrowed

        is_exectarget = cached_isinstance(target, ExecTarget)
        is_listener = cached_isinstance(target, Listener)

        if is_listener and target.namespace.disabled:
            raise DisabledNamespace(
                "catched a disabled namespace: {0}".format(target.namespace.name)
            )

        target_callable = target.callable if is_exectarget else target
        parameter_compile_result = {}
        # Pre-bound so the result checks at the end never hit an unbound
        # name should the context manager swallow an exception.
        result = None
        track_logs: TrackLog = TrackLog()

        async with self.dispatcher_interface.start_execution(
            event,
            [
                *(dispatchers or []),
                *(target.namespace.injected_dispatchers if is_listener else []),
                *(target.inline_dispatchers if is_exectarget else []),
                *dispatcher_mixin_handler(event.Dispatcher),
            ],
            track_logs,
        ) as dii:
            if enableInternalAccess or (
                is_exectarget and target.enable_internal_access
            ):

                @dii.inject_execution_raw
                async def _(interface: DispatcherInterface):
                    if interface.annotation is target.__class__:
                        return target
                    elif interface.annotation is Namespace and is_listener:
                        return target.namespace

            await dii.exec_lifecycle("beforeExecution")
            try:
                await dii.exec_lifecycle("beforeDispatch")
                if is_exectarget:
                    if target.maybe_failure:
                        # Logged lookups: seed a path (and its "$set"
                        # companion) for every parameter that has none yet.
                        initial_path = dii.init_dispatch_path()
                        for name, annotation, default in argument_signature(
                            target_callable
                        ):
                            if (
                                target.param_paths.setdefault(name, initial_path)
                                is initial_path
                            ):
                                target.param_paths[name + "$set"] = set()
                            parameter_compile_result[name] = await dii.lookup_param(
                                name, annotation, default, target.param_paths[name]
                            )
                    else:
                        for name, annotation, default in argument_signature(
                            target_callable
                        ):
                            parameter_compile_result[
                                name
                            ] = await dii.lookup_param_without_log(
                                name, annotation, default, target.param_paths[name]
                            )
                    # NOTE(review): assumed to run for both branches above;
                    # the collapsed listing has no indentation - confirm.
                    for hl_d in target.headless_decorators:
                        await dii.lookup_param_without_log(None, None, hl_d)
                else:
                    # A plain callable carries no ``param_paths``; reading it
                    # here raised AttributeError.  Use the three-argument
                    # call form, as the headless-decorator lookup does.
                    for name, annotation, default in argument_signature(
                        target_callable
                    ):
                        parameter_compile_result[
                            name
                        ] = await dii.lookup_param_without_log(
                            name, annotation, default
                        )
                result = await run_always_await_safely(
                    target_callable, **parameter_compile_result
                )
            except (ExecutionStop, PropagationCancelled):
                raise
            except RequirementCrashed:
                traceback.print_exc()
                raise
            except Exception as e:
                if print_exception:
                    traceback.print_exc()
                if post_exception_event:
                    self.postEvent(ExceptionThrowed(exception=e, event=event))
                raise
            finally:
                # exc_info() is (None, None, None) on success, so the hooks
                # get ``None`` for both the exception and the traceback.
                _, exception, tb = sys.exc_info()
                await dii.exec_lifecycle("afterDispatch", exception, tb)
                await dii.exec_lifecycle("afterTargetExec", exception, tb)
                await dii.exec_lifecycle("afterExecution", exception, tb)

                # NOTE(review): this bookkeeping is assumed to belong to the
                # ``finally`` clause; the collapsed listing cannot confirm it.
                if is_exectarget and not track_logs.fluent_success:
                    current_paths = target.param_paths
                    current_path: List[List["T_Dispatcher"]] = None
                    current_path_set: Set["T_Dispatcher"] = None
                    has_failures: set = set()
                    # Replay the log: between LookupStart and LookupEnd every
                    # Result entry not yet recorded is appended to that
                    # parameter's path; Continue entries are collected.
                    for log in track_logs.log:
                        if log[0] is TrackLogType.LookupStart:
                            current_path = current_paths[log[1]]
                            current_path_set = current_paths[log[1] + "$set"]
                        elif log[0] is TrackLogType.LookupEnd:
                            current_path = None
                        elif (
                            current_path is not None
                            and log[0] is TrackLogType.Result
                            and log[2] not in current_path_set
                        ):
                            current_path[0].append(log[2])
                            current_path_set.add(log[2])
                        elif log[0] is TrackLogType.Continue:
                            has_failures.add(log[1])
                    target.maybe_failure.symmetric_difference_update(has_failures)

        if result.__class__ is Force:
            return result.content
        if result.__class__ is RemoveMe and is_listener:
            if target in self.listeners:
                self.listeners.remove(target)
        return result

    def postEvent(self, event: BaseEvent):
        """Schedule the listeners of ``event`` on ``self.loop``.

        Returns:
            The task created for the scheduling coroutine, so callers can
            keep a reference to it or await it.
        """
        scheduling = self.layered_scheduler(
            listener_generator=self.default_listener_generator(event.__class__),
            event=event,
        )
        return self.loop.create_task(scheduling)

    @staticmethod
    def event_class_generator(target=BaseEvent):
        """Yield every direct and indirect subclass of ``target``, depth-first."""
        for i in target.__subclasses__():
            yield i
            yield from Broadcast.event_class_generator(i)

    @staticmethod
    def findEvent(name: str):
        """Return the first event class whose ``__name__`` equals ``name``, else ``None``."""
        for i in Broadcast.event_class_generator():
            if i.__name__ == name:
                return i
        return None

    def getDefaultNamespace(self):
        """Return the namespace stored in ``self.default_namespace``."""
        return self.default_namespace

    def createNamespace(
        self, name, *, priority: int = 0, hide: bool = False, disabled: bool = False
    ):
        """Create, register and return a namespace called ``name``.

        Raises:
            ExistedNamespace: If a namespace with that name is already
                registered.
        """
        if self.containNamespace(name):
            raise ExistedNamespace(name, "has been created!")
        created = Namespace(name=name, priority=priority, hide=hide, disabled=disabled)
        self.namespaces.append(created)
        return created

    def removeNamespace(self, name):
        """Unregister the first namespace called ``name``.

        Raises:
            UnexistedNamespace: If no registered namespace has that name.
        """
        # One pass finds and removes; no separate existence scan is needed.
        for index, namespace in enumerate(self.namespaces):
            if namespace.name == name:
                del self.namespaces[index]
                return
        raise UnexistedNamespace(name)

    def containNamespace(self, name):
        """Tell whether a registered namespace is called ``name``."""
        return any(namespace.name == name for namespace in self.namespaces)

    def getNamespace(self, name):
        """Return the first registered namespace called ``name``.

        Raises:
            UnexistedNamespace: If no registered namespace has that name.
        """
        for namespace in self.namespaces:
            if namespace.name == name:
                return namespace
        raise UnexistedNamespace(name)

    def hideNamespace(self, name):
        """Set ``hide`` to ``True`` on the namespace called ``name``."""
        self.getNamespace(name).hide = True

    def unhideNamespace(self, name):
        """Set ``hide`` to ``False`` on the namespace called ``name``."""
        self.getNamespace(name).hide = False

    def disableNamespace(self, name):
        """Set ``disabled`` to ``True`` on the namespace called ``name``."""
        self.getNamespace(name).disabled = True

    def enableNamespace(self, name):
        """Set ``disabled`` to ``False`` on the namespace called ``name``."""
        self.getNamespace(name).disabled = False

    def containListener(self, target):
        """Tell whether some registered listener's ``callable`` equals ``target``."""
        return any(listener.callable == target for listener in self.listeners)

    def getListener(self, target):
        """Return the first listener whose ``callable`` equals ``target``, else ``None``."""
        for listener in self.listeners:
            if listener.callable == target:
                return listener
        return None

    def removeListener(self, target):
        """Remove ``target`` from ``self.listeners`` with ``list.remove``."""
        self.listeners.remove(target)

    def receiver(
        self,
        event: Union[str, Type[BaseEvent]],
        priority: int = 16,
        dispatchers: List[Type[BaseDispatcher]] = None,
        namespace: Namespace = None,
        headless_decorators: List[Decorator] = None,
        enable_internal_access: bool = False,
    ):
        """Build a decorator that registers a callable as a listener for ``event``.

        Args:
            event: Event class, or the class name of one.
            priority: Listener priority; converted with ``int()``.
            dispatchers: Inline dispatchers for a newly created listener.
            namespace: Namespace for a newly created listener; the default
                namespace is used when omitted.
            headless_decorators: Headless decorators for a new listener.
            enable_internal_access: Forwarded to a newly created listener.

        Returns:
            A decorator that registers its argument and returns it unchanged.

        Raises:
            InvaildEventName: If ``event`` is a name no event class carries.
            RegisteredEventListener: Raised by the decorator when the
                callable already listens to ``event``.
        """
        if cached_isinstance(event, str):
            _name = event
            event = self.findEvent(event)
            if not event:
                raise InvaildEventName(_name, "is not vaild!")
        # Type conversion; int() returns an exact int unchanged.
        priority = int(priority)
        # Fresh lists per call: the former ``[]`` defaults were single
        # objects shared by every listener registered without arguments.
        dispatchers = [] if dispatchers is None else dispatchers
        headless_decorators = (
            [] if headless_decorators is None else headless_decorators
        )

        def receiver_wrapper(callable_target):
            may_listener = self.getListener(callable_target)
            if not may_listener:
                self.listeners.append(
                    Listener(
                        callable=callable_target,
                        namespace=namespace or self.getDefaultNamespace(),
                        inline_dispatchers=dispatchers,
                        priority=priority,
                        listening_events=[event],
                        headless_decorators=headless_decorators,
                        enable_internal_access=enable_internal_access,
                    )
                )
            elif event not in may_listener.listening_events:
                may_listener.listening_events.append(event)
            else:
                raise RegisteredEventListener(
                    event.__name__, "has been registered!"
                )
            return callable_target

        return receiver_wrapper
Class variables
var debug_flag : bool
var default_namespace : Namespace
var dispatcher_interface : DispatcherInterface
var listeners : List[Listener]
var loop : asyncio.events.AbstractEventLoop
var namespaces : List[Namespace]
Static methods
def event_class_generator(target=graia.broadcast.entities.event.BaseEvent)
-
Expand source code
@staticmethod
def event_class_generator(target=BaseEvent):
    """Yield every direct and indirect subclass of ``target``, depth-first.

    Each class is yielded before its own subclasses, following the order
    returned by ``__subclasses__()``.
    """
    # Explicit stack; children are pushed reversed so they pop in order.
    pending = target.__subclasses__()[::-1]
    while pending:
        current = pending.pop()
        yield current
        pending.extend(current.__subclasses__()[::-1])
def findEvent(name: str)
-
Expand source code
@staticmethod
def findEvent(name: str):
    """Return the first event class whose ``__name__`` equals ``name``.

    Classes are searched in ``event_class_generator`` order; ``None`` is
    returned when nothing matches.
    """
    matches = (
        candidate
        for candidate in Broadcast.event_class_generator()
        if candidate.__name__ == name
    )
    return next(matches, None)
Methods
async def Executor(self, target: Union[Callable, ExecTarget], event: BaseEvent, dispatchers: List[Union[Type[BaseDispatcher], Callable, BaseDispatcher]] = None, post_exception_event: bool = False, print_exception: bool = True, enableInternalAccess: bool = False)
-
Expand source code
async def Executor(
    self,
    target: Union[Callable, ExecTarget],
    event: BaseEvent,
    dispatchers: List[
        Union[
            Type[BaseDispatcher],
            Callable,
            BaseDispatcher,
        ]
    ] = None,
    post_exception_event: bool = False,
    print_exception: bool = True,
    enableInternalAccess: bool = False,
):
    """Resolve the parameters of ``target`` for ``event`` and call it.

    Args:
        target: A plain callable, or an ``ExecTarget`` (a ``Listener``
            included) whose ``callable`` is run.
        event: Event being handled; ``event.Dispatcher`` supplies dispatchers.
        dispatchers: Extra dispatchers, placed ahead of the namespace, inline
            and event dispatchers.
        post_exception_event: Post an ``ExceptionThrowed`` event when the run
            fails with an exception not handled by an earlier clause.
        print_exception: Print the traceback of such an exception.
        enableInternalAccess: Allow injecting the target itself (and a
            listener's namespace) by annotation.

    Returns:
        The target's result; a ``Force`` result is unwrapped to its
        ``content``.  A ``RemoveMe`` result first unregisters a listener
        target and is then returned unchanged.

    Raises:
        DisabledNamespace: If ``target`` is a listener in a disabled namespace.
        Exception: Whatever parameter lookup or the target raises, re-raised
            once the ``after*`` lifecycle hooks have run.
    """
    from .builtin.event import ExceptionThrowed

    is_exectarget = cached_isinstance(target, ExecTarget)
    is_listener = cached_isinstance(target, Listener)

    if is_listener and target.namespace.disabled:
        raise DisabledNamespace(
            "catched a disabled namespace: {0}".format(target.namespace.name)
        )

    target_callable = target.callable if is_exectarget else target
    parameter_compile_result = {}
    # Pre-bound so the result checks at the end never hit an unbound name
    # should the context manager swallow an exception.
    result = None
    track_logs: TrackLog = TrackLog()

    async with self.dispatcher_interface.start_execution(
        event,
        [
            *(dispatchers or []),
            *(target.namespace.injected_dispatchers if is_listener else []),
            *(target.inline_dispatchers if is_exectarget else []),
            *dispatcher_mixin_handler(event.Dispatcher),
        ],
        track_logs,
    ) as dii:
        if enableInternalAccess or (
            is_exectarget and target.enable_internal_access
        ):

            @dii.inject_execution_raw
            async def _(interface: DispatcherInterface):
                if interface.annotation is target.__class__:
                    return target
                elif interface.annotation is Namespace and is_listener:
                    return target.namespace

        await dii.exec_lifecycle("beforeExecution")
        try:
            await dii.exec_lifecycle("beforeDispatch")
            if is_exectarget:
                if target.maybe_failure:
                    # Logged lookups: seed a path (and its "$set" companion)
                    # for every parameter that has none yet.
                    initial_path = dii.init_dispatch_path()
                    for name, annotation, default in argument_signature(
                        target_callable
                    ):
                        if (
                            target.param_paths.setdefault(name, initial_path)
                            is initial_path
                        ):
                            target.param_paths[name + "$set"] = set()
                        parameter_compile_result[name] = await dii.lookup_param(
                            name, annotation, default, target.param_paths[name]
                        )
                else:
                    for name, annotation, default in argument_signature(
                        target_callable
                    ):
                        parameter_compile_result[
                            name
                        ] = await dii.lookup_param_without_log(
                            name, annotation, default, target.param_paths[name]
                        )
                # NOTE(review): assumed to run for both branches above; the
                # collapsed listing has no indentation - confirm.
                for hl_d in target.headless_decorators:
                    await dii.lookup_param_without_log(None, None, hl_d)
            else:
                # A plain callable carries no ``param_paths``; reading it here
                # raised AttributeError.  Use the three-argument call form,
                # as the headless-decorator lookup above does.
                for name, annotation, default in argument_signature(
                    target_callable
                ):
                    parameter_compile_result[
                        name
                    ] = await dii.lookup_param_without_log(
                        name, annotation, default
                    )
            result = await run_always_await_safely(
                target_callable, **parameter_compile_result
            )
        except (ExecutionStop, PropagationCancelled):
            raise
        except RequirementCrashed:
            traceback.print_exc()
            raise
        except Exception as e:
            if print_exception:
                traceback.print_exc()
            if post_exception_event:
                self.postEvent(ExceptionThrowed(exception=e, event=event))
            raise
        finally:
            # exc_info() is (None, None, None) on success, so the hooks get
            # ``None`` for both the exception and the traceback.
            _, exception, tb = sys.exc_info()
            await dii.exec_lifecycle("afterDispatch", exception, tb)
            await dii.exec_lifecycle("afterTargetExec", exception, tb)
            await dii.exec_lifecycle("afterExecution", exception, tb)

            # NOTE(review): this bookkeeping is assumed to belong to the
            # ``finally`` clause; the collapsed listing cannot confirm it.
            if is_exectarget and not track_logs.fluent_success:
                current_paths = target.param_paths
                current_path: List[List["T_Dispatcher"]] = None
                current_path_set: Set["T_Dispatcher"] = None
                has_failures: set = set()
                # Replay the log: between LookupStart and LookupEnd every
                # Result entry not yet recorded is appended to that
                # parameter's path; Continue entries are collected.
                for log in track_logs.log:
                    if log[0] is TrackLogType.LookupStart:
                        current_path = current_paths[log[1]]
                        current_path_set = current_paths[log[1] + "$set"]
                    elif log[0] is TrackLogType.LookupEnd:
                        current_path = None
                    elif (
                        current_path is not None
                        and log[0] is TrackLogType.Result
                        and log[2] not in current_path_set
                    ):
                        current_path[0].append(log[2])
                        current_path_set.add(log[2])
                    elif log[0] is TrackLogType.Continue:
                        has_failures.add(log[1])
                target.maybe_failure.symmetric_difference_update(has_failures)

    if result.__class__ is Force:
        return result.content
    if result.__class__ is RemoveMe and is_listener:
        if target in self.listeners:
            self.listeners.remove(target)
    return result
def containListener(self, target)
-
Expand source code
def containListener(self, target):
    """Tell whether some registered listener's ``callable`` equals ``target``."""
    return any(listener.callable == target for listener in self.listeners)
def containNamespace(self, name)
-
Expand source code
def containNamespace(self, name):
    """Tell whether a registered namespace is called ``name``."""
    return any(namespace.name == name for namespace in self.namespaces)
def createNamespace(self, name, *, priority: int = 0, hide: bool = False, disabled: bool = False)
-
Expand source code
def createNamespace(
    self, name, *, priority: int = 0, hide: bool = False, disabled: bool = False
):
    """Create, register and return a namespace called ``name``.

    Raises:
        ExistedNamespace: If a namespace with that name is already registered.
    """
    if self.containNamespace(name):
        raise ExistedNamespace(name, "has been created!")
    created = Namespace(name=name, priority=priority, hide=hide, disabled=disabled)
    self.namespaces.append(created)
    return created
def default_listener_generator(self, event_class) ‑> Iterable[Listener]
-
Expand source code
def default_listener_generator(self, event_class) -> Iterable[Listener]:
    """Select the listeners that should receive ``event_class``.

    A listener is kept when its namespace is neither hidden nor disabled and
    ``event_class`` is among its ``listening_events``.  The filtered
    ``IterWrapper`` is returned as-is, without being collected into a list.
    """

    def visible(listener):
        return not listener.namespace.hide

    def enabled(listener):
        return not listener.namespace.disabled

    def subscribed(listener):
        return event_class in listener.listening_events

    candidates = iw(self.listeners)
    return candidates.filter(visible).filter(enabled).filter(subscribed)
def disableNamespace(self, name)
-
Expand source code
def disableNamespace(self, name):
    """Set ``disabled`` to ``True`` on the namespace returned by ``getNamespace(name)``."""
    self.getNamespace(name).disabled = True
def enableNamespace(self, name)
-
Expand source code
def enableNamespace(self, name):
    """Set ``disabled`` to ``False`` on the namespace returned by ``getNamespace(name)``."""
    self.getNamespace(name).disabled = False
def getDefaultNamespace(self)
-
Expand source code
# Return the namespace stored in ``self.default_namespace``.
def getDefaultNamespace(self): return self.default_namespace
def getListener(self, target)
-
Expand source code
def getListener(self, target):
    """Return the first listener whose ``callable`` equals ``target``, else ``None``."""
    return next(
        (listener for listener in self.listeners if listener.callable == target),
        None,
    )
def getNamespace(self, name)
-
Expand source code
def getNamespace(self, name):
    """Return the first registered namespace called ``name``.

    Raises:
        UnexistedNamespace: If no registered namespace has that name.
    """
    # One pass is enough; there is no need to check existence first.
    for namespace in self.namespaces:
        if namespace.name == name:
            return namespace
    raise UnexistedNamespace(name)
def hideNamespace(self, name)
-
Expand source code
def hideNamespace(self, name):
    """Set ``hide`` to ``True`` on the namespace returned by ``getNamespace(name)``."""
    self.getNamespace(name).hide = True
async def layered_scheduler(self, listener_generator: Generator[Listener, NoneType, NoneType], event: BaseEvent)
-
Expand source code
async def layered_scheduler(
    self, listener_generator: Generator[Listener, None, None], event: BaseEvent
):
    """Run the listeners for ``event`` one priority group at a time.

    Listeners are grouped by ``priority`` and the groups run in ascending
    order; listeners sharing a priority run concurrently.  When a listener
    of a group raises ``PropagationCancelled`` the remaining groups are
    skipped.

    Args:
        listener_generator: Iterable of listeners to execute.
        event: Event instance handed to every listener.
    """
    grouped: Dict[int, List[Listener]] = group_dict(
        listener_generator, lambda x: x.priority
    )
    for _priority, current_group in sorted(grouped.items(), key=lambda x: x[0]):
        # asyncio.wait() needs tasks/futures: bare coroutines are deprecated
        # since Python 3.8 and rejected since 3.11.
        tasks = [
            asyncio.ensure_future(self.Executor(target=i, event=event))
            for i in current_group
        ]
        done_tasks, _pending = await asyncio.wait(tasks)

        propagation_cancelled = False
        # Inspect every task (no early exit) so each exception is retrieved.
        for task in done_tasks:
            if task.cancelled():
                # exception() itself raises on a cancelled task.
                continue
            if task.exception().__class__ is PropagationCancelled:
                propagation_cancelled = True
        if propagation_cancelled:
            # Leave the scheduler entirely; a ``break`` in the task loop only
            # left that loop and lower-priority groups still ran.
            return
def postEvent(self, event: BaseEvent)
-
Expand source code
def postEvent(self, event: BaseEvent):
    """Schedule the listeners of ``event`` on ``self.loop``.

    Args:
        event: Event instance to deliver; listeners are selected by its class.

    Returns:
        The task created for the scheduling coroutine.  Earlier the task was
        discarded; returning it lets callers keep a reference or await it.
    """
    scheduling = self.layered_scheduler(
        listener_generator=self.default_listener_generator(event.__class__),
        event=event,
    )
    return self.loop.create_task(scheduling)
def receiver(self, event: Union[str, Type[BaseEvent]], priority: int = 16, dispatchers: List[Type[BaseDispatcher]] = [], namespace: Namespace = None, headless_decorators: List[Decorator] = [], enable_internal_access: bool = False)
-
Expand source code
def receiver(
    self,
    event: Union[str, Type[BaseEvent]],
    priority: int = 16,
    dispatchers: List[Type[BaseDispatcher]] = None,
    namespace: Namespace = None,
    headless_decorators: List[Decorator] = None,
    enable_internal_access: bool = False,
):
    """Build a decorator that registers a callable as a listener for ``event``.

    Args:
        event: Event class, or the class name of one (resolved by ``findEvent``).
        priority: Listener priority; converted with ``int()``.
        dispatchers: Inline dispatchers for a newly created listener.
        namespace: Namespace for a newly created listener; the default
            namespace is used when omitted.
        headless_decorators: Headless decorators for a newly created listener.
        enable_internal_access: Forwarded to a newly created listener.

    Returns:
        A decorator that registers its argument and returns it unchanged.
        If the callable is already registered, ``event`` is only added to
        the existing listener's ``listening_events``.

    Raises:
        InvaildEventName: If ``event`` is a name no event class carries.
        RegisteredEventListener: Raised by the decorator when the callable
            already listens to ``event``.
    """
    if cached_isinstance(event, str):
        _name = event
        event = self.findEvent(event)
        if not event:
            raise InvaildEventName(_name, "is not vaild!")
    # Type conversion; int() returns an exact int unchanged, so this equals
    # the former ``and/or`` expression.
    priority = int(priority)
    # Fresh lists per call: the former ``[]`` defaults were single objects
    # shared by every listener registered without explicit arguments.
    dispatchers = [] if dispatchers is None else dispatchers
    headless_decorators = (
        [] if headless_decorators is None else headless_decorators
    )

    def receiver_wrapper(callable_target):
        may_listener = self.getListener(callable_target)
        if not may_listener:
            self.listeners.append(
                Listener(
                    callable=callable_target,
                    namespace=namespace or self.getDefaultNamespace(),
                    inline_dispatchers=dispatchers,
                    priority=priority,
                    listening_events=[event],
                    headless_decorators=headless_decorators,
                    enable_internal_access=enable_internal_access,
                )
            )
        elif event not in may_listener.listening_events:
            may_listener.listening_events.append(event)
        else:
            raise RegisteredEventListener(event.__name__, "has been registered!")
        return callable_target

    return receiver_wrapper
def removeListener(self, target)
-
Expand source code
# Remove ``target`` from ``self.listeners`` with ``list.remove`` (which raises
# ValueError when the value is absent).
# NOTE(review): containListener/getListener match on ``listener.callable``,
# while this passes ``target`` straight to ``list.remove`` - presumably callers
# hand in the Listener object itself; confirm.
def removeListener(self, target): self.listeners.remove(target)
def removeNamespace(self, name)
-
Expand source code
def removeNamespace(self, name):
    """Unregister the first namespace called ``name``.

    Raises:
        UnexistedNamespace: If no registered namespace has that name.
    """
    # One pass finds and removes; no separate existence scan is needed.
    for index, namespace in enumerate(self.namespaces):
        if namespace.name == name:
            del self.namespaces[index]
            return
    raise UnexistedNamespace(name)
def unhideNamespace(self, name)
-
Expand source code
def unhideNamespace(self, name):
    """Set ``hide`` to ``False`` on the namespace returned by ``getNamespace(name)``."""
    self.getNamespace(name).hide = False