Module graia.broadcast

Expand source code
import asyncio
import sys
import traceback
from typing import (
    Callable,
    Dict,
    Generator,
    Iterable,
    List,
    Set,
    Type,
    Union,
)

from iterwrapper import IterWrapper as iw

from graia.broadcast.entities.track_log import TrackLog, TrackLogType

from .typing import T_Dispatcher

from .entities.exectarget import ExecTarget

from .interfaces.dispatcher import DispatcherInterface
from .entities.decorator import Decorator
from .entities.dispatcher import BaseDispatcher
from .entities.event import BaseEvent
from .entities.listener import Listener
from .entities.namespace import Namespace
from .entities.signatures import Force, RemoveMe
from .exceptions import (
    DisabledNamespace,
    ExecutionStop,
    ExistedNamespace,
    InvaildEventName,
    PropagationCancelled,
    RegisteredEventListener,
    RequirementCrashed,
    UnexistedNamespace,
)
from .interfaces.decorator import DecoratorInterface
from .utilles import (
    argument_signature,
    dispatcher_mixin_handler,
    group_dict,
    printer,
    run_always_await_safely,
    cached_isinstance,
)
from .typing import T_Dispatcher


class Broadcast:
    loop: asyncio.AbstractEventLoop

    default_namespace: Namespace
    namespaces: List[Namespace]
    listeners: List[Listener]

    dispatcher_interface: DispatcherInterface

    debug_flag: bool

    def __init__(
        self,
        *,
        loop: asyncio.AbstractEventLoop = None,
        debug_flag: bool = False,
    ):
        self.loop = loop or asyncio.get_event_loop()
        self.default_namespace = Namespace(name="default", default=True)
        self.debug_flag = debug_flag
        self.namespaces = []
        self.listeners = []
        self.dispatcher_interface = DispatcherInterface(self)
        self.dispatcher_interface.execution_contexts[0].dispatchers.insert(
            0, DecoratorInterface(self.dispatcher_interface)
        )

        @self.dispatcher_interface.inject_global_raw
        async def _(interface: DispatcherInterface):
            if interface.annotation is interface.event.__class__:
                return interface.event
            elif interface.annotation is Broadcast:
                return interface.broadcast
            elif interface.annotation is DispatcherInterface:
                return interface

    def default_listener_generator(self, event_class) -> Iterable[Listener]:
        return (
            iw(self.listeners)
            .filter(lambda x: not x.namespace.hide)  # filter for hide
            .filter(lambda x: not x.namespace.disabled)  # filter for disabled
            .filter(lambda x: event_class in x.listening_events)
            # .collect(list)  # collect to a whole list
        )

    async def layered_scheduler(
        self, listener_generator: Generator[Listener, None, None], event: BaseEvent
    ):
        grouped: Dict[int, List[Listener]] = group_dict(
            listener_generator, lambda x: x.priority
        )
        for _, current_group in sorted(grouped.items(), key=lambda x: x[0]):
            coros = [self.Executor(target=i, event=event) for i in current_group]
            done_tasks, _ = await asyncio.wait(coros)
            for task in done_tasks:
                if task.exception().__class__ is PropagationCancelled:
                    break

    async def Executor(
        self,
        target: Union[Callable, ExecTarget],
        event: BaseEvent,
        dispatchers: List[
            Union[
                Type[BaseDispatcher],
                Callable,
                BaseDispatcher,
            ]
        ] = None,
        post_exception_event: bool = False,
        print_exception: bool = True,
        enableInternalAccess: bool = False,
    ):
        from .builtin.event import ExceptionThrowed

        is_exectarget = cached_isinstance(target, ExecTarget)
        is_listener = cached_isinstance(target, Listener)

        if is_listener:
            if target.namespace.disabled:
                raise DisabledNamespace(
                    "catched a disabled namespace: {0}".format(target.namespace.name)
                )

        target_callable = target.callable if is_exectarget else target
        parameter_compile_result = {}
        complete_finished = False

        track_logs: TrackLog = TrackLog()
        async with self.dispatcher_interface.start_execution(
            event,
            [
                *(dispatchers or []),
                *(target.namespace.injected_dispatchers if is_listener else []),
                *(target.inline_dispatchers if is_exectarget else []),
                *dispatcher_mixin_handler(event.Dispatcher),
            ],
            track_logs,
        ) as dii:
            if enableInternalAccess or (
                is_exectarget and target.enable_internal_access
            ):

                @dii.inject_execution_raw
                async def _(interface: DispatcherInterface):
                    if interface.annotation is target.__class__:
                        return target
                    elif interface.annotation is Namespace and is_listener:
                        return target.namespace

            await dii.exec_lifecycle("beforeExecution")
            try:
                await dii.exec_lifecycle("beforeDispatch")

                if is_exectarget:
                    if target.maybe_failure:
                        initial_path = dii.init_dispatch_path()
                        for name, annotation, default in argument_signature(
                            target_callable
                        ):
                            if (
                                target.param_paths.setdefault(name, initial_path)
                                is initial_path
                            ):
                                target.param_paths[name + "$set"] = set()
                            parameter_compile_result[name] = await dii.lookup_param(
                                name, annotation, default, target.param_paths[name]
                            )
                    else:
                        for name, annotation, default in argument_signature(
                            target_callable
                        ):
                            parameter_compile_result[
                                name
                            ] = await dii.lookup_param_without_log(
                                name, annotation, default, target.param_paths[name]
                            )

                    for hl_d in target.headless_decorators:
                        await dii.lookup_param_without_log(None, None, hl_d)
                else:
                    for name, annotation, default in argument_signature(
                        target_callable
                    ):
                        parameter_compile_result[
                            name
                        ] = await dii.lookup_param_without_log(
                            name, annotation, default, target.param_paths[name]
                        )

                complete_finished = True

                result = await run_always_await_safely(
                    target_callable, **parameter_compile_result
                )
            except (ExecutionStop, PropagationCancelled):
                raise
            except RequirementCrashed:
                traceback.print_exc()
                raise
            except Exception as e:
                if print_exception:
                    traceback.print_exc()
                if post_exception_event:
                    self.postEvent(ExceptionThrowed(exception=e, event=event))
                raise
            finally:
                _, exception, tb = sys.exc_info()
                await dii.exec_lifecycle("afterDispatch", exception, tb)
                await dii.exec_lifecycle("afterTargetExec", exception, tb)
                await dii.exec_lifecycle("afterExecution", exception, tb)

                if is_exectarget and not track_logs.fluent_success:
                    current_paths = target.param_paths
                    current_path: List[List["T_Dispatcher"]] = None
                    current_path_set: Set["T_Dispatcher"] = None
                    has_failures: set = set()

                    for log in track_logs.log:
                        if log[0] is TrackLogType.LookupStart:
                            current_path = current_paths[log[1]]
                            current_path_set = current_paths[log[1] + "$set"]
                        elif log[0] is TrackLogType.LookupEnd:
                            current_path = None
                        elif (
                            current_path is not None
                            and log[0] is TrackLogType.Result
                            and log[2] not in current_path_set
                        ):
                            current_path[0].append(log[2])
                            current_path_set.add(log[2])
                        elif log[0] is TrackLogType.Continue:
                            has_failures.add(log[1])

                    target.maybe_failure.symmetric_difference_update(has_failures)

            if result.__class__ is Force:
                return result.content

            if result.__class__ is RemoveMe:
                if cached_isinstance(target, Listener):
                    if target in self.listeners:
                        self.listeners.pop(self.listeners.index(target))

            return result

    def postEvent(self, event: BaseEvent):
        self.loop.create_task(
            self.layered_scheduler(
                listener_generator=self.default_listener_generator(event.__class__),
                event=event,
            )
        )

    @staticmethod
    def event_class_generator(target=BaseEvent):
        for i in target.__subclasses__():
            yield i
            if i.__subclasses__():
                yield from Broadcast.event_class_generator(i)

    @staticmethod
    def findEvent(name: str):
        for i in Broadcast.event_class_generator():
            if i.__name__ == name:
                return i

    def getDefaultNamespace(self):
        return self.default_namespace

    def createNamespace(
        self, name, *, priority: int = 0, hide: bool = False, disabled: bool = False
    ):
        if self.containNamespace(name):
            raise ExistedNamespace(name, "has been created!")
        self.namespaces.append(
            Namespace(name=name, priority=priority, hide=hide, disabled=disabled)
        )
        return self.namespaces[-1]

    def removeNamespace(self, name):
        if self.containNamespace(name):
            for index, i in enumerate(self.namespaces):
                if i.name == name:
                    self.namespaces.pop(index)
                    return
        else:
            raise UnexistedNamespace(name)

    def containNamespace(self, name):
        for i in self.namespaces:
            if i.name == name:
                return True
        return False

    def getNamespace(self, name):
        if self.containNamespace(name):
            for i in self.namespaces:
                if i.name == name:
                    return i
        else:
            raise UnexistedNamespace(name)

    def hideNamespace(self, name):
        ns = self.getNamespace(name)
        ns.hide = True

    def unhideNamespace(self, name):
        ns = self.getNamespace(name)
        ns.hide = False

    def disableNamespace(self, name):
        ns = self.getNamespace(name)
        ns.disabled = True

    def enableNamespace(self, name):
        ns = self.getNamespace(name)
        ns.disabled = False

    def containListener(self, target):
        for i in self.listeners:
            if i.callable == target:
                return True
        return False

    def getListener(self, target):
        for i in self.listeners:
            if i.callable == target:
                return i

    def removeListener(self, target):
        self.listeners.remove(target)

    def receiver(
        self,
        event: Union[str, Type[BaseEvent]],
        priority: int = 16,
        dispatchers: List[Type[BaseDispatcher]] = [],
        namespace: Namespace = None,
        headless_decorators: List[Decorator] = [],
        enable_internal_access: bool = False,
    ):
        if cached_isinstance(event, str):
            _name = event
            event = self.findEvent(event)
            if not event:
                raise InvaildEventName(_name, "is not vaild!")
        priority = (type(priority) == int) and priority or int(priority)  # 类型转换

        def receiver_wrapper(callable_target):
            may_listener = self.getListener(callable_target)
            if not may_listener:
                self.listeners.append(
                    Listener(
                        callable=callable_target,
                        namespace=namespace or self.getDefaultNamespace(),
                        inline_dispatchers=dispatchers,
                        priority=priority,
                        listening_events=[event],
                        headless_decorators=headless_decorators,
                        enable_internal_access=enable_internal_access,
                    )
                )
            else:
                if event not in may_listener.listening_events:
                    may_listener.listening_events.append(event)
                else:
                    raise RegisteredEventListener(
                        event.__name__, "has been registered!"
                    )
            return callable_target

        return receiver_wrapper

