| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- # Copyright 2016 by MPI-SWS and Data-Ken Research.
- # Licensed under the Apache 2.0 License.
- """These tests are designed to be run on a desktop. You can use
- them to validate the system before deploying to 8266. They use stub
- sensors.
- Test the scheduler's internal api.
- """
- import sys
- import os
- import os.path
- try:
- from thingflow import *
- except ImportError:
- sys.path.append(os.path.abspath('../'))
- from thingflow import *
- import unittest
- class TestSensor(object):
- def __init__(self, initial_val):
- self.val = initial_val
- def get(self):
- return self.val
-
- class TestScheduler(unittest.TestCase):
- def test_simple(self):
- sched = Scheduler()
- sched._add_task(1, 30)
- sched._add_task(2, 60)
- sched._add_task(3, 24)
- samples = sched._get_tasks()
- self.assertEqual(samples, [3, 1, 2])
- sleep = sched._get_next_sleep_interval()
- self.assertEqual(sleep, 24)
- sched._advance_time(24)
- samples = sched._get_tasks()
- self.assertEqual(samples, [3])
- sleep = sched._get_next_sleep_interval()
- self.assertEqual(sleep, 6)
- sched._advance_time(6)
- samples = sched._get_tasks()
- self.assertEqual(samples, [1])
- sleep = sched._get_next_sleep_interval()
- self.assertEqual(sleep, 18)
- sched._advance_time(18)
- samples = sched._get_tasks()
- self.assertEqual(samples, [3])
- sleep = sched._get_next_sleep_interval()
- self.assertEqual(sleep, 12)
- sched._advance_time(12)
- samples = sched._get_tasks()
- self.assertEqual(samples, [1, 2])
- def test_nonzero_sample_time(self):
- sched = Scheduler()
- sched._add_task(1, 30)
- sched._add_task(2, 60)
- samples = sched._get_tasks()
- self.assertEqual(samples, [1, 2])
- sched._advance_time(4)
- sleep = sched._get_next_sleep_interval()
- self.assertEqual(sleep, 26)
- sched._advance_time(28)
- samples = sched._get_tasks()
- self.assertEqual(samples, [1])
- sched._advance_time(1)
- sleep = sched._get_next_sleep_interval()
- self.assertEqual(sleep, 27)
- sched._advance_time(27)
- samples = sched._get_tasks()
- self.assertEqual(samples, [1, 2])
- def test_clock_wrap(self):
- sched = Scheduler(clock_wrap=64)
- sched._add_task(1, 8)
- sched._add_task(2, 16)
- for i in range(20):
- samples = sched._get_tasks()
- if (i%2)==0:
- self.assertEqual(samples, [1, 2])
- else:
- self.assertEqual(samples, [1])
- sleep = sched._get_next_sleep_interval()
- self.assertEqual(sleep, 8,
- "sleep was %d instead of 8 for iteration %d" % (sleep, i))
- sched._advance_time(8)
-
- def test_overdue_clock_wrap(self):
- sched = Scheduler(clock_wrap=32)
- sched._add_task(1, 8)
- for i in range(10):
- samples = sched._get_tasks()
- self.assertEqual(samples,[1])
- sleep = sched._get_next_sleep_interval()
- if i==0:
- self.assertEqual(sleep, 8)
- else:
- self.assertEqual(sleep, 6)
- sched._advance_time(sleep+2) # go 2 seconds overdue
-
- def test__add_task_after_start(self):
- sched = Scheduler()
- sched._add_task(1, 4)
- sched._add_task(2, 8)
- for i in range(4):
- samples = sched._get_tasks()
- if (i%2)==0:
- self.assertEqual(samples, [1, 2])
- else:
- self.assertEqual(samples, [1])
- sleep = sched._get_next_sleep_interval()
- sched._advance_time(sleep)
- sched._add_task(3, 4)
- for i in range(4):
- samples = sched._get_tasks()
- if (i%2)==0:
- # order is based on creation order of intervals
- self.assertEqual(samples, [1, 3, 2])
- else:
- self.assertEqual(samples, [1, 3])
- sleep = sched._get_next_sleep_interval()
- sched._advance_time(sleep)
- def test__remove_task(self):
- sched = Scheduler()
- sched._add_task(1, 4)
- sched._add_task(2, 8)
- for i in range(4):
- samples = sched._get_tasks()
- if (i%2)==0:
- self.assertEqual(samples, [1, 2])
- else:
- self.assertEqual(samples, [1])
- sleep = sched._get_next_sleep_interval()
- sched._advance_time(sleep)
- sched._remove_task(1)
- for i in range(4):
- samples = sched._get_tasks()
- self.assertEqual(samples, [2])
- sleep = sched._get_next_sleep_interval()
- sched._advance_time(sleep)
- def test_noninteger_sensor(self):
- """So far, we've tested where we are passing interger sensor ids. We
- can also pass objects
- """
- s1 = TestSensor(1)
- s2 = TestSensor(2)
- sched = Scheduler()
- sched._add_task(s1, 4)
- sched._add_task(s2, 8)
- for i in range(4):
- samples = sched._get_tasks()
- if (i%2)==0:
- self.assertEqual(samples, [s1, s2])
- else:
- self.assertEqual(samples, [s1])
- sleep = sched._get_next_sleep_interval()
- sched._advance_time(sleep)
- sched._remove_task(s1)
- for i in range(4):
- samples = sched._get_tasks()
- self.assertEqual(samples, [s2])
- sleep = sched._get_next_sleep_interval()
- sched._advance_time(sleep)
-
-
- if __name__ == '__main__':
- unittest.main()
|