| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- """This version uses the async and await calls.
- """
- from statistics import median
- import json
- import asyncio
- import random
- import time
- import hbmqtt.client
- from thingflow.base import SensorEvent
- URL = "mqtt://localhost:1883"
- class RandomSensor:
- def __init__(self, sensor_id, mean=100.0, stddev=20.0, stop_after_events=None):
- self.sensor_id = sensor_id
- self.mean = mean
- self.stddev = stddev
- self.stop_after_events = stop_after_events
- if stop_after_events is not None:
- def generator():
- for i in range(stop_after_events):
- yield round(random.gauss(mean, stddev), 1)
- else: # go on forever
- def generator():
- while True:
- yield round(random.gauss(mean, stddev), 1)
- self.generator = generator()
- def sample(self):
- return self.generator.__next__()
- def __repr__(self):
- if self.stop_after_events is None:
- return 'RandomSensor(%s, mean=%s, stddev=%s)' % \
- (self.sensor_id, self.mean, self.stddev)
- else:
- return 'RandomSensor(%s, mean=%s, stddev=%s, stop_after_events=%s)' % \
- (self.sensor_id, self.mean, self.stddev, self.stop_after_events)
- class PeriodicMedianTransducer:
- """Emit an event once every ``period`` input events.
- The value is the median of the inputs received since the last
- emission.
- """
- def __init__(self, period=5):
- self.period = period
- self.samples = [None for i in range(period)]
- self.events_since_last = 0
- self.last_event = None # this is used in emitting the last event
-
- def step(self, v):
- self.samples[self.events_since_last] = v.val
- self.events_since_last += 1
- if self.events_since_last==self.period:
- val = median(self.samples)
- event = SensorEvent(sensor_id=v.sensor_id, ts=v.ts, val=val)
- self.events_since_last = 0
- return event
- else:
- self.last_event = v # save in case we complete before completing a period
- return None
- def complete(self):
- if self.events_since_last>0:
- # if we have some partial state, we emit one final event that
- # averages whatever we saw since the last emission.
- return SensorEvent(sensor_id=self.last_event.sensor_id,
- ts=self.last_event.ts,
- val=median(self.samples[0:self.events_since_last]))
- def csv_writer(evt):
- print("csv_writer(%s)" % repr(evt))
- class MqttWriter:
- def __init__(self, url, topic, event_loop):
- self.url = url
- self.topic = topic
- self.client = hbmqtt.client.MQTTClient(loop=event_loop)
- self.connected = False
- def _to_message(self, msg):
- return bytes(json.dumps((msg.sensor_id, msg.ts, msg.val),), encoding='utf-8')
-
- async def send(self, msg):
- if not self.connected:
- print("attempting connection")
- await self.client.connect(self.url)
- self.connected = True
- print("connected")
- print("sending %s" % self._to_message(msg))
- await self.client.publish(self.topic, self._to_message(msg))
- async def disconnect(self):
- if self.connected:
- await self.client.disconnect()
- async def sample_and_process(sensor, mqtt_writer, xducer):
- try:
- sample = sensor.sample()
- except StopIteration:
- final_event = xducer.complete()
- if final_event:
- await mqtt_writer.send(final_event)
- print("disconnecting")
- await mqtt_writer.disconnect()
- return False
- event = SensorEvent(sensor_id=sensor.sensor_id, ts=time.time(), val=sample)
- csv_writer(event)
- median_event = xducer.step(event)
- if median_event:
- await mqtt_writer.send(median_event)
- return True
-
-
-
- sensor = RandomSensor('sensor-2', stop_after_events=12)
- transducer = PeriodicMedianTransducer(5)
- event_loop = asyncio.get_event_loop()
- writer = MqttWriter(URL, sensor.sensor_id, event_loop)
- def loop():
- coro = sample_and_process(sensor, writer, transducer)
- task = event_loop.create_task(coro)
- def done_callback(f):
- exc = f.exception()
- if exc:
- raise exc
- elif f.result()==False:
- print("all done, no more callbacks to schedule")
- event_loop.stop()
- else:
- event_loop.call_later(0.5, loop)
- task.add_done_callback(done_callback)
- event_loop.call_soon(loop)
- event_loop.run_forever()
-
- print("that's all folks")
|