ex1.py 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. import time
  2. import machine
  3. import onewire, ds18x20
  4. import json
  5. import ubinascii
  6. import utils
  7. # the device is on GPIO12
  8. class TempMon:
  9. def __init__(self, pin):
  10. dat = machine.Pin(pin)
  11. # create the onewire object
  12. self.ds = ds18x20.DS18X20(onewire.OneWire(dat))
  13. # scan for devices on the bus
  14. self.roms = self.ds.scan()
  15. print('found devices:', self.roms)
  16. # loop 10 times and print all temperatures
  17. def read_temp(self):
  18. try:
  19. print('temperatures:', end=' ')
  20. self.ds.convert_temp()
  21. vals=[]
  22. time.sleep_ms(750)
  23. for rom in self.roms:
  24. vals.append(dict(id=ubinascii.hexlify(rom).decode('utf-8'), temp=self.ds.read_temp(rom)))
  25. #print('%s/%s' % (rom, self.ds.read_temp(rom), end=' '))
  26. print(json.dumps(vals))
  27. return vals
  28. except Exception:
  29. print("error encountered...continuing")
  30. class DweetTemp:
  31. def __init__(self, thing, pin):
  32. self.thing = thing
  33. self.t = TempMon(pin)
  34. def dweet(self, readings):
  35. self.url = 'http://dweet.io/dweet/for/%s?' % self.thing + '&'.join('%s=%s' % (e['id'], e['temp']) for e in readings)
  36. print(self.url)
  37. utils.http_get(self.url)
  38. def post_dweet(self, loop=1, interval=5):
  39. for i in range(loop):
  40. self.dweet(self.t.read_temp())
  41. time.sleep(interval)