event.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. """This version uses a traditional event-driven version,
  2. using continuation passing style. Each method call is passed
  3. a completion callback and an error callback
  4. """
  5. from statistics import median
  6. import json
  7. import asyncio
  8. import random
  9. import time
  10. import hbmqtt.client
  11. from collections import deque
  12. from thingflow.base import SensorEvent
  13. URL = "mqtt://localhost:1883"
  14. class RandomSensor:
  15. def __init__(self, sensor_id, mean=100.0, stddev=20.0, stop_after_events=None):
  16. self.sensor_id = sensor_id
  17. self.mean = mean
  18. self.stddev = stddev
  19. self.stop_after_events = stop_after_events
  20. if stop_after_events is not None:
  21. def generator():
  22. for i in range(stop_after_events):
  23. yield round(random.gauss(mean, stddev), 1)
  24. else: # go on forever
  25. def generator():
  26. while True:
  27. yield round(random.gauss(mean, stddev), 1)
  28. self.generator = generator()
  29. def sample(self):
  30. return self.generator.__next__()
  31. def __repr__(self):
  32. if self.stop_after_events is None:
  33. return 'RandomSensor(%s, mean=%s, stddev=%s)' % \
  34. (self.sensor_id, self.mean, self.stddev)
  35. else:
  36. return 'RandomSensor(%s, mean=%s, stddev=%s, stop_after_events=%s)' % \
  37. (self.sensor_id, self.mean, self.stddev, self.stop_after_events)
  38. class PeriodicMedianTransducer:
  39. """Emit an event once every ``period`` input events.
  40. The value is the median of the inputs received since the last
  41. emission.
  42. """
  43. def __init__(self, period=5):
  44. self.period = period
  45. self.samples = [None for i in range(period)]
  46. self.events_since_last = 0
  47. self.last_event = None # this is used in emitting the last event
  48. def step(self, v):
  49. self.samples[self.events_since_last] = v.val
  50. self.events_since_last += 1
  51. if self.events_since_last==self.period:
  52. val = median(self.samples)
  53. event = SensorEvent(sensor_id=v.sensor_id, ts=v.ts, val=val)
  54. self.events_since_last = 0
  55. return event
  56. else:
  57. self.last_event = v # save in case we complete before completing a period
  58. return None
  59. def complete(self):
  60. if self.events_since_last>0:
  61. # if we have some partial state, we emit one final event that
  62. # averages whatever we saw since the last emission.
  63. return SensorEvent(sensor_id=self.last_event.sensor_id,
  64. ts=self.last_event.ts,
  65. val=median(self.samples[0:self.events_since_last]))
  66. def csv_writer(evt):
  67. print("csv_writer(%s)" % repr(evt))
  68. class MqttWriter:
  69. """All the processing is asynchronous. We ensure that a given send has
  70. completed and the callbacks called before we process the next one.
  71. """
  72. def __init__(self, url, topic, event_loop):
  73. self.url = url
  74. self.topic = topic
  75. self.client = hbmqtt.client.MQTTClient(loop=event_loop)
  76. self.event_loop = event_loop
  77. self.connected = False
  78. self.pending_task = None
  79. self.request_queue = deque()
  80. def _to_message(self, msg):
  81. return bytes(json.dumps((msg.sensor_id, msg.ts, msg.val),), encoding='utf-8')
  82. def _request_done(self, f, completion_cb, error_cb):
  83. assert f==self.pending_task
  84. self.pending_task = None
  85. exc = f.exception()
  86. if exc:
  87. self.event_loop.call_soon(error_cb, exc)
  88. else:
  89. self.event_loop.call_soon(completion_cb)
  90. if len(self.request_queue)>0:
  91. self.event_loop.call_soon(self._process_queue)
  92. def _process_queue(self):
  93. assert self.pending_task == None
  94. assert len(self.request_queue)>0
  95. (msg, completion_cb, error_cb) = self.request_queue.popleft()
  96. if msg is not None:
  97. print("send from queue: %s" % msg)
  98. self.pending_task = self.event_loop.create_task(
  99. self.client.publish(self.topic, msg)
  100. )
  101. else: # None means that we wanted a disconnect
  102. print("disconnect")
  103. self.pending_task = self.event_loop.create_task(
  104. self.client.disconnect()
  105. )
  106. self.pending_task.add_done_callback(lambda f:
  107. self._request_done(f, completion_cb,
  108. error_cb))
  109. def send(self, msg, completion_cb, error_cb):
  110. if not self.connected:
  111. print("attempting connection")
  112. self.request_queue.append((self._to_message(msg),
  113. completion_cb, error_cb),)
  114. self.connected = True
  115. self.pending_task = self.event_loop.create_task(self.client.connect(self.url))
  116. def connect_done(f):
  117. assert f==self.pending_task
  118. print("connected")
  119. self.pending_task = None
  120. self.event_loop.call_soon(self._process_queue)
  121. self.pending_task.add_done_callback(connect_done)
  122. elif self.pending_task:
  123. self.request_queue.append((self._to_message(msg), completion_cb,
  124. error_cb),)
  125. else:
  126. print("sending %s" % self._to_message(msg))
  127. self.pending_task = self.event_loop.create_task(
  128. self.client.publish(self.topic, self._to_message(msg))
  129. )
  130. self.pending_task.add_done_callback(lambda f:
  131. self._request_done(f, completion_cb,
  132. error_cb))
  133. def disconnect(self, completion_cb, error_cb, drop_queue=False):
  134. if not self.connected:
  135. return
  136. if len(self.request_queue)>0 and drop_queue: # for error situations
  137. self.request_queue = deque()
  138. if self.pending_task:
  139. self.request_queue.append((None, completion_cb, error_cb),)
  140. else:
  141. print("disconnecting")
  142. self.pending_task = self.event_loop.create_task(
  143. self.client.disconnect()
  144. )
  145. self.pending_task.add_done_callback(lambda f:
  146. self._request_done(f, completion_cb,
  147. error_cb))
  148. def sample_and_process(sensor, mqtt_writer, xducer, completion_cb, error_cb):
  149. try:
  150. sample = sensor.sample()
  151. except StopIteration:
  152. final_event = xducer.complete()
  153. if final_event:
  154. mqtt_writer.send(final_event,
  155. lambda: mqtt_writer.disconnect(lambda: completion_cb(False), error_cb),
  156. error_cb)
  157. else:
  158. mqtt_writer.disconnect(lambda: completion_cb(False), error_cb)
  159. return
  160. event = SensorEvent(sensor_id=sensor.sensor_id, ts=time.time(), val=sample)
  161. csv_writer(event)
  162. median_event = xducer.step(event)
  163. if median_event:
  164. mqtt_writer.send(median_event,
  165. lambda: completion_cb(True), error_cb)
  166. else:
  167. completion_cb(True)
  168. sensor = RandomSensor('sensor-2', stop_after_events=12)
  169. transducer = PeriodicMedianTransducer(5)
  170. event_loop = asyncio.get_event_loop()
  171. writer = MqttWriter(URL, sensor.sensor_id, event_loop)
  172. def loop():
  173. def completion_cb(more):
  174. if more:
  175. event_loop.call_later(0.5, loop)
  176. else:
  177. print("all done, no more callbacks to schedule")
  178. event_loop.stop()
  179. def error_cb(e):
  180. print("Got error: %s" % e)
  181. event_loop.stop()
  182. event_loop.call_soon(
  183. lambda: sample_and_process(sensor, writer, transducer,
  184. completion_cb, error_cb)
  185. )
  186. event_loop.call_soon(loop)
  187. event_loop.run_forever()
  188. print("that's all folks")