| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- """This version uses a traditional event-driven version,
- using continuation passing style. Each method call is passed
- a completion callback and an error callback
- """
- from statistics import median
- import json
- import asyncio
- import random
- import time
- import hbmqtt.client
- from collections import deque
- from thingflow.base import SensorEvent
- URL = "mqtt://localhost:1883"
- class RandomSensor:
- def __init__(self, sensor_id, mean=100.0, stddev=20.0, stop_after_events=None):
- self.sensor_id = sensor_id
- self.mean = mean
- self.stddev = stddev
- self.stop_after_events = stop_after_events
- if stop_after_events is not None:
- def generator():
- for i in range(stop_after_events):
- yield round(random.gauss(mean, stddev), 1)
- else: # go on forever
- def generator():
- while True:
- yield round(random.gauss(mean, stddev), 1)
- self.generator = generator()
- def sample(self):
- return self.generator.__next__()
- def __repr__(self):
- if self.stop_after_events is None:
- return 'RandomSensor(%s, mean=%s, stddev=%s)' % \
- (self.sensor_id, self.mean, self.stddev)
- else:
- return 'RandomSensor(%s, mean=%s, stddev=%s, stop_after_events=%s)' % \
- (self.sensor_id, self.mean, self.stddev, self.stop_after_events)
- class PeriodicMedianTransducer:
- """Emit an event once every ``period`` input events.
- The value is the median of the inputs received since the last
- emission.
- """
- def __init__(self, period=5):
- self.period = period
- self.samples = [None for i in range(period)]
- self.events_since_last = 0
- self.last_event = None # this is used in emitting the last event
-
- def step(self, v):
- self.samples[self.events_since_last] = v.val
- self.events_since_last += 1
- if self.events_since_last==self.period:
- val = median(self.samples)
- event = SensorEvent(sensor_id=v.sensor_id, ts=v.ts, val=val)
- self.events_since_last = 0
- return event
- else:
- self.last_event = v # save in case we complete before completing a period
- return None
- def complete(self):
- if self.events_since_last>0:
- # if we have some partial state, we emit one final event that
- # averages whatever we saw since the last emission.
- return SensorEvent(sensor_id=self.last_event.sensor_id,
- ts=self.last_event.ts,
- val=median(self.samples[0:self.events_since_last]))
- def csv_writer(evt):
- print("csv_writer(%s)" % repr(evt))
- class MqttWriter:
- """All the processing is asynchronous. We ensure that a given send has
- completed and the callbacks called before we process the next one.
- """
- def __init__(self, url, topic, event_loop):
- self.url = url
- self.topic = topic
- self.client = hbmqtt.client.MQTTClient(loop=event_loop)
- self.event_loop = event_loop
- self.connected = False
- self.pending_task = None
- self.request_queue = deque()
- def _to_message(self, msg):
- return bytes(json.dumps((msg.sensor_id, msg.ts, msg.val),), encoding='utf-8')
- def _request_done(self, f, completion_cb, error_cb):
- assert f==self.pending_task
- self.pending_task = None
- exc = f.exception()
- if exc:
- self.event_loop.call_soon(error_cb, exc)
- else:
- self.event_loop.call_soon(completion_cb)
- if len(self.request_queue)>0:
- self.event_loop.call_soon(self._process_queue)
-
- def _process_queue(self):
- assert self.pending_task == None
- assert len(self.request_queue)>0
- (msg, completion_cb, error_cb) = self.request_queue.popleft()
- if msg is not None:
- print("send from queue: %s" % msg)
- self.pending_task = self.event_loop.create_task(
- self.client.publish(self.topic, msg)
- )
- else: # None means that we wanted a disconnect
- print("disconnect")
- self.pending_task = self.event_loop.create_task(
- self.client.disconnect()
- )
- self.pending_task.add_done_callback(lambda f:
- self._request_done(f, completion_cb,
- error_cb))
- def send(self, msg, completion_cb, error_cb):
- if not self.connected:
- print("attempting connection")
- self.request_queue.append((self._to_message(msg),
- completion_cb, error_cb),)
- self.connected = True
- self.pending_task = self.event_loop.create_task(self.client.connect(self.url))
- def connect_done(f):
- assert f==self.pending_task
- print("connected")
- self.pending_task = None
- self.event_loop.call_soon(self._process_queue)
- self.pending_task.add_done_callback(connect_done)
- elif self.pending_task:
- self.request_queue.append((self._to_message(msg), completion_cb,
- error_cb),)
- else:
- print("sending %s" % self._to_message(msg))
- self.pending_task = self.event_loop.create_task(
- self.client.publish(self.topic, self._to_message(msg))
- )
- self.pending_task.add_done_callback(lambda f:
- self._request_done(f, completion_cb,
- error_cb))
- def disconnect(self, completion_cb, error_cb, drop_queue=False):
- if not self.connected:
- return
- if len(self.request_queue)>0 and drop_queue: # for error situations
- self.request_queue = deque()
- if self.pending_task:
- self.request_queue.append((None, completion_cb, error_cb),)
- else:
- print("disconnecting")
- self.pending_task = self.event_loop.create_task(
- self.client.disconnect()
- )
- self.pending_task.add_done_callback(lambda f:
- self._request_done(f, completion_cb,
- error_cb))
- def sample_and_process(sensor, mqtt_writer, xducer, completion_cb, error_cb):
- try:
- sample = sensor.sample()
- except StopIteration:
- final_event = xducer.complete()
- if final_event:
- mqtt_writer.send(final_event,
- lambda: mqtt_writer.disconnect(lambda: completion_cb(False), error_cb),
- error_cb)
- else:
- mqtt_writer.disconnect(lambda: completion_cb(False), error_cb)
- return
- event = SensorEvent(sensor_id=sensor.sensor_id, ts=time.time(), val=sample)
- csv_writer(event)
- median_event = xducer.step(event)
- if median_event:
- mqtt_writer.send(median_event,
- lambda: completion_cb(True), error_cb)
- else:
- completion_cb(True)
-
- sensor = RandomSensor('sensor-2', stop_after_events=12)
- transducer = PeriodicMedianTransducer(5)
- event_loop = asyncio.get_event_loop()
- writer = MqttWriter(URL, sensor.sensor_id, event_loop)
- def loop():
- def completion_cb(more):
- if more:
- event_loop.call_later(0.5, loop)
- else:
- print("all done, no more callbacks to schedule")
- event_loop.stop()
- def error_cb(e):
- print("Got error: %s" % e)
- event_loop.stop()
- event_loop.call_soon(
- lambda: sample_and_process(sensor, writer, transducer,
- completion_cb, error_cb)
- )
- event_loop.call_soon(loop)
- event_loop.run_forever()
-
- print("that's all folks")
|