Sub-modules

graia.broadcast.builtin
graia.broadcast.entities
graia.broadcast.exceptions
graia.broadcast.interfaces
graia.broadcast.interrupt
graia.broadcast.priority
graia.broadcast.typing
graia.broadcast.utilles

Classes

class Broadcast (*, loop: asyncio.events.AbstractEventLoop = None, debug_flag: bool = False)
Expand source code
class Broadcast:
    loop: asyncio.AbstractEventLoop

    default_namespace: Namespace
    namespaces: List[Namespace]
    listeners: List[Listener]

    dispatcher_interface: DispatcherInterface

    debug_flag: bool

    def __init__(
        self,
        *,
        loop: asyncio.AbstractEventLoop = None,
        debug_flag: bool = False,
    ):
        self.loop = loop or asyncio.get_event_loop()
        self.default_namespace = Namespace(name="default", default=True)
        self.debug_flag = debug_flag
        self.namespaces = []
        self.listeners = []
        self.dispatcher_interface = DispatcherInterface(self)
        self.dispatcher_interface.execution_contexts[0].dispatchers.insert(
            0, DecoratorInterface(self.dispatcher_interface)
        )

        @self.dispatcher_interface.inject_global_raw
        async def _(interface: DispatcherInterface):
            if interface.annotation is interface.event.__class__:
                return interface.event
            elif interface.annotation is Broadcast:
                return interface.broadcast
            elif interface.annotation is DispatcherInterface:
                return interface

    def default_listener_generator(self, event_class) -> Iterable[Listener]:
        return (
            iw(self.listeners)
            .filter(lambda x: not x.namespace.hide)  # filter for hide
            .filter(lambda x: not x.namespace.disabled)  # filter for disabled
            .filter(lambda x: event_class in x.listening_events)
            # .collect(list)  # collect to a whole list
        )

    async def layered_scheduler(
        self, listener_generator: Generator[Listener, None, None], event: BaseEvent
    ):
        grouped: Dict[int, List[Listener]] = group_dict(
            listener_generator, lambda x: x.priority
        )
        for _, current_group in sorted(grouped.items(), key=lambda x: x[0]):
            coros = [self.Executor(target=i, event=event) for i in current_group]
            done_tasks, _ = await asyncio.wait(coros)
            for task in done_tasks:
                if task.exception().__class__ is PropagationCancelled:
                    break

    async def Executor(
        self,
        target: Union[Callable, ExecTarget],
        event: BaseEvent,
        dispatchers: List[
            Union[
                Type[BaseDispatcher],
                Callable,
                BaseDispatcher,
            ]
        ] = None,
        post_exception_event: bool = False,
        print_exception: bool = True,
        enableInternalAccess: bool = False,
    ):
        from .builtin.event import ExceptionThrowed

        is_exectarget = cached_isinstance(target, ExecTarget)
        is_listener = cached_isinstance(target, Listener)

        if is_listener:
            if target.namespace.disabled:
                raise DisabledNamespace(
                    "catched a disabled namespace: {0}".format(target.namespace.name)
                )

        target_callable = target.callable if is_exectarget else target
        parameter_compile_result = {}
        complete_finished = False

        track_logs: TrackLog = TrackLog()
        async with self.dispatcher_interface.start_execution(
            event,
            [
                *(dispatchers or []),
                *(target.namespace.injected_dispatchers if is_listener else []),
                *(target.inline_dispatchers if is_exectarget else []),
                *dispatcher_mixin_handler(event.Dispatcher),
            ],
            track_logs,
        ) as dii:
            if enableInternalAccess or (
                is_exectarget and target.enable_internal_access
            ):

                @dii.inject_execution_raw
                async def _(interface: DispatcherInterface):
                    if interface.annotation is target.__class__:
                        return target
                    elif interface.annotation is Namespace and is_listener:
                        return target.namespace

            await dii.exec_lifecycle("beforeExecution")
            try:
                await dii.exec_lifecycle("beforeDispatch")

                if is_exectarget:
                    if target.maybe_failure:
                        initial_path = dii.init_dispatch_path()
                        for name, annotation, default in argument_signature(
                            target_callable
                        ):
                            if (
                                target.param_paths.setdefault(name, initial_path)
                                is initial_path
                            ):
                                target.param_paths[name + "$set"] = set()
                            parameter_compile_result[name] = await dii.lookup_param(
                                name, annotation, default, target.param_paths[name]
                            )
                    else:
                        for name, annotation, default in argument_signature(
                            target_callable
                        ):
                            parameter_compile_result[
                                name
                            ] = await dii.lookup_param_without_log(
                                name, annotation, default, target.param_paths[name]
                            )

                    for hl_d in target.headless_decorators:
                        await dii.lookup_param_without_log(None, None, hl_d)
                else:
                    for name, annotation, default in argument_signature(
                        target_callable
                    ):
                        parameter_compile_result[
                            name
                        ] = await dii.lookup_param_without_log(
                            name, annotation, default, target.param_paths[name]
                        )

                complete_finished = True

                result = await run_always_await_safely(
                    target_callable, **parameter_compile_result
                )
            except (ExecutionStop, PropagationCancelled):
                raise
            except RequirementCrashed:
                traceback.print_exc()
                raise
            except Exception as e:
                if print_exception:
                    traceback.print_exc()
                if post_exception_event:
                    self.postEvent(ExceptionThrowed(exception=e, event=event))
                raise
            finally:
                _, exception, tb = sys.exc_info()
                await dii.exec_lifecycle("afterDispatch", exception, tb)
                await dii.exec_lifecycle("afterTargetExec", exception, tb)
                await dii.exec_lifecycle("afterExecution", exception, tb)

                if is_exectarget and not track_logs.fluent_success:
                    current_paths = target.param_paths
                    current_path: List[List["T_Dispatcher"]] = None
                    current_path_set: Set["T_Dispatcher"] = None
                    has_failures: set = set()

                    for log in track_logs.log:
                        if log[0] is TrackLogType.LookupStart:
                            current_path = current_paths[log[1]]
                            current_path_set = current_paths[log[1] + "$set"]
                        elif log[0] is TrackLogType.LookupEnd:
                            current_path = None
                        elif (
                            current_path is not None
                            and log[0] is TrackLogType.Result
                            and log[2] not in current_path_set
                        ):
                            current_path[0].append(log[2])
                            current_path_set.add(log[2])
                        elif log[0] is TrackLogType.Continue:
                            has_failures.add(log[1])

                    target.maybe_failure.symmetric_difference_update(has_failures)

            if result.__class__ is Force:
                return result.content

            if result.__class__ is RemoveMe:
                if cached_isinstance(target, Listener):
                    if target in self.listeners:
                        self.listeners.pop(self.listeners.index(target))

            return result

    def postEvent(self, event: BaseEvent):
        self.loop.create_task(
            self.layered_scheduler(
                listener_generator=self.default_listener_generator(event.__class__),
                event=event,
            )
        )

    @staticmethod
    def event_class_generator(target=BaseEvent):
        for i in target.__subclasses__():
            yield i
            if i.__subclasses__():
                yield from Broadcast.event_class_generator(i)

    @staticmethod
    def findEvent(name: str):
        for i in Broadcast.event_class_generator():
            if i.__name__ == name:
                return i

    def getDefaultNamespace(self):
        return self.default_namespace

    def createNamespace(
        self, name, *, priority: int = 0, hide: bool = False, disabled: bool = False
    ):
        if self.containNamespace(name):
            raise ExistedNamespace(name, "has been created!")
        self.namespaces.append(
            Namespace(name=name, priority=priority, hide=hide, disabled=disabled)
        )
        return self.namespaces[-1]

    def removeNamespace(self, name):
        if self.containNamespace(name):
            for index, i in enumerate(self.namespaces):
                if i.name == name:
                    self.namespaces.pop(index)
                    return
        else:
            raise UnexistedNamespace(name)

    def containNamespace(self, name):
        for i in self.namespaces:
            if i.name == name:
                return True
        return False

    def getNamespace(self, name):
        if self.containNamespace(name):
            for i in self.namespaces:
                if i.name == name:
                    return i
        else:
            raise UnexistedNamespace(name)

    def hideNamespace(self, name):
        ns = self.getNamespace(name)
        ns.hide = True

    def unhideNamespace(self, name):
        ns = self.getNamespace(name)
        ns.hide = False

    def disableNamespace(self, name):
        ns = self.getNamespace(name)
        ns.disabled = True

    def enableNamespace(self, name):
        ns = self.getNamespace(name)
        ns.disabled = False

    def containListener(self, target):
        for i in self.listeners:
            if i.callable == target:
                return True
        return False

    def getListener(self, target):
        for i in self.listeners:
            if i.callable == target:
                return i

    def removeListener(self, target):
        self.listeners.remove(target)

    def receiver(
        self,
        event: Union[str, Type[BaseEvent]],
        priority: int = 16,
        dispatchers: List[Type[BaseDispatcher]] = [],
        namespace: Namespace = None,
        headless_decorators: List[Decorator] = [],
        enable_internal_access: bool = False,
    ):
        if cached_isinstance(event, str):
            _name = event
            event = self.findEvent(event)
            if not event:
                raise InvaildEventName(_name, "is not vaild!")
        priority = (type(priority) == int) and priority or int(priority)  # 类型转换

        def receiver_wrapper(callable_target):
            may_listener = self.getListener(callable_target)
            if not may_listener:
                self.listeners.append(
                    Listener(
                        callable=callable_target,
                        namespace=namespace or self.getDefaultNamespace(),
                        inline_dispatchers=dispatchers,
                        priority=priority,
                        listening_events=[event],
                        headless_decorators=headless_decorators,
                        enable_internal_access=enable_internal_access,
                    )
                )
            else:
                if event not in may_listener.listening_events:
                    may_listener.listening_events.append(event)
                else:
                    raise RegisteredEventListener(
                        event.__name__, "has been registered!"
                    )
            return callable_target

        return receiver_wrapper

