Source code for piripherals.util
"""utility functions and classes"""
from time import sleep
from threading import Thread, Event
from functools import partial
from os.path import getmtime
import traceback
import atexit
try:
import RPi.GPIO as GPIO
except:
pass
# Names exported by a wildcard import of this module. ``Polling`` and
# ``cleanup_at_exit`` are defined below but are not part of this list.
__all__ = ['fork', 'not_raising', 'IRQHandler', 'Poller', 'on_change', 'noop']
def fork(func):
    """Run ``func`` asynchronously in a daemon thread.

    Args:
        func: callable taking no arguments; it is used as the thread target.

    Returns:
        threading.Thread: the already started daemon thread, so that callers
        can ``join`` or inspect it. Callers that ignore the return value
        behave exactly as before.
    """
    thread = Thread(target=func, daemon=True)
    thread.start()
    return thread
def noop(*args, **kwargs):
    """Do nothing and return ``None``.

    All positional and keyword arguments are accepted and ignored, so this
    can stand in for any callback.
    """
    return None
def not_raising(func):
    """Wrap a function so that exceptions are printed instead of raised.

    Args:
        func: function to wrap

    Returns:
        wrapped function that forwards all arguments to ``func`` and returns
        its result. If ``func`` raises an ``Exception``, the traceback is
        printed to the console and ``None`` is returned instead.
    """
    # Local import: the module header only imports ``partial`` from functools.
    from functools import wraps

    @wraps(func)
    def logging_func(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # Only ``Exception`` is swallowed; ``KeyboardInterrupt`` and
            # ``SystemExit`` still propagate so the program can be stopped.
            traceback.print_exc()
            return None

    return logging_func
def on_change(file, callback, delay=1, forking=1):
    """Watch a file and invoke a callback each time its modification time changes.

    The modification time is read once when this function is called and then
    polled every ``delay`` seconds. The callback fires once per detected
    change.

    Args:
        file: path of the file to watch, as accepted by ``os.path.getmtime``.
        callback: function called without arguments after a change.
        delay (float): seconds to sleep between two checks.
        forking: if true, watch via ``fork`` and return immediately; otherwise
            run the endless watch loop in the calling thread.
    """
    last_mtime = getmtime(file)

    def check():
        nonlocal last_mtime
        while True:
            mtime = getmtime(file)
            if mtime != last_mtime:
                # Remember the new timestamp before calling back; otherwise
                # every later poll would report the same change again.
                last_mtime = mtime
                callback()
            sleep(delay)

    if forking:
        fork(check)
    else:
        check()
# Module-wide flag: becomes 1 once a cleanup handler has been registered.
_cleanup = 0


def cleanup_at_exit(gpio):
    """Register ``gpio.cleanup`` to run at interpreter exit, at most once.

    Only the first call registers a handler; every later call is a no-op,
    whatever object is passed.

    Args:
        gpio: object providing a ``cleanup`` callable.
    """
    global _cleanup
    if _cleanup:
        return
    atexit.register(gpio.cleanup)
    _cleanup = 1
class IRQHandler:
    """Abstraction of an IRQ handler.

    An edge on the IRQ pin sets a flag. The callback is invoked on a separate
    thread when the flag was set. The flag is reset when the pin is high again.
    The callback may be invoked repeatedly if the pin does not reset. So you have
    to reset the IRQ inside the callback, such that when the callback returns,
    the IRQ line is high again.

    This uses RPi.GPIO_ internally.

    .. _RPi.GPIO: https://pypi.python.org/pypi/RPi.GPIO
    .. _BCM: https://pinout.xyz/

    Args:
        pin (int): BCM_ pin number attached to IRQ line. 0 disables use of GPIO
            pins, call ``interrupt`` explicitly
        callback: function invoked on IRQ.
        edge (int): fire interrupt on falling=0 or rising=1 edge. 1 inverts
            the logic, so IRQ is considered reset when low.
        pullup (int): activate internal pullup
            1=pullup, 0=nothing, -1=pulldown.
    """

    def __init__(self, pin, callback, edge=0, pullup=1):
        # Create the flag before edge detection is armed: ``interrupt`` is
        # handed to the GPIO library below and needs ``self._irq`` to exist
        # as soon as it can be called.
        self._irq = Event()

        if pin:
            GPIO.setmode(GPIO.BCM)
            cleanup_at_exit(GPIO)
            # pullup is -1/0/1; shifting by one turns it into a list index.
            pud = [GPIO.PUD_DOWN, GPIO.PUD_OFF, GPIO.PUD_UP][pullup + 1]
            # Level at which the IRQ line counts as reset (inverted for rising).
            reset = GPIO.LOW if edge else GPIO.HIGH
            edge = GPIO.RISING if edge else GPIO.FALLING
            GPIO.setup(pin, GPIO.IN, pull_up_down=pud)
            GPIO.add_event_detect(pin, edge, self.interrupt)

            def is_reset():
                return GPIO.input(pin) == reset
        else:
            # Without a pin there is no line to inspect; every explicit
            # ``interrupt`` call is treated as handled after one callback.
            def is_reset():
                return True

        callback = not_raising(callback)

        def loop():
            while True:
                self._irq.wait()
                callback()
                # Keep the flag set while the line is still active, so the
                # callback runs again until the IRQ has been reset.
                if is_reset():
                    self._irq.clear()

        fork(loop)

    def interrupt(self, *a, **kw):
        """fire interrupt

        All arguments are ignored.
        """
        self._irq.set()
class Poller:
    """Polling loop as replacement for IRQHandler.

    Use it if using the IRQ line is not possible or desired.

    Args:
        callback: function that is called continuously. The actual polling
            happens in this callback.
        delay (float): delay in seconds between invocations of callback
    """

    def __init__(self, callback, delay=0.01):
        # Wrap the callback with ``not_raising`` before handing it to the loop.
        guarded = not_raising(callback)

        def poll_forever():
            while True:
                guarded()
                sleep(delay)

        fork(poll_forever)
class Polling:
    """Universal halting polling loop.

    The loop starts halted. Calling ``trigger`` (or the instance itself) wakes
    it up; it then polls ``getter`` every ``delay`` seconds and passes each
    value to ``setter``. The loop halts again once the value has stayed the
    same for more than ``count`` consecutive polls.

    Args:
        getter: function called without arguments on every poll; returns the
            current state.
        setter: function called on every poll with the value from ``getter``.
        delay (float): delay in seconds between polls.
        count (int): number of unchanged polls tolerated before halting.
            0 disables halting, the loop then polls without being triggered.
    """

    def __init__(self, getter, setter, delay=0.01, count=100):
        # ``__init__`` must not return a value (Python raises TypeError), so
        # the wake-up event is kept on the instance instead of being returned.
        event = Event()
        self._event = event

        def poll():
            k = -1  # remaining unchanged polls; negative means halted
            state = None
            while True:
                if count and k < 0:
                    # Halt until the next trigger.
                    event.clear()
                    event.wait()
                    k = count
                s = getter()
                setter(s)
                if state != s:
                    k = count  # a change restarts the countdown
                else:
                    k -= 1
                state = s
                sleep(delay)

        fork(poll)

    def trigger(self, *a, **kw):
        """Wake up the polling loop.

        All arguments are ignored.
        """
        self._event.set()

    # Calling the instance wakes the loop as well.
    __call__ = trigger