things.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. """
  2. Comparing ThingFlow to generic asyncio programming.
  3. This is the ThingFlow version.
  4. """
  5. import asyncio
  6. import random
  7. from statistics import median
  8. from thingflow.base import InputThing, SensorEvent, Scheduler, SensorAsOutputThing
  9. from thingflow.filters.transducer import Transducer
  10. import thingflow.filters.combinators
  11. import thingflow.adapters.csv
  12. from thingflow.adapters.mqtt_async import mqtt_async_send
  13. import thingflow.filters.output
  14. URL = "mqtt://localhost:1883"
  15. class RandomSensor:
  16. def __init__(self, sensor_id, mean=100.0, stddev=20.0, stop_after_events=None):
  17. self.sensor_id = sensor_id
  18. self.mean = mean
  19. self.stddev = stddev
  20. self.stop_after_events = stop_after_events
  21. if stop_after_events is not None:
  22. def generator():
  23. for i in range(stop_after_events):
  24. yield round(random.gauss(mean, stddev), 1)
  25. else: # go on forever
  26. def generator():
  27. while True:
  28. yield round(random.gauss(mean, stddev), 1)
  29. self.generator = generator()
  30. def sample(self):
  31. return self.generator.__next__()
  32. def __repr__(self):
  33. if self.stop_after_events is None:
  34. return 'RandomSensor(%s, mean=%s, stddev=%s)' % \
  35. (self.sensor_id, self.mean, self.stddev)
  36. else:
  37. return 'RandomSensor(%s, mean=%s, stddev=%s, stop_after_events=%s)' % \
  38. (self.sensor_id, self.mean, self.stddev, self.stop_after_events)
  39. class PeriodicMedianTransducer(Transducer):
  40. """Emit an event once every ``period`` input events.
  41. The value is the median of the inputs received since the last
  42. emission.
  43. """
  44. def __init__(self, period=5):
  45. self.period = period
  46. self.samples = [None for i in range(period)]
  47. self.events_since_last = 0
  48. self.last_event = None # this is used in emitting the last event
  49. def step(self, v):
  50. self.samples[self.events_since_last] = v.val
  51. self.events_since_last += 1
  52. if self.events_since_last==self.period:
  53. val = median(self.samples)
  54. event = SensorEvent(sensor_id=v.sensor_id, ts=v.ts, val=val)
  55. self.events_since_last = 0
  56. return event
  57. else:
  58. self.last_event = v # save in case we complete before completing a period
  59. return None
  60. def complete(self):
  61. if self.events_since_last>0:
  62. # if we have some partial state, we emit one final event that
  63. # averages whatever we saw since the last emission.
  64. return SensorEvent(sensor_id=self.last_event.sensor_id,
  65. ts=self.last_event.ts,
  66. val=median(self.samples[0:self.events_since_last]))
  67. SENSOR_ID = 'sensor-1'
  68. scheduler = Scheduler(asyncio.get_event_loop())
  69. sensor = SensorAsOutputThing(RandomSensor(SENSOR_ID, mean=10, stddev=5, stop_after_events=12))
  70. sensor.csv_writer('raw_data.csv').connect(lambda x: print("raw data: %s" % repr(x)))
  71. sensor.transduce(PeriodicMedianTransducer()).mqtt_async_send(URL, SENSOR_ID, scheduler).output()
  72. scheduler.schedule_periodic(sensor, 0.5)
  73. scheduler.run_forever()
  74. print("that's all folks")