Source code for piripherals.event

from threading import current_thread
from queue import Queue
from functools import partial
from piripherals.util import fork, not_raising

__all__ = ['Event']


class EventLoop:
    """queue for processing function calls on a separate thread"""

    def __init__(self):
        self.events = Queue()
        self.loopthread = None

    def _start(self):
        if self.loopthread:
            return

        def loop():
            self.loopthread = current_thread()
            for f in self:
                f()

        fork(loop)
        return self

    def __iter__(self): return self

    def __next__(self): return self.remove()

    def in_loop(self):
        """test if currently in eventloop.

        Return:
            bool: True if on eventloop
        """
        return current_thread() is self.loopthread

    def add(self, func):
        """add to the queue

        Args:
            func(callable): func to put in the queue
        """
        self._start()
        self.events.put(not_raising(func))
        return self

    def remove(self):
        """get function from the queue and remove it. Blocks until there is
        something in the queue."""
        return self.events.get()

    __lshift__ = add


[docs]class Event: """Event with attached handlers and optional condition. Args: name (str): name of the event, usuful for debugging condition (callable): condition to suppress firing, if it evaluates to False """ loop = EventLoop() def __init__(self, name='event', condition=lambda: True): self.name = name self.handlers = [] self.condition = condition def __str__(self): return str(self.name)
[docs] def add(self, handler): """add an event handler. Handlers can be added with ``Event >> handler``. Args: handler (callable): handler to attach. If the handler is an Event, its :meth:`fire` method will be attach as handler. """ if isinstance(handler, Event): self.handlers.append(handler.fire) else: self.handlers.append(handler) return self
[docs] def remove(self, handler): """remove an attached handler""" self.handlers.remove(handler) return self
[docs] def queue(self, *args, **kwargs): """Enqueue the event on eventloop. This is equivalent to just calling the Event itself. :meth:`fire` will be enqueued in the eventloop, such that it will be called on the loop thread and not on the thread calling :meth:`queue`. All arguments are passed to the handlers. """ Event.loop << partial(self.fire, *args, **kwargs)
[docs] def fire(self, *args, **kwargs): """Fire the event, call all attached handlers. The event is only fired, if the condition evaluates to True. All arguments are passed to the handlers. """ if self.condition(): for handler in self.handlers: not_raising(handler)(*args, **kwargs)
[docs] def conditional(self, cond): """derive a new conditional event A conditional Event can created with ``Event & condition``. Args: condition (callable): see :class:`Event` Return: Event: conditional Event, with this Event's :meth:`fire` as handler """ return Event(self.name + '?', cond) >> self.fire
[docs] def join(self, other): """derive Event as combination of two Events Events can be joined with ``EventA + EventB``. This differs from :meth:`add`, because it creates a new Event and leaves this untouched. Args: other (callable): handler to join with, can be another Event Return: Event: Event with this and other's :meth:`fire` as handlers """ if isinstance(other, Event): return Event(self.name + '+' + other.name) >> self.fire >> other.fire else: return Event(self.name + '+other') >> self.fire >> other
[docs] def partial(self, *args, **kwargs): """creat new Event with partially set arguments""" return Event('{}({}{})'.format(self, args, kwargs)) \ >> partial(self.fire, *args, **kwargs)
__call__ = queue __rshift__ = add __and__ = conditional __add__ = join