Class variables

var debug_flag : bool
var default_namespaceNamespace
var dispatcher_interfaceDispatcherInterface
var listeners : List[Listener]
var loop : asyncio.events.AbstractEventLoop
var namespaces : List[Namespace]

Static methods

def event_class_generator(target=graia.broadcast.entities.event.BaseEvent)
Expand source code
@staticmethod
def event_class_generator(target=BaseEvent):
    for i in target.__subclasses__():
        yield i
        if i.__subclasses__():
            yield from Broadcast.event_class_generator(i)
def findEvent(name: str)
Expand source code
@staticmethod
def findEvent(name: str):
    for i in Broadcast.event_class_generator():
        if i.__name__ == name:
            return i

Methods

async def Executor(self, target: Union[Callable, ExecTarget], event: BaseEvent, dispatchers: List[Union[Type[BaseDispatcher], Callable, BaseDispatcher]] = None, post_exception_event: bool = False, print_exception: bool = True, enableInternalAccess: bool = False)
Expand source code
async def Executor(
    self,
    target: Union[Callable, ExecTarget],
    event: BaseEvent,
    dispatchers: List[
        Union[
            Type[BaseDispatcher],
            Callable,
            BaseDispatcher,
        ]
    ] = None,
    post_exception_event: bool = False,
    print_exception: bool = True,
    enableInternalAccess: bool = False,
):
    from .builtin.event import ExceptionThrowed

    is_exectarget = cached_isinstance(target, ExecTarget)
    is_listener = cached_isinstance(target, Listener)

    if is_listener:
        if target.namespace.disabled:
            raise DisabledNamespace(
                "catched a disabled namespace: {0}".format(target.namespace.name)
            )

    target_callable = target.callable if is_exectarget else target
    parameter_compile_result = {}
    complete_finished = False

    track_logs: TrackLog = TrackLog()
    async with self.dispatcher_interface.start_execution(
        event,
        [
            *(dispatchers or []),
            *(target.namespace.injected_dispatchers if is_listener else []),
            *(target.inline_dispatchers if is_exectarget else []),
            *dispatcher_mixin_handler(event.Dispatcher),
        ],
        track_logs,
    ) as dii:
        if enableInternalAccess or (
            is_exectarget and target.enable_internal_access
        ):

            @dii.inject_execution_raw
            async def _(interface: DispatcherInterface):
                if interface.annotation is target.__class__:
                    return target
                elif interface.annotation is Namespace and is_listener:
                    return target.namespace

        await dii.exec_lifecycle("beforeExecution")
        try:
            await dii.exec_lifecycle("beforeDispatch")

            if is_exectarget:
                if target.maybe_failure:
                    initial_path = dii.init_dispatch_path()
                    for name, annotation, default in argument_signature(
                        target_callable
                    ):
                        if (
                            target.param_paths.setdefault(name, initial_path)
                            is initial_path
                        ):
                            target.param_paths[name + "$set"] = set()
                        parameter_compile_result[name] = await dii.lookup_param(
                            name, annotation, default, target.param_paths[name]
                        )
                else:
                    for name, annotation, default in argument_signature(
                        target_callable
                    ):
                        parameter_compile_result[
                            name
                        ] = await dii.lookup_param_without_log(
                            name, annotation, default, target.param_paths[name]
                        )

                for hl_d in target.headless_decorators:
                    await dii.lookup_param_without_log(None, None, hl_d)
            else:
                for name, annotation, default in argument_signature(
                    target_callable
                ):
                    parameter_compile_result[
                        name
                    ] = await dii.lookup_param_without_log(
                        name, annotation, default, target.param_paths[name]
                    )

            complete_finished = True

            result = await run_always_await_safely(
                target_callable, **parameter_compile_result
            )
        except (ExecutionStop, PropagationCancelled):
            raise
        except RequirementCrashed:
            traceback.print_exc()
            raise
        except Exception as e:
            if print_exception:
                traceback.print_exc()
            if post_exception_event:
                self.postEvent(ExceptionThrowed(exception=e, event=event))
            raise
        finally:
            _, exception, tb = sys.exc_info()
            await dii.exec_lifecycle("afterDispatch", exception, tb)
            await dii.exec_lifecycle("afterTargetExec", exception, tb)
            await dii.exec_lifecycle("afterExecution", exception, tb)

            if is_exectarget and not track_logs.fluent_success:
                current_paths = target.param_paths
                current_path: List[List["T_Dispatcher"]] = None
                current_path_set: Set["T_Dispatcher"] = None
                has_failures: set = set()

                for log in track_logs.log:
                    if log[0] is TrackLogType.LookupStart:
                        current_path = current_paths[log[1]]
                        current_path_set = current_paths[log[1] + "$set"]
                    elif log[0] is TrackLogType.LookupEnd:
                        current_path = None
                    elif (
                        current_path is not None
                        and log[0] is TrackLogType.Result
                        and log[2] not in current_path_set
                    ):
                        current_path[0].append(log[2])
                        current_path_set.add(log[2])
                    elif log[0] is TrackLogType.Continue:
                        has_failures.add(log[1])

                target.maybe_failure.symmetric_difference_update(has_failures)

        if result.__class__ is Force:
            return result.content

        if result.__class__ is RemoveMe:
            if cached_isinstance(target, Listener):
                if target in self.listeners:
                    self.listeners.pop(self.listeners.index(target))

        return result
