asyncawait.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. """This version uses the async and await calls.
  2. """
  3. from statistics import median
  4. import json
  5. import asyncio
  6. import random
  7. import time
  8. import hbmqtt.client
  9. from thingflow.base import SensorEvent
# URL of the MQTT broker that MqttWriter connects to.
URL = "mqtt://localhost:1883"
  11. class RandomSensor:
  12. def __init__(self, sensor_id, mean=100.0, stddev=20.0, stop_after_events=None):
  13. self.sensor_id = sensor_id
  14. self.mean = mean
  15. self.stddev = stddev
  16. self.stop_after_events = stop_after_events
  17. if stop_after_events is not None:
  18. def generator():
  19. for i in range(stop_after_events):
  20. yield round(random.gauss(mean, stddev), 1)
  21. else: # go on forever
  22. def generator():
  23. while True:
  24. yield round(random.gauss(mean, stddev), 1)
  25. self.generator = generator()
  26. def sample(self):
  27. return self.generator.__next__()
  28. def __repr__(self):
  29. if self.stop_after_events is None:
  30. return 'RandomSensor(%s, mean=%s, stddev=%s)' % \
  31. (self.sensor_id, self.mean, self.stddev)
  32. else:
  33. return 'RandomSensor(%s, mean=%s, stddev=%s, stop_after_events=%s)' % \
  34. (self.sensor_id, self.mean, self.stddev, self.stop_after_events)
  35. class PeriodicMedianTransducer:
  36. """Emit an event once every ``period`` input events.
  37. The value is the median of the inputs received since the last
  38. emission.
  39. """
  40. def __init__(self, period=5):
  41. self.period = period
  42. self.samples = [None for i in range(period)]
  43. self.events_since_last = 0
  44. self.last_event = None # this is used in emitting the last event
  45. def step(self, v):
  46. self.samples[self.events_since_last] = v.val
  47. self.events_since_last += 1
  48. if self.events_since_last==self.period:
  49. val = median(self.samples)
  50. event = SensorEvent(sensor_id=v.sensor_id, ts=v.ts, val=val)
  51. self.events_since_last = 0
  52. return event
  53. else:
  54. self.last_event = v # save in case we complete before completing a period
  55. return None
  56. def complete(self):
  57. if self.events_since_last>0:
  58. # if we have some partial state, we emit one final event that
  59. # averages whatever we saw since the last emission.
  60. return SensorEvent(sensor_id=self.last_event.sensor_id,
  61. ts=self.last_event.ts,
  62. val=median(self.samples[0:self.events_since_last]))
  63. def csv_writer(evt):
  64. print("csv_writer(%s)" % repr(evt))
  65. class MqttWriter:
  66. def __init__(self, url, topic, event_loop):
  67. self.url = url
  68. self.topic = topic
  69. self.client = hbmqtt.client.MQTTClient(loop=event_loop)
  70. self.connected = False
  71. def _to_message(self, msg):
  72. return bytes(json.dumps((msg.sensor_id, msg.ts, msg.val),), encoding='utf-8')
  73. async def send(self, msg):
  74. if not self.connected:
  75. print("attempting connection")
  76. await self.client.connect(self.url)
  77. self.connected = True
  78. print("connected")
  79. print("sending %s" % self._to_message(msg))
  80. await self.client.publish(self.topic, self._to_message(msg))
  81. async def disconnect(self):
  82. if self.connected:
  83. await self.client.disconnect()
  84. async def sample_and_process(sensor, mqtt_writer, xducer):
  85. try:
  86. sample = sensor.sample()
  87. except StopIteration:
  88. final_event = xducer.complete()
  89. if final_event:
  90. await mqtt_writer.send(final_event)
  91. print("disconnecting")
  92. await mqtt_writer.disconnect()
  93. return False
  94. event = SensorEvent(sensor_id=sensor.sensor_id, ts=time.time(), val=sample)
  95. csv_writer(event)
  96. median_event = xducer.step(event)
  97. if median_event:
  98. await mqtt_writer.send(median_event)
  99. return True
# Simulated sensor that is exhausted after 12 readings.
sensor = RandomSensor('sensor-2', stop_after_events=12)
# Emits one median event for every 5 readings.
transducer = PeriodicMedianTransducer(5)
event_loop = asyncio.get_event_loop()
# The sensor id doubles as the MQTT topic name.
writer = MqttWriter(URL, sensor.sensor_id, event_loop)
  104. def loop():
  105. coro = sample_and_process(sensor, writer, transducer)
  106. task = event_loop.create_task(coro)
  107. def done_callback(f):
  108. exc = f.exception()
  109. if exc:
  110. raise exc
  111. elif f.result()==False:
  112. print("all done, no more callbacks to schedule")
  113. event_loop.stop()
  114. else:
  115. event_loop.call_later(0.5, loop)
  116. task.add_done_callback(done_callback)
# Schedule the first cycle; loop() arranges every following one itself.
event_loop.call_soon(loop)
# Blocks until event_loop.stop() is called from loop()'s done-callback.
event_loop.run_forever()
print("that's all folks")