""" Core components for event-discrete simulation environments. """ from heapq import heappush, heappop from itertools import count from types import MethodType from typing import ( TYPE_CHECKING, Any, Generic, Iterable, List, Optional, Tuple, Type, TypeVar, Union, ) from simpy.events import ( AllOf, AnyOf, Event, EventPriority, Process, ProcessGenerator, Timeout, URGENT, NORMAL, ) Infinity: float = float('inf') #: Convenience alias for infinity T = TypeVar('T') class BoundClass(Generic[T]): """Allows classes to behave like methods. The ``__get__()`` descriptor is basically identical to ``function.__get__()`` and binds the first argument of the ``cls`` to the descriptor instance. """ def __init__(self, cls: Type[T]): self.cls = cls def __get__( self, instance: Optional['BoundClass'], owner: Optional[Type['BoundClass']] = None, ) -> Union[Type[T], MethodType]: if instance is None: return self.cls return MethodType(self.cls, instance) @staticmethod def bind_early(instance: object) -> None: """Bind all :class:`BoundClass` attributes of the *instance's* class to the instance itself to increase performance.""" for name, obj in instance.__class__.__dict__.items(): if type(obj) is BoundClass: bound_class = getattr(instance, name) setattr(instance, name, bound_class) class EmptySchedule(Exception): """Thrown by an :class:`Environment` if there are no further events to be processed.""" class StopSimulation(Exception): """Indicates that the simulation should stop now.""" @classmethod def callback(cls, event: Event) -> None: """Used as callback in :meth:`Environment.run()` to stop the simulation when the *until* event occurred.""" if event.ok: raise cls(event.value) else: raise event._value SimTime = Union[int, float] class Environment: """Execution environment for an event-based simulation. The passing of time is simulated by stepping from event to event. You can provide an *initial_time* for the environment. By default, it starts at ``0``. This class also provides aliases for common event types, for example :attr:`process`, :attr:`timeout` and :attr:`event`. """ def __init__(self, initial_time: SimTime = 0): self._now = initial_time self._queue: List[ Tuple[SimTime, EventPriority, int, Event] ] = [] # The list of all currently scheduled events. self._eid = count() # Counter for event IDs self._active_proc: Optional[Process] = None # Bind all BoundClass instances to "self" to improve performance. BoundClass.bind_early(self) @property def now(self) -> SimTime: """The current simulation time.""" return self._now @property def active_process(self) -> Optional[Process]: """The currently active process of the environment.""" return self._active_proc if TYPE_CHECKING: # This block is only evaluated when type checking with, e.g. Mypy. # These are the effective types of the methods created with BoundClass # magic and are thus a useful reference for SimPy users as well as for # static type checking. def process(self, generator: ProcessGenerator) -> Process: """Create a new :class:`~simpy.events.Process` instance for *generator*.""" return Process(self, generator) def timeout( self, delay: SimTime = 0, value: Optional[Any] = None ) -> Timeout: """Return a new :class:`~simpy.events.Timeout` event with a *delay* and, optionally, a *value*.""" return Timeout(self, delay, value) def event(self) -> Event: """Return a new :class:`~simpy.events.Event` instance. Yielding this event suspends a process until another process triggers the event. """ return Event(self) def all_of(self, events: Iterable[Event]) -> AllOf: """Return a :class:`~simpy.events.AllOf` condition for *events*.""" return AllOf(self, events) def any_of(self, events: Iterable[Event]) -> AnyOf: """Return a :class:`~simpy.events.AnyOf` condition for *events*.""" return AnyOf(self, events) else: process = BoundClass(Process) timeout = BoundClass(Timeout) event = BoundClass(Event) all_of = BoundClass(AllOf) any_of = BoundClass(AnyOf) def schedule( self, event: Event, priority: EventPriority = NORMAL, delay: SimTime = 0, ) -> None: """Schedule an *event* with a given *priority* and a *delay*.""" heappush(self._queue, (self._now + delay, priority, next(self._eid), event)) def peek(self) -> SimTime: """Get the time of the next scheduled event. Return :data:`~simpy.core.Infinity` if there is no further event.""" try: return self._queue[0][0] except IndexError: return Infinity def step(self) -> None: """Process the next event. Raise an :exc:`EmptySchedule` if no further events are available. """ try: self._now, _, _, event = heappop(self._queue) except IndexError: raise EmptySchedule() # Process callbacks of the event. Set the events callbacks to None # immediately to prevent concurrent modifications. callbacks, event.callbacks = event.callbacks, None # type: ignore for callback in callbacks: callback(event) if not event._ok and not hasattr(event, '_defused'): # The event has failed and has not been defused. Crash the # environment. # Create a copy of the failure exception with a new traceback. exc = type(event._value)(*event._value.args) exc.__cause__ = event._value raise exc def run( self, until: Optional[Union[SimTime, Event]] = None ) -> Optional[Any]: """Executes :meth:`step()` until the given criterion *until* is met. - If it is ``None`` (which is the default), this method will return when there are no further events to be processed. - If it is an :class:`~simpy.events.Event`, the method will continue stepping until this event has been triggered and will return its value. Raises a :exc:`RuntimeError` if there are no further events to be processed and the *until* event was not triggered. - If it is a number, the method will continue stepping until the environment's time reaches *until*. """ if until is not None: if not isinstance(until, Event): # Assume that *until* is a number if it is not None and # not an event. Create a Timeout(until) in this case. at: SimTime if isinstance(until, int): at = until else: at = float(until) if at <= self.now: raise ValueError( f'until(={at}) must be > the current simulation time.' ) # Schedule the event before all regular timeouts. until = Event(self) until._ok = True until._value = None self.schedule(until, URGENT, at - self.now) elif until.callbacks is None: # Until event has already been processed. return until.value until.callbacks.append(StopSimulation.callback) try: while True: self.step() except StopSimulation as exc: return exc.args[0] # == until.value except EmptySchedule: if until is not None: assert not until.triggered raise RuntimeError( f'No scheduled events left but "until" event was not ' f'triggered: {until}' ) return None