def containListener(self, target)
Expand source code
def containListener(self, target):
    for i in self.listeners:
        if i.callable == target:
            return True
    return False
def containNamespace(self, name)
Expand source code
def containNamespace(self, name):
    for i in self.namespaces:
        if i.name == name:
            return True
    return False
def createNamespace(self, name, *, priority: int = 0, hide: bool = False, disabled: bool = False)
Expand source code
def createNamespace(
    self, name, *, priority: int = 0, hide: bool = False, disabled: bool = False
):
    if self.containNamespace(name):
        raise ExistedNamespace(name, "has been created!")
    self.namespaces.append(
        Namespace(name=name, priority=priority, hide=hide, disabled=disabled)
    )
    return self.namespaces[-1]
def default_listener_generator(self, event_class) ‑> Iterable[Listener]
Expand source code
def default_listener_generator(self, event_class) -> Iterable[Listener]:
    return (
        iw(self.listeners)
        .filter(lambda x: not x.namespace.hide)  # filter for hide
        .filter(lambda x: not x.namespace.disabled)  # filter for disabled
        .filter(lambda x: event_class in x.listening_events)
        # .collect(list)  # collect to a whole list
    )
def disableNamespace(self, name)
Expand source code
def disableNamespace(self, name):
    ns = self.getNamespace(name)
    ns.disabled = True
