test_scheduler.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. """These tests are designed to be run on a desktop. You can use
them to validate the system before deploying to an ESP8266. They use stub
sensors.
Test the scheduler's internal API.
  7. """
  8. import sys
  9. import os
  10. import os.path
  11. try:
  12. from thingflow import *
  13. except ImportError:
  14. sys.path.append(os.path.abspath('../'))
  15. from thingflow import *
  16. import unittest
  17. class TestSensor(object):
  18. def __init__(self, initial_val):
  19. self.val = initial_val
  20. def get(self):
  21. return self.val
  22. class TestScheduler(unittest.TestCase):
  23. def test_simple(self):
  24. sched = Scheduler()
  25. sched._add_task(1, 30)
  26. sched._add_task(2, 60)
  27. sched._add_task(3, 24)
  28. samples = sched._get_tasks()
  29. self.assertEqual(samples, [3, 1, 2])
  30. sleep = sched._get_next_sleep_interval()
  31. self.assertEqual(sleep, 24)
  32. sched._advance_time(24)
  33. samples = sched._get_tasks()
  34. self.assertEqual(samples, [3])
  35. sleep = sched._get_next_sleep_interval()
  36. self.assertEqual(sleep, 6)
  37. sched._advance_time(6)
  38. samples = sched._get_tasks()
  39. self.assertEqual(samples, [1])
  40. sleep = sched._get_next_sleep_interval()
  41. self.assertEqual(sleep, 18)
  42. sched._advance_time(18)
  43. samples = sched._get_tasks()
  44. self.assertEqual(samples, [3])
  45. sleep = sched._get_next_sleep_interval()
  46. self.assertEqual(sleep, 12)
  47. sched._advance_time(12)
  48. samples = sched._get_tasks()
  49. self.assertEqual(samples, [1, 2])
  50. def test_nonzero_sample_time(self):
  51. sched = Scheduler()
  52. sched._add_task(1, 30)
  53. sched._add_task(2, 60)
  54. samples = sched._get_tasks()
  55. self.assertEqual(samples, [1, 2])
  56. sched._advance_time(4)
  57. sleep = sched._get_next_sleep_interval()
  58. self.assertEqual(sleep, 26)
  59. sched._advance_time(28)
  60. samples = sched._get_tasks()
  61. self.assertEqual(samples, [1])
  62. sched._advance_time(1)
  63. sleep = sched._get_next_sleep_interval()
  64. self.assertEqual(sleep, 27)
  65. sched._advance_time(27)
  66. samples = sched._get_tasks()
  67. self.assertEqual(samples, [1, 2])
  68. def test_clock_wrap(self):
  69. sched = Scheduler(clock_wrap=64)
  70. sched._add_task(1, 8)
  71. sched._add_task(2, 16)
  72. for i in range(20):
  73. samples = sched._get_tasks()
  74. if (i%2)==0:
  75. self.assertEqual(samples, [1, 2])
  76. else:
  77. self.assertEqual(samples, [1])
  78. sleep = sched._get_next_sleep_interval()
  79. self.assertEqual(sleep, 8,
  80. "sleep was %d instead of 8 for iteration %d" % (sleep, i))
  81. sched._advance_time(8)
  82. def test_overdue_clock_wrap(self):
  83. sched = Scheduler(clock_wrap=32)
  84. sched._add_task(1, 8)
  85. for i in range(10):
  86. samples = sched._get_tasks()
  87. self.assertEqual(samples,[1])
  88. sleep = sched._get_next_sleep_interval()
  89. if i==0:
  90. self.assertEqual(sleep, 8)
  91. else:
  92. self.assertEqual(sleep, 6)
  93. sched._advance_time(sleep+2) # go 2 seconds overdue
  94. def test__add_task_after_start(self):
  95. sched = Scheduler()
  96. sched._add_task(1, 4)
  97. sched._add_task(2, 8)
  98. for i in range(4):
  99. samples = sched._get_tasks()
  100. if (i%2)==0:
  101. self.assertEqual(samples, [1, 2])
  102. else:
  103. self.assertEqual(samples, [1])
  104. sleep = sched._get_next_sleep_interval()
  105. sched._advance_time(sleep)
  106. sched._add_task(3, 4)
  107. for i in range(4):
  108. samples = sched._get_tasks()
  109. if (i%2)==0:
  110. # order is based on creation order of intervals
  111. self.assertEqual(samples, [1, 3, 2])
  112. else:
  113. self.assertEqual(samples, [1, 3])
  114. sleep = sched._get_next_sleep_interval()
  115. sched._advance_time(sleep)
  116. def test__remove_task(self):
  117. sched = Scheduler()
  118. sched._add_task(1, 4)
  119. sched._add_task(2, 8)
  120. for i in range(4):
  121. samples = sched._get_tasks()
  122. if (i%2)==0:
  123. self.assertEqual(samples, [1, 2])
  124. else:
  125. self.assertEqual(samples, [1])
  126. sleep = sched._get_next_sleep_interval()
  127. sched._advance_time(sleep)
  128. sched._remove_task(1)
  129. for i in range(4):
  130. samples = sched._get_tasks()
  131. self.assertEqual(samples, [2])
  132. sleep = sched._get_next_sleep_interval()
  133. sched._advance_time(sleep)
  134. def test_noninteger_sensor(self):
  135. """So far, we've tested where we are passing interger sensor ids. We
  136. can also pass objects
  137. """
  138. s1 = TestSensor(1)
  139. s2 = TestSensor(2)
  140. sched = Scheduler()
  141. sched._add_task(s1, 4)
  142. sched._add_task(s2, 8)
  143. for i in range(4):
  144. samples = sched._get_tasks()
  145. if (i%2)==0:
  146. self.assertEqual(samples, [s1, s2])
  147. else:
  148. self.assertEqual(samples, [s1])
  149. sleep = sched._get_next_sleep_interval()
  150. sched._advance_time(sleep)
  151. sched._remove_task(s1)
  152. for i in range(4):
  153. samples = sched._get_tasks()
  154. self.assertEqual(samples, [s2])
  155. sleep = sched._get_next_sleep_interval()
  156. sched._advance_time(sleep)
  157. if __name__ == '__main__':
  158. unittest.main()