# test_pandas.py (1.4 KB)
  1. """
  2. Test the adapter to pandas (data analysis library)
  3. """
  4. import asyncio
  5. import unittest
  6. try:
  7. import pandas
  8. PANDAS_AVAILABLE=True
  9. except:
  10. PANDAS_AVAILABLE = False
  11. from utils import ValueListSensor
  12. from thingflow.base import Scheduler, SensorAsOutputThing
  13. value_stream = [
  14. 20,
  15. 30,
  16. 100,
  17. 120,
  18. 20,
  19. 5,
  20. 2222
  21. ]
  22. @unittest.skipUnless(PANDAS_AVAILABLE, "pandas library not installed")
  23. class TestPandas(unittest.TestCase):
  24. def test_pandas(self):
  25. s = ValueListSensor(1, value_stream)
  26. p = SensorAsOutputThing(s)
  27. import thingflow.adapters.pandas
  28. import numpy
  29. w =thingflow.adapters.pandas.PandasSeriesWriter()
  30. p.connect(w)
  31. sch = Scheduler(asyncio.get_event_loop())
  32. sch.schedule_recurring(p)
  33. sch.run_forever()
  34. self.assertTrue(w.result is not None, "Result of pandas never set")
  35. # now we verify each element
  36. for (i, v) in enumerate(value_stream):
  37. pv = w.result[i]
  38. self.assertTrue(isinstance(pv, numpy.int64),
  39. "Expecting pandas value '%s' to be numpy.int64, but instead was %s" %
  40. (pv, repr(type(pv))))
  41. self.assertEqual(v, pv,
  42. "Pandas value '%s' not equal to original value '%s'" %
  43. (repr(pv), repr(v)))
  44. print("Validate pandas array")
  45. if __name__ == '__main__':
  46. unittest.main()