| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152 |
- """Demo of lux sensor and led from raspberry pi
- Distributed version - server side: read from an mqtt message queue
- and save the datainto a postgres database. We assume the database has
- a schema "iot".
- Here is the sql to setup the database table (via psql):
- drop table if exists events;
- drop sequence if exists events_seq;
- create sequence events_seq;
- create table events (id bigint NOT NULL DEFAULT nextval('events_seq'), ts timestamp NOT NULL, sensor_id integer NOT NULL, val double precision NOT NULL);
- """
- import sys
- import asyncio
- import getpass
- from thingflow.base import Scheduler, SensorEvent
- from thingflow.adapters.mqtt import MQTTReader
- from thingflow.adapters.postgres import PostgresWriter, SensorEventMapping
- import thingflow.filters.select
- import thingflow.filters.json
- connect_string="dbname=iot user=%s" % getpass.getuser()
- mapping = SensorEventMapping('events')
- def setup(host):
- mqtt = MQTTReader(host, topics=[('bogus/bogus', 2)])
- decoded = mqtt.select(lambda m:(m.payload).decode("utf-8")) \
- .from_json(constructor=SensorEvent)
- scheduler = Scheduler(asyncio.get_event_loop())
- decoded.connect(PostgresWriter(scheduler, connect_string, mapping))
- decoded.output()
- mqtt.print_downstream()
- return mqtt, scheduler
-
- def main(host):
- mqtt, scheduler = setup(host)
- stop = scheduler.schedule_on_private_event_loop(mqtt)
- print("starting run...")
- try:
- scheduler.run_forever()
- except KeyboardInterrupt:
- stop()
- return 0
- if __name__ == '__main__':
- if len(sys.argv)!=2:
- print("%s BROKER" % sys.argv[0])
- sys.exit(1)
- host=sys.argv[1]
- main(host)
|