dist_lux_server.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. """Demo of lux sensor and led from raspberry pi
  2. Distributed version - server side: read from an mqtt message queue
  3. and save the data into a postgres database. We assume the database has
  4. a schema "iot".
  5. Here is the sql to setup the database table (via psql):
  6. drop table if exists events;
  7. drop sequence if exists events_seq;
  8. create sequence events_seq;
  9. create table events (id bigint NOT NULL DEFAULT nextval('events_seq'), ts timestamp NOT NULL, sensor_id integer NOT NULL, val double precision NOT NULL);
  10. """
  11. import sys
  12. import asyncio
  13. import getpass
  14. from thingflow.base import Scheduler, SensorEvent
  15. from thingflow.adapters.mqtt import MQTTReader
  16. from thingflow.adapters.postgres import PostgresWriter, SensorEventMapping
  17. import thingflow.filters.select
  18. import thingflow.filters.json
  19. connect_string="dbname=iot user=%s" % getpass.getuser()
  20. mapping = SensorEventMapping('events')
  21. def setup(host):
  22. mqtt = MQTTReader(host, topics=[('bogus/bogus', 2)])
  23. decoded = mqtt.select(lambda m:(m.payload).decode("utf-8")) \
  24. .from_json(constructor=SensorEvent)
  25. scheduler = Scheduler(asyncio.get_event_loop())
  26. decoded.connect(PostgresWriter(scheduler, connect_string, mapping))
  27. decoded.output()
  28. mqtt.print_downstream()
  29. return mqtt, scheduler
  30. def main(host):
  31. mqtt, scheduler = setup(host)
  32. stop = scheduler.schedule_on_private_event_loop(mqtt)
  33. print("starting run...")
  34. try:
  35. scheduler.run_forever()
  36. except KeyboardInterrupt:
  37. stop()
  38. return 0
  39. if __name__ == '__main__':
  40. if len(sys.argv)!=2:
  41. print("%s BROKER" % sys.argv[0])
  42. sys.exit(1)
  43. host=sys.argv[1]
  44. main(host)