# thingflow.py
# ThingFlow for MicroPython
  2. from ucollections import namedtuple
  3. import utime
  4. class FatalError(Exception):
  5. pass
  6. class ExcInDispatch(FatalError):
  7. # Dispatching an event should not raise an error, other than a fatal error.
  8. pass
  9. _Connection = namedtuple('_Connection', ['on_next', 'on_completed', 'on_error'])
  10. # Base class for event generators (output_things).
  11. class OutputThing:
  12. __slots__ = ('__connections__',)
  13. def __init__(self, ports=None):
  14. self.__connections__ = {}
  15. if ports==None:
  16. ports = ['default']
  17. for port in ports:
  18. self.__connections__[port] = []
  19. def connect(self, input_thing, port_map=None):
  20. if port_map==None:
  21. out_port = 'default'
  22. in_port = 'default'
  23. else:
  24. (out_port, in_port) = port_map
  25. if in_port=='default':
  26. dispatchnames = ('on_next', 'on_completed', 'on_error')
  27. else:
  28. dispatchnames = ('on_%s_next' % in_port,
  29. 'on_%s_completd' % in_port,
  30. 'on_%s_error' % in_port)
  31. functions = [getattr(input_thing, m) for m in dispatchnames]
  32. connection = _Connection(on_next=functions[0],
  33. on_completed=functions[1],
  34. on_error=functions[2])
  35. new_connections = self.__connections__[out_port].copy()
  36. new_connections.append(connection)
  37. self.__connections__[out_port] = new_connections
  38. def dispose():
  39. new_connections = self.__connections__[out_port].copy()
  40. new_connections.remove(connection)
  41. self.__connections__[out_port] = new_connections
  42. return dispose
  43. def _dispatch_next(self, x, port=None):
  44. connections = self.__connections__[port if port is not None
  45. else 'default']
  46. try:
  47. for s in connections:
  48. s.on_next(x)
  49. except FatalError:
  50. raise
  51. except Exception as e:
  52. raise ExcInDispatch('dispatching event %s: %s' % (repr(x), repr(e)))
  53. def _dispatch_completed(self, port=None):
  54. if port==None:
  55. port = 'default'
  56. connections = self.__connections__[port]
  57. try:
  58. for s in connections:
  59. s.on_completed()
  60. except FatalError:
  61. raise
  62. except Exception as e:
  63. raise ExcInDispatch(e)
  64. del self.__connections__[port]
  65. def _dispatch_error(self, e, port=None):
  66. if port==None:
  67. port = 'default'
  68. connections = self.__connections__[port]
  69. try:
  70. for s in connections:
  71. s.on_error(e)
  72. except FatalError:
  73. raise
  74. except Exception as e:
  75. raise ExcInDispatch(e)
  76. del self.__connections__[port]
  77. def _observe(self):
  78. # Get an event and call the appropriate dispatch function.
  79. raise NotImplemented
  80. class SensorAsOutputThing(OutputThing):
  81. __slots__ = ('sensor')
  82. def __init__(self, sensor):
  83. super().__init__(None)
  84. self.sensor = sensor
  85. def _observe(self):
  86. try:
  87. v = self.sensor.sample()
  88. self._dispatch_next((self.sensor.sensor_id, utime.time(), v),)
  89. except FatalError:
  90. raise
  91. except StopIteration:
  92. self._dispatch_completed()
  93. except Exception as e:
  94. self._dispatch_error(e)
  95. class Output:
  96. def on_next(self, x):
  97. print(x)
  98. def on_completed():
  99. pass
  100. def on_error(self, e):
  101. print("Error: " + e)
  102. class _Interval:
  103. __slots__ = ('ticks', 'tasks', 'next_tick')
  104. def __init__(self, ticks, next_tick):
  105. self.ticks = ticks
  106. self.tasks = []
  107. self.next_tick = next_tick
  108. class Scheduler:
  109. __slots__ = ('clock_wrap', 'time_in_ticks', 'intervals', 'sorted_ticks')
  110. def __init__(self, clock_wrap=1048576):
  111. self.clock_wrap = clock_wrap
  112. self.time_in_ticks = 0
  113. self.intervals = {}
  114. self.sorted_ticks = []
  115. def _add_task(self, task, ticks):
  116. assert ticks > 0 and ticks < self.clock_wrap
  117. if ticks in self.intervals:
  118. self.intervals[ticks].tasks.append(task)
  119. else:
  120. next_tick = None
  121. for i in self.sorted_ticks:
  122. if (i%ticks)==0 or (ticks%i)==0:
  123. next_tick = self.intervals[i].next_tick
  124. break
  125. if next_tick is None:
  126. next_tick = self.time_in_ticks # otherwise use now
  127. interval = _Interval(ticks, next_tick)
  128. interval.tasks.append(task)
  129. self.intervals[ticks] = interval
  130. self.sorted_ticks.append(ticks)
  131. self.sorted_ticks.sort()
  132. def _remove_task(self, task):
  133. for i in self.intervals.values():
  134. if task in i.tasks:
  135. i.tasks.remove(task)
  136. if len(i.tasks)==0:
  137. self.sorted_ticks.remove(i.ticks)
  138. del self.intervals[i.ticks]
  139. return
  140. assert 0, "Did not find task %s" % task
  141. def _get_next_sleep_interval(self):
  142. assert len(self.intervals)>0
  143. sleep_time = self.clock_wrap
  144. for i in self.intervals.values():
  145. time_diff = max(i.next_tick-self.time_in_ticks, 0)
  146. if time_diff < sleep_time:
  147. sleep_time = time_diff
  148. return sleep_time
  149. def _advance_time(self, ticks):
  150. assert ticks < self.clock_wrap
  151. self.time_in_ticks += ticks
  152. if self.time_in_ticks >= self.clock_wrap:
  153. # wrap all the clocks
  154. unwrapped_time = self.time_in_ticks
  155. self.time_in_ticks = self.time_in_ticks % self.clock_wrap
  156. for i in self.intervals.values():
  157. if i.next_tick >= self.clock_wrap:
  158. i.next_tick = i.next_tick % self.clock_wrap
  159. else: # interval overdue
  160. i.next_tick = self.time_in_ticks - (unwrapped_time-i.next_tick)
  161. def _get_tasks(self): # get runnable tasks
  162. sample_list = []
  163. for ticks in self.sorted_ticks:
  164. i = self.intervals[ticks]
  165. if i.next_tick<=self.time_in_ticks:
  166. sample_list.extend(i.tasks)
  167. i.next_tick = i.next_tick + i.ticks
  168. return sample_list
  169. def schedule_periodic(self, output_thing, interval):
  170. interval_ticks = int(round(interval*100))
  171. assert interval_ticks>0
  172. self._add_task(output_thing, interval_ticks)
  173. def cancel():
  174. self._remove_task(output_thing)
  175. return cancel
  176. def schedule_sensor(self, sensor, interval, *conns):
  177. task = SensorAsOutputThing(sensor)
  178. for s in conns:
  179. task.connect(s)
  180. return self.schedule_periodic(task, interval)
  181. def run_forever(self):
  182. assert len(self.intervals)>0
  183. while True:
  184. output_things = self._get_tasks()
  185. start_ts = utime.ticks_ms()
  186. for pub in output_things:
  187. pub._observe()
  188. if not pub.__connections__:
  189. self._remove_task(pub)
  190. if len(self.intervals)==0:
  191. break
  192. end_ts = utime.ticks_ms()
  193. if end_ts > start_ts:
  194. self._advance_time(int(round((end_ts-start_ts)/10)))
  195. sleep = self._get_next_sleep_interval()
  196. utime.sleep_ms(sleep*10)
  197. now = utime.ticks_ms()
  198. self._advance_time(int(round((now-end_ts)/10)) if now>=end_ts else sleep)