| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- # Test for utimeq module which implements task queue with support for
- # wraparound time (utime.ticks_ms() style).
- try:
- from utime import ticks_add, ticks_diff
- from utimeq import utimeq
- except ImportError:
- print("SKIP")
- raise SystemExit
# Set to a true value to make dprint() echo intermediate state.
DEBUG = 0
# One tick before zero: ticks_add() wraps around, so this is presumably the
# largest value a tick counter can hold.
MAX = ticks_add(0, -1)
# Half of the wraparound period; the edge-case checks further down are
# centred on this distance.
MODULO_HALF = (MAX >> 1) + 1
# The variant of dprint() is chosen once, when the module is loaded.
if not DEBUG:
    def dprint(*v):
        """Discard debug output (tracing is switched off)."""
else:
    def dprint(*v):
        """Forward all positional arguments to print() for tracing."""
        print(*v)
# Misuse of the API has to surface as an exception, never as a crash.
h = utimeq(10)

# push() given a single argument (the valid calls in this file pass three)
try:
    h.push(1)
except TypeError:
    pass
else:
    assert False

# pop() while nothing has been queued yet
try:
    h.pop(1)
except IndexError:
    pass
else:
    assert False

# the queue object does not support the unary ~ operator
try:
    ~h
except TypeError:
    pass
else:
    assert False

# a queue created with size 1 is full after one push and rejects the next
h = utimeq(1)
h.push(1, 0, 0)
try:
    h.push(2, 0, 0)
except IndexError:
    pass
else:
    assert False

# pop() into an invalid target (an empty list)
try:
    h.pop([])
except TypeError:
    pass
else:
    assert False

# the rejected calls above must have left the single entry in place
assert len(h) == 1

# peektime() reports the time value of the queued entry
assert h.peektime() == 1

# peektime() on a queue with no entries
try:
    utimeq(1).peektime()
except IndexError:
    pass
else:
    assert False
def pop_all(h):
    """Drain queue *h* and return everything popped, in pop order.

    Each pop() call is handed a fresh three-slot list (presumably filled in
    place by the queue); the list is stored as a tuple.  The collected list
    is also passed to dprint() before being returned.
    """
    drained = []
    while True:
        if not h:
            break
        slot = [0, 0, 0]
        h.pop(slot)
        drained.append(tuple(slot))
    dprint(drained)
    return drained
def add(h, v):
    """Push time value *v* onto queue *h*, with zeros for the other two fields."""
    h.push(v, 0, 0)
    separator = "-----"
    dprint(separator)
    # Debugging hook: a dump of the queue contents can be inserted here.
    dprint(separator)
# Queue a mix of small times and times close to MAX in scrambled order, then
# verify that each popped entry is strictly later (per ticks_diff) than the
# one popped before it.
h = utimeq(10)
for v in (0, MAX, MAX - 1, 101, 100, MAX - 2):
    add(h, v)
dprint(h)

l = pop_all(h)
for earlier, later in zip(l, l[1:]):
    diff = ticks_diff(later[0], earlier[0])
    assert diff > 0
def edge_case(edge, offset):
    """Queue two times *edge* ticks apart and return the diff of the popped pair.

    The times pushed are ticks_add(0, offset) and ticks_add(edge, offset).
    Both are popped again and the result is ticks_diff(second popped time,
    first popped time).
    """
    queue = utimeq(10)
    for base in (0, edge):
        add(queue, ticks_add(base, offset))
    dprint(queue)

    popped = pop_all(queue)
    delta = ticks_diff(popped[1][0], popped[0][0])
    dprint(delta, delta > 0)
    return delta
# Wraparound boundary checks: queue two times whose distance is just below,
# exactly at, and just above half of the ticks period.  For every distance
# the pair is additionally shifted by +100 and -100 ticks, which must not
# change the observed difference.
for edge, expected in (
    (MODULO_HALF - 1, MODULO_HALF - 1),
    # The diff is expected to be positive in every case, per the definition
    # of heappop(), which should return the smallest value.  This is the one
    # edge case where that invariant breaks, due to the asymmetry of the
    # two's-complement range: there is one more negative integer than
    # positive, so pushing values like these makes ticks_diff() return the
    # minimum negative value.  Popping them in the opposite order would not
    # help, because the ticks_diff() result would be the same.  Conclusion:
    # never add to a heap values where (a - b) == MODULO_HALF (and which
    # are, of course, >= MODULO_HALF ticks apart in real time).
    (MODULO_HALF, -MODULO_HALF),
    (MODULO_HALF + 1, MODULO_HALF - 1),
):
    dprint("===")
    diff = edge_case(edge, 0)
    assert diff == expected
    assert edge_case(edge, 100) == diff
    assert edge_case(edge, -100) == diff

print("OK")
|