def enableNamespace(self, name)
Expand source code
def enableNamespace(self, name):
    ns = self.getNamespace(name)
    ns.disabled = False
def getDefaultNamespace(self)
Expand source code
def getDefaultNamespace(self):
    return self.default_namespace
def getListener(self, target)
Expand source code
def getListener(self, target):
    for i in self.listeners:
        if i.callable == target:
            return i
def getNamespace(self, name)
Expand source code
def getNamespace(self, name):
    if self.containNamespace(name):
        for i in self.namespaces:
            if i.name == name:
                return i
    else:
        raise UnexistedNamespace(name)
def hideNamespace(self, name)
Expand source code
def hideNamespace(self, name):
    ns = self.getNamespace(name)
    ns.hide = True
async def layered_scheduler(self, listener_generator: Generator[Listener, NoneType, NoneType], event: BaseEvent)
Expand source code
async def layered_scheduler(
    self, listener_generator: Generator[Listener, None, None], event: BaseEvent
):
    grouped: Dict[int, List[Listener]] = group_dict(
        listener_generator, lambda x: x.priority
    )
    for _, current_group in sorted(grouped.items(), key=lambda x: x[0]):
        coros = [self.Executor(target=i, event=event) for i in current_group]
        done_tasks, _ = await asyncio.wait(coros)
        for task in done_tasks:
            if task.exception().__class__ is PropagationCancelled:
                break
