Module graia.broadcast.interfaces.dispatcher
Expand source code
from inspect import isclass, isfunction, ismethod
import itertools
from functools import lru_cache, partial
from typing import (
Any,
Dict,
Generator,
List,
Optional,
TYPE_CHECKING,
Tuple,
)
from graia.broadcast.entities.dispatcher import BaseDispatcher
from graia.broadcast.entities.event import BaseEvent
from graia.broadcast.entities.context import ExecutionContext, ParameterContext
from graia.broadcast.entities.signatures import Force
from graia.broadcast.entities.track_log import TrackLog, TrackLogType
from graia.broadcast.exceptions import RequirementCrashed
from graia.broadcast.typing import T_Dispatcher, T_Dispatcher_Callable
from ..utilles import NestableIterable, run_always_await_safely, cached_getattr
if TYPE_CHECKING:
from graia.broadcast import Broadcast
DEFAULT_LIFECYCLE_NAMES = (
"beforeDispatch",
"afterDispatch",
"beforeExecution",
"afterExecution",
"beforeTargetExec",
"afterTargetExec",
)
class EmptyEvent(BaseEvent):
class Dispatcher(BaseDispatcher):
@staticmethod
def catch(_):
pass
class DispatcherInterface:
broadcast: "Broadcast"
execution_contexts: List["ExecutionContext"]
parameter_contexts: List["ParameterContext"]
track_logs: List[TrackLog] = None
@property
def track_log(self):
return self.track_logs[-1]
@staticmethod
@lru_cache(None)
def get_lifecycle_refs(
dispatcher: "T_Dispatcher",
) -> Optional[Dict[str, List]]:
from graia.broadcast.entities.dispatcher import BaseDispatcher
lifecycle_refs: Dict[str, List] = {}
if not isinstance(dispatcher, (BaseDispatcher, type)):
return
for name in DEFAULT_LIFECYCLE_NAMES:
lifecycle_refs.setdefault(name, [])
abstract_lifecycle_func = cached_getattr(BaseDispatcher, name)
unbound_attr = getattr(dispatcher, name, None)
if unbound_attr is None:
continue
orig_call = unbound_attr
while ismethod(orig_call):
orig_call = unbound_attr.__func__
if orig_call is abstract_lifecycle_func:
continue
lifecycle_refs[name].append(unbound_attr)
return lifecycle_refs
def dispatcher_pure_generator(self) -> Generator[None, None, T_Dispatcher]:
return itertools.chain(
self.execution_contexts[0].dispatchers,
self.parameter_contexts[-1].dispatchers,
self.execution_contexts[-1].dispatchers,
)
def flush_lifecycle_refs(
self,
dispatchers: List["T_Dispatcher"] = None,
):
from graia.broadcast.entities.dispatcher import BaseDispatcher
lifecycle_refs = self.execution_contexts[-1].lifecycle_refs
if dispatchers is None and lifecycle_refs: # 已经刷新.
return
for dispatcher in dispatchers or self.dispatcher_pure_generator():
if (
not isinstance(dispatcher, BaseDispatcher)
or dispatcher.__class__ is type
):
continue
for name, value in self.get_lifecycle_refs(dispatcher).items():
lifecycle_refs.setdefault(name, [])
lifecycle_refs[name].extend(value)
async def exec_lifecycle(self, lifecycle_name: str, *args, **kwargs):
lifecycle_funcs = self.execution_contexts[-1].lifecycle_refs.get(
lifecycle_name, []
)
if lifecycle_funcs:
for func in lifecycle_funcs:
await run_always_await_safely(func, self, *args, **kwargs)
def inject_local_raw(self, *dispatchers: List["T_Dispatcher"]):
# 为什么没有 flush: 因为这里的 lifecycle 是无意义的.
for dispatcher in dispatchers[::-1]:
self.parameter_contexts[-1].dispatchers.insert(0, dispatcher)
def inject_execution_raw(self, *dispatchers: List["T_Dispatcher"]):
for dispatcher in dispatchers:
self.execution_contexts[-1].dispatchers.insert(0, dispatcher)
self.flush_lifecycle_refs(dispatchers)
def inject_global_raw(self, *dispatchers: List["T_Dispatcher"]):
# self.dispatchers.extend(dispatchers)
for dispatcher in dispatchers[::-1]:
self.execution_contexts[0].dispatchers.insert(1, dispatcher)
self.flush_lifecycle_refs(dispatchers)
@property
def name(self) -> str:
return self.parameter_contexts[-1].name
@property
def annotation(self) -> Any:
return self.parameter_contexts[-1].annotation
@property
def default(self) -> Any:
return self.parameter_contexts[-1].default
@property
def _index(self) -> int:
return self.execution_contexts[-1]._index
@property
def event(self) -> BaseEvent:
return self.execution_contexts[-1].event
@property
def global_dispatcher(self) -> List[T_Dispatcher]:
return self.execution_contexts[0].dispatchers
@property
def current_path(self) -> NestableIterable[int, T_Dispatcher]:
return self.parameter_contexts[-1].path
@property
def has_current_exec_context(self) -> bool:
return len(self.execution_contexts) >= 2
@property
def has_current_param_context(self) -> bool:
return len(self.parameter_contexts) >= 2
def __init__(self, broadcast_instance: "Broadcast") -> None:
self.broadcast = broadcast_instance
self.execution_contexts = [ExecutionContext([], EmptyEvent())]
self.parameter_contexts = [
ParameterContext(
None,
None,
None,
[],
[
[],
],
)
]
self.track_logs = [TrackLog()]
async def __aenter__(self) -> "DispatcherInterface":
return self
async def __aexit__(self, _, exc: Exception, tb):
await self.exit_current_execution()
if tb is not None:
raise exc.with_traceback(tb)
def start_execution(
self,
event: BaseEvent,
dispatchers: List[T_Dispatcher],
track_log_receiver: TrackLog = None,
) -> "DispatcherInterface":
self.execution_contexts.append(ExecutionContext(dispatchers, event))
self.flush_lifecycle_refs()
self.track_logs.append(track_log_receiver or TrackLog())
return self
async def exit_current_execution(self):
self.execution_contexts.pop()
self.track_logs.pop()
@staticmethod
@lru_cache(None)
def dispatcher_callable_detector(dispatcher: T_Dispatcher) -> T_Dispatcher_Callable:
if hasattr(dispatcher, "catch"):
if isfunction(dispatcher.catch) or not isclass(dispatcher):
return dispatcher.catch
else:
return partial(dispatcher.catch, None)
elif callable(dispatcher):
return dispatcher
else:
raise ValueError("invaild dispatcher: ", dispatcher)
def init_dispatch_path(self) -> List[List["T_Dispatcher"]]:
return [
[],
self.execution_contexts[0].dispatchers,
self.parameter_contexts[-1].dispatchers,
self.execution_contexts[-1].dispatchers,
]
async def lookup_param_without_log(
self,
name: str,
annotation: Any,
default: Any,
using_path: List[List["T_Dispatcher"]] = None,
) -> Any:
self.parameter_contexts.append(
ParameterContext(
name, annotation, default, [], using_path or self.init_dispatch_path()
)
)
result = None
try:
for dispatcher in self.current_path:
result = await run_always_await_safely(
self.dispatcher_callable_detector(dispatcher), self
)
if result is None:
continue
if result.__class__ is Force:
result = result.target
self.execution_contexts[-1]._index = 0
return result
else:
raise RequirementCrashed(
"the dispatching requirement crashed: ",
self.name,
self.annotation,
self.default,
)
finally:
self.parameter_contexts.pop()
async def lookup_param(
self,
name: str,
annotation: Any,
default: Any,
using_path: List[List["T_Dispatcher"]] = None,
) -> Any:
self.parameter_contexts.append(
ParameterContext(
name, annotation, default, [], using_path or self.init_dispatch_path()
)
)
track_log = self.track_log.log
track_log.append((TrackLogType.LookupStart, name, annotation, default))
try:
for dispatcher in self.current_path:
result = await run_always_await_safely(
self.dispatcher_callable_detector(dispatcher), self
)
if result is None:
self.track_log.fluent_success = False
track_log.append((TrackLogType.Continue, name, dispatcher))
continue
if result.__class__ is Force:
result = result.target
track_log.append((TrackLogType.Result, name, dispatcher))
self.execution_contexts[-1]._index = 0
return result
else:
track_log.append((TrackLogType.RequirementCrashed, name))
raise RequirementCrashed(
"the dispatching requirement crashed: ",
self.name,
self.annotation,
self.default,
)
finally:
track_log.append((TrackLogType.LookupEnd, name))
self.parameter_contexts.pop()
async def lookup_using_current(self) -> Any:
result = None
for dispatcher in self.current_path:
result = await run_always_await_safely(
self.dispatcher_callable_detector(dispatcher), self
)
if result is None:
continue
if result.__class__ is Force:
result = result.target
self.execution_contexts[-1]._index = 0
return result
else:
raise RequirementCrashed(
"the dispatching requirement crashed: ",
self.name,
self.annotation,
self.default,
)
async def lookup_by_directly(
self,
dispatcher: T_Dispatcher,
name: str,
annotation: Any,
default: Any,
using_path: List[List["T_Dispatcher"]] = None,
) -> Any:
self.parameter_contexts.append(
ParameterContext(
name, annotation, default, [], using_path or self.init_dispatch_path()
)
)
dispatcher_callable = self.dispatcher_callable_detector(dispatcher)
try:
result = await run_always_await_safely(dispatcher_callable, self)
if result.__class__ is Force:
return result.target
return result
finally:
self.parameter_contexts.pop()
Classes
class DispatcherInterface (broadcast_instance: Broadcast)
-
Expand source code
class DispatcherInterface: broadcast: "Broadcast" execution_contexts: List["ExecutionContext"] parameter_contexts: List["ParameterContext"] track_logs: List[TrackLog] = None @property def track_log(self): return self.track_logs[-1] @staticmethod @lru_cache(None) def get_lifecycle_refs( dispatcher: "T_Dispatcher", ) -> Optional[Dict[str, List]]: from graia.broadcast.entities.dispatcher import BaseDispatcher lifecycle_refs: Dict[str, List] = {} if not isinstance(dispatcher, (BaseDispatcher, type)): return for name in DEFAULT_LIFECYCLE_NAMES: lifecycle_refs.setdefault(name, []) abstract_lifecycle_func = cached_getattr(BaseDispatcher, name) unbound_attr = getattr(dispatcher, name, None) if unbound_attr is None: continue orig_call = unbound_attr while ismethod(orig_call): orig_call = unbound_attr.__func__ if orig_call is abstract_lifecycle_func: continue lifecycle_refs[name].append(unbound_attr) return lifecycle_refs def dispatcher_pure_generator(self) -> Generator[None, None, T_Dispatcher]: return itertools.chain( self.execution_contexts[0].dispatchers, self.parameter_contexts[-1].dispatchers, self.execution_contexts[-1].dispatchers, ) def flush_lifecycle_refs( self, dispatchers: List["T_Dispatcher"] = None, ): from graia.broadcast.entities.dispatcher import BaseDispatcher lifecycle_refs = self.execution_contexts[-1].lifecycle_refs if dispatchers is None and lifecycle_refs: # 已经刷新. return for dispatcher in dispatchers or self.dispatcher_pure_generator(): if ( not isinstance(dispatcher, BaseDispatcher) or dispatcher.__class__ is type ): continue for name, value in self.get_lifecycle_refs(dispatcher).items(): lifecycle_refs.setdefault(name, []) lifecycle_refs[name].extend(value) async def exec_lifecycle(self, lifecycle_name: str, *args, **kwargs): lifecycle_funcs = self.execution_contexts[-1].lifecycle_refs.get( lifecycle_name, [] ) if lifecycle_funcs: for func in lifecycle_funcs: await run_always_await_safely(func, self, *args, **kwargs) def inject_local_raw(self, *dispatchers: List["T_Dispatcher"]): # 为什么没有 flush: 因为这里的 lifecycle 是无意义的. for dispatcher in dispatchers[::-1]: self.parameter_contexts[-1].dispatchers.insert(0, dispatcher) def inject_execution_raw(self, *dispatchers: List["T_Dispatcher"]): for dispatcher in dispatchers: self.execution_contexts[-1].dispatchers.insert(0, dispatcher) self.flush_lifecycle_refs(dispatchers) def inject_global_raw(self, *dispatchers: List["T_Dispatcher"]): # self.dispatchers.extend(dispatchers) for dispatcher in dispatchers[::-1]: self.execution_contexts[0].dispatchers.insert(1, dispatcher) self.flush_lifecycle_refs(dispatchers) @property def name(self) -> str: return self.parameter_contexts[-1].name @property def annotation(self) -> Any: return self.parameter_contexts[-1].annotation @property def default(self) -> Any: return self.parameter_contexts[-1].default @property def _index(self) -> int: return self.execution_contexts[-1]._index @property def event(self) -> BaseEvent: return self.execution_contexts[-1].event @property def global_dispatcher(self) -> List[T_Dispatcher]: return self.execution_contexts[0].dispatchers @property def current_path(self) -> NestableIterable[int, T_Dispatcher]: return self.parameter_contexts[-1].path @property def has_current_exec_context(self) -> bool: return len(self.execution_contexts) >= 2 @property def has_current_param_context(self) -> bool: return len(self.parameter_contexts) >= 2 def __init__(self, broadcast_instance: "Broadcast") -> None: self.broadcast = broadcast_instance self.execution_contexts = [ExecutionContext([], EmptyEvent())] self.parameter_contexts = [ ParameterContext( None, None, None, [], [ [], ], ) ] self.track_logs = [TrackLog()] async def __aenter__(self) -> "DispatcherInterface": return self async def __aexit__(self, _, exc: Exception, tb): await self.exit_current_execution() if tb is not None: raise exc.with_traceback(tb) def start_execution( self, event: BaseEvent, dispatchers: List[T_Dispatcher], track_log_receiver: TrackLog = None, ) -> "DispatcherInterface": self.execution_contexts.append(ExecutionContext(dispatchers, event)) self.flush_lifecycle_refs() self.track_logs.append(track_log_receiver or TrackLog()) return self async def exit_current_execution(self): self.execution_contexts.pop() self.track_logs.pop() @staticmethod @lru_cache(None) def dispatcher_callable_detector(dispatcher: T_Dispatcher) -> T_Dispatcher_Callable: if hasattr(dispatcher, "catch"): if isfunction(dispatcher.catch) or not isclass(dispatcher): return dispatcher.catch else: return partial(dispatcher.catch, None) elif callable(dispatcher): return dispatcher else: raise ValueError("invaild dispatcher: ", dispatcher) def init_dispatch_path(self) -> List[List["T_Dispatcher"]]: return [ [], self.execution_contexts[0].dispatchers, self.parameter_contexts[-1].dispatchers, self.execution_contexts[-1].dispatchers, ] async def lookup_param_without_log( self, name: str, annotation: Any, default: Any, using_path: List[List["T_Dispatcher"]] = None, ) -> Any: self.parameter_contexts.append( ParameterContext( name, annotation, default, [], using_path or self.init_dispatch_path() ) ) result = None try: for dispatcher in self.current_path: result = await run_always_await_safely( self.dispatcher_callable_detector(dispatcher), self ) if result is None: continue if result.__class__ is Force: result = result.target self.execution_contexts[-1]._index = 0 return result else: raise RequirementCrashed( "the dispatching requirement crashed: ", self.name, self.annotation, self.default, ) finally: self.parameter_contexts.pop() async def lookup_param( self, name: str, annotation: Any, default: Any, using_path: List[List["T_Dispatcher"]] = None, ) -> Any: self.parameter_contexts.append( ParameterContext( name, annotation, default, [], using_path or self.init_dispatch_path() ) ) track_log = self.track_log.log track_log.append((TrackLogType.LookupStart, name, annotation, default)) try: for dispatcher in self.current_path: result = await run_always_await_safely( self.dispatcher_callable_detector(dispatcher), self ) if result is None: self.track_log.fluent_success = False track_log.append((TrackLogType.Continue, name, dispatcher)) continue if result.__class__ is Force: result = result.target track_log.append((TrackLogType.Result, name, dispatcher)) self.execution_contexts[-1]._index = 0 return result else: track_log.append((TrackLogType.RequirementCrashed, name)) raise RequirementCrashed( "the dispatching requirement crashed: ", self.name, self.annotation, self.default, ) finally: track_log.append((TrackLogType.LookupEnd, name)) self.parameter_contexts.pop() async def lookup_using_current(self) -> Any: result = None for dispatcher in self.current_path: result = await run_always_await_safely( self.dispatcher_callable_detector(dispatcher), self ) if result is None: continue if result.__class__ is Force: result = result.target self.execution_contexts[-1]._index = 0 return result else: raise RequirementCrashed( "the dispatching requirement crashed: ", self.name, self.annotation, self.default, ) async def lookup_by_directly( self, dispatcher: T_Dispatcher, name: str, annotation: Any, default: Any, using_path: List[List["T_Dispatcher"]] = None, ) -> Any: self.parameter_contexts.append( ParameterContext( name, annotation, default, [], using_path or self.init_dispatch_path() ) ) dispatcher_callable = self.dispatcher_callable_detector(dispatcher) try: result = await run_always_await_safely(dispatcher_callable, self) if result.__class__ is Force: return result.target return result finally: self.parameter_contexts.pop()
Class variables
var broadcast : Broadcast
var execution_contexts : List[ExecutionContext]
var parameter_contexts : List[ParameterContext]
var track_logs : List[TrackLog]
Static methods
def dispatcher_callable_detector(dispatcher: Union[Type[ForwardRef('BaseDispatcher')], ForwardRef('BaseDispatcher'), Callable[[ForwardRef('DispatcherInterface')], Any]]) ‑> Callable[[DispatcherInterface], Any]
-
Expand source code
@staticmethod @lru_cache(None) def dispatcher_callable_detector(dispatcher: T_Dispatcher) -> T_Dispatcher_Callable: if hasattr(dispatcher, "catch"): if isfunction(dispatcher.catch) or not isclass(dispatcher): return dispatcher.catch else: return partial(dispatcher.catch, None) elif callable(dispatcher): return dispatcher else: raise ValueError("invaild dispatcher: ", dispatcher)
def get_lifecycle_refs(dispatcher: T_Dispatcher) ‑> Union[Dict[str, List], NoneType]
-
Expand source code
@staticmethod @lru_cache(None) def get_lifecycle_refs( dispatcher: "T_Dispatcher", ) -> Optional[Dict[str, List]]: from graia.broadcast.entities.dispatcher import BaseDispatcher lifecycle_refs: Dict[str, List] = {} if not isinstance(dispatcher, (BaseDispatcher, type)): return for name in DEFAULT_LIFECYCLE_NAMES: lifecycle_refs.setdefault(name, []) abstract_lifecycle_func = cached_getattr(BaseDispatcher, name) unbound_attr = getattr(dispatcher, name, None) if unbound_attr is None: continue orig_call = unbound_attr while ismethod(orig_call): orig_call = unbound_attr.__func__ if orig_call is abstract_lifecycle_func: continue lifecycle_refs[name].append(unbound_attr) return lifecycle_refs
Instance variables
var annotation : Any
-
Expand source code
@property def annotation(self) -> Any: return self.parameter_contexts[-1].annotation
var current_path : NestableIterable[int, typing.Union[typing.Type[BaseDispatcher], BaseDispatcher, typing.Callable[[DispatcherInterface], typing.Any]]]
-
Expand source code
@property def current_path(self) -> NestableIterable[int, T_Dispatcher]: return self.parameter_contexts[-1].path
var default : Any
-
Expand source code
@property def default(self) -> Any: return self.parameter_contexts[-1].default
var event : BaseEvent
-
Expand source code
@property def event(self) -> BaseEvent: return self.execution_contexts[-1].event
var global_dispatcher : List[Union[Type[BaseDispatcher], BaseDispatcher, Callable[[DispatcherInterface], Any]]]
-
Expand source code
@property def global_dispatcher(self) -> List[T_Dispatcher]: return self.execution_contexts[0].dispatchers
var has_current_exec_context : bool
-
Expand source code
@property def has_current_exec_context(self) -> bool: return len(self.execution_contexts) >= 2
var has_current_param_context : bool
-
Expand source code
@property def has_current_param_context(self) -> bool: return len(self.parameter_contexts) >= 2
var name : str
-
Expand source code
@property def name(self) -> str: return self.parameter_contexts[-1].name
var track_log
-
Expand source code
@property def track_log(self): return self.track_logs[-1]
Methods
def dispatcher_pure_generator(self) ‑> Generator[NoneType, NoneType, Union[Type[BaseDispatcher], BaseDispatcher, Callable[[DispatcherInterface], Any]]]
-
Expand source code
def dispatcher_pure_generator(self) -> Generator[None, None, T_Dispatcher]: return itertools.chain( self.execution_contexts[0].dispatchers, self.parameter_contexts[-1].dispatchers, self.execution_contexts[-1].dispatchers, )
async def exec_lifecycle(self, lifecycle_name: str, *args, **kwargs)
-
Expand source code
async def exec_lifecycle(self, lifecycle_name: str, *args, **kwargs): lifecycle_funcs = self.execution_contexts[-1].lifecycle_refs.get( lifecycle_name, [] ) if lifecycle_funcs: for func in lifecycle_funcs: await run_always_await_safely(func, self, *args, **kwargs)
async def exit_current_execution(self)
-
Expand source code
async def exit_current_execution(self): self.execution_contexts.pop() self.track_logs.pop()
def flush_lifecycle_refs(self, dispatchers: List[ForwardRef('T_Dispatcher')] = None)
-
Expand source code
def flush_lifecycle_refs( self, dispatchers: List["T_Dispatcher"] = None, ): from graia.broadcast.entities.dispatcher import BaseDispatcher lifecycle_refs = self.execution_contexts[-1].lifecycle_refs if dispatchers is None and lifecycle_refs: # 已经刷新. return for dispatcher in dispatchers or self.dispatcher_pure_generator(): if ( not isinstance(dispatcher, BaseDispatcher) or dispatcher.__class__ is type ): continue for name, value in self.get_lifecycle_refs(dispatcher).items(): lifecycle_refs.setdefault(name, []) lifecycle_refs[name].extend(value)
def init_dispatch_path(self) ‑> List[List[Union[Type[BaseDispatcher], BaseDispatcher, Callable[[DispatcherInterface], Any]]]]
-
Expand source code
def init_dispatch_path(self) -> List[List["T_Dispatcher"]]: return [ [], self.execution_contexts[0].dispatchers, self.parameter_contexts[-1].dispatchers, self.execution_contexts[-1].dispatchers, ]
def inject_execution_raw(self, *dispatchers: List[ForwardRef('T_Dispatcher')])
-
Expand source code
def inject_execution_raw(self, *dispatchers: List["T_Dispatcher"]): for dispatcher in dispatchers: self.execution_contexts[-1].dispatchers.insert(0, dispatcher) self.flush_lifecycle_refs(dispatchers)
def inject_global_raw(self, *dispatchers: List[ForwardRef('T_Dispatcher')])
-
Expand source code
def inject_global_raw(self, *dispatchers: List["T_Dispatcher"]): # self.dispatchers.extend(dispatchers) for dispatcher in dispatchers[::-1]: self.execution_contexts[0].dispatchers.insert(1, dispatcher) self.flush_lifecycle_refs(dispatchers)
def inject_local_raw(self, *dispatchers: List[ForwardRef('T_Dispatcher')])
-
Expand source code
def inject_local_raw(self, *dispatchers: List["T_Dispatcher"]): # 为什么没有 flush: 因为这里的 lifecycle 是无意义的. for dispatcher in dispatchers[::-1]: self.parameter_contexts[-1].dispatchers.insert(0, dispatcher)
async def lookup_by_directly(self, dispatcher: Union[Type[ForwardRef('BaseDispatcher')], ForwardRef('BaseDispatcher'), Callable[[ForwardRef('DispatcherInterface')], Any]], name: str, annotation: Any, default: Any, using_path: List[List[ForwardRef('T_Dispatcher')]] = None) ‑> Any
-
Expand source code
async def lookup_by_directly( self, dispatcher: T_Dispatcher, name: str, annotation: Any, default: Any, using_path: List[List["T_Dispatcher"]] = None, ) -> Any: self.parameter_contexts.append( ParameterContext( name, annotation, default, [], using_path or self.init_dispatch_path() ) ) dispatcher_callable = self.dispatcher_callable_detector(dispatcher) try: result = await run_always_await_safely(dispatcher_callable, self) if result.__class__ is Force: return result.target return result finally: self.parameter_contexts.pop()
async def lookup_param(self, name: str, annotation: Any, default: Any, using_path: List[List[ForwardRef('T_Dispatcher')]] = None) ‑> Any
-
Expand source code
async def lookup_param( self, name: str, annotation: Any, default: Any, using_path: List[List["T_Dispatcher"]] = None, ) -> Any: self.parameter_contexts.append( ParameterContext( name, annotation, default, [], using_path or self.init_dispatch_path() ) ) track_log = self.track_log.log track_log.append((TrackLogType.LookupStart, name, annotation, default)) try: for dispatcher in self.current_path: result = await run_always_await_safely( self.dispatcher_callable_detector(dispatcher), self ) if result is None: self.track_log.fluent_success = False track_log.append((TrackLogType.Continue, name, dispatcher)) continue if result.__class__ is Force: result = result.target track_log.append((TrackLogType.Result, name, dispatcher)) self.execution_contexts[-1]._index = 0 return result else: track_log.append((TrackLogType.RequirementCrashed, name)) raise RequirementCrashed( "the dispatching requirement crashed: ", self.name, self.annotation, self.default, ) finally: track_log.append((TrackLogType.LookupEnd, name)) self.parameter_contexts.pop()
async def lookup_param_without_log(self, name: str, annotation: Any, default: Any, using_path: List[List[ForwardRef('T_Dispatcher')]] = None) ‑> Any
-
Expand source code
async def lookup_param_without_log( self, name: str, annotation: Any, default: Any, using_path: List[List["T_Dispatcher"]] = None, ) -> Any: self.parameter_contexts.append( ParameterContext( name, annotation, default, [], using_path or self.init_dispatch_path() ) ) result = None try: for dispatcher in self.current_path: result = await run_always_await_safely( self.dispatcher_callable_detector(dispatcher), self ) if result is None: continue if result.__class__ is Force: result = result.target self.execution_contexts[-1]._index = 0 return result else: raise RequirementCrashed( "the dispatching requirement crashed: ", self.name, self.annotation, self.default, ) finally: self.parameter_contexts.pop()
async def lookup_using_current(self) ‑> Any
-
Expand source code
async def lookup_using_current(self) -> Any: result = None for dispatcher in self.current_path: result = await run_always_await_safely( self.dispatcher_callable_detector(dispatcher), self ) if result is None: continue if result.__class__ is Force: result = result.target self.execution_contexts[-1]._index = 0 return result else: raise RequirementCrashed( "the dispatching requirement crashed: ", self.name, self.annotation, self.default, )
def start_execution(self, event: BaseEvent, dispatchers: List[Union[Type[ForwardRef('BaseDispatcher')], ForwardRef('BaseDispatcher'), Callable[[ForwardRef('DispatcherInterface')], Any]]], track_log_receiver: TrackLog = None) ‑> DispatcherInterface
-
Expand source code
def start_execution( self, event: BaseEvent, dispatchers: List[T_Dispatcher], track_log_receiver: TrackLog = None, ) -> "DispatcherInterface": self.execution_contexts.append(ExecutionContext(dispatchers, event)) self.flush_lifecycle_refs() self.track_logs.append(track_log_receiver or TrackLog()) return self
class EmptyEvent (**data: Any)
-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class EmptyEvent(BaseEvent): class Dispatcher(BaseDispatcher): @staticmethod def catch(_): pass
Ancestors
- BaseEvent
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var Dispatcher
-
所有非单函数型 Dispatcher 的基类, 用于为参数解析提供可扩展的支持.