def postEvent(self, event: BaseEvent)
Expand source code
def postEvent(self, event: BaseEvent):
    self.loop.create_task(
        self.layered_scheduler(
            listener_generator=self.default_listener_generator(event.__class__),
            event=event,
        )
    )
def receiver(self, event: Union[str, Type[BaseEvent]], priority: int = 16, dispatchers: List[Type[BaseDispatcher]] = [], namespace: Namespace = None, headless_decorators: List[Decorator] = [], enable_internal_access: bool = False)
Expand source code
def receiver(
    self,
    event: Union[str, Type[BaseEvent]],
    priority: int = 16,
    dispatchers: List[Type[BaseDispatcher]] = [],
    namespace: Namespace = None,
    headless_decorators: List[Decorator] = [],
    enable_internal_access: bool = False,
):
    if cached_isinstance(event, str):
        _name = event
        event = self.findEvent(event)
        if not event:
            raise InvaildEventName(_name, "is not vaild!")
    priority = (type(priority) == int) and priority or int(priority)  # 类型转换

    def receiver_wrapper(callable_target):
        may_listener = self.getListener(callable_target)
        if not may_listener:
            self.listeners.append(
                Listener(
                    callable=callable_target,
                    namespace=namespace or self.getDefaultNamespace(),
                    inline_dispatchers=dispatchers,
                    priority=priority,
                    listening_events=[event],
                    headless_decorators=headless_decorators,
                    enable_internal_access=enable_internal_access,
                )
            )
        else:
            if event not in may_listener.listening_events:
                may_listener.listening_events.append(event)
            else:
                raise RegisteredEventListener(
                    event.__name__, "has been registered!"
                )
        return callable_target

    return receiver_wrapper
def removeListener(self, target)
Expand source code
def removeListener(self, target):
    self.listeners.remove(target)
def removeNamespace(self, name)
Expand source code
def removeNamespace(self, name):
    if self.containNamespace(name):
        for index, i in enumerate(self.namespaces):
            if i.name == name:
                self.namespaces.pop(index)
                return
    else:
        raise UnexistedNamespace(name)
def unhideNamespace(self, name)
Expand source code
def unhideNamespace(self, name):
    ns = self.getNamespace(name)
    ns.hide = False