utimeq1.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. # Test for utimeq module which implements task queue with support for
  2. # wraparound time (utime.ticks_ms() style).
  3. try:
  4. from utime import ticks_add, ticks_diff
  5. from utimeq import utimeq
  6. except ImportError:
  7. print("SKIP")
  8. raise SystemExit
  9. DEBUG = 0
  10. MAX = ticks_add(0, -1)
  11. MODULO_HALF = MAX // 2 + 1
  12. if DEBUG:
  13. def dprint(*v):
  14. print(*v)
  15. else:
  16. def dprint(*v):
  17. pass
  18. # Try not to crash on invalid data
  19. h = utimeq(10)
  20. try:
  21. h.push(1)
  22. assert False
  23. except TypeError:
  24. pass
  25. try:
  26. h.pop(1)
  27. assert False
  28. except IndexError:
  29. pass
  30. # unsupported unary op
  31. try:
  32. ~h
  33. assert False
  34. except TypeError:
  35. pass
  36. # pushing on full queue
  37. h = utimeq(1)
  38. h.push(1, 0, 0)
  39. try:
  40. h.push(2, 0, 0)
  41. assert False
  42. except IndexError:
  43. pass
  44. # popping into invalid type
  45. try:
  46. h.pop([])
  47. assert False
  48. except TypeError:
  49. pass
  50. # length
  51. assert len(h) == 1
  52. # peektime
  53. assert h.peektime() == 1
  54. # peektime with empty queue
  55. try:
  56. utimeq(1).peektime()
  57. assert False
  58. except IndexError:
  59. pass
  60. def pop_all(h):
  61. l = []
  62. while h:
  63. item = [0, 0, 0]
  64. h.pop(item)
  65. #print("!", item)
  66. l.append(tuple(item))
  67. dprint(l)
  68. return l
  69. def add(h, v):
  70. h.push(v, 0, 0)
  71. dprint("-----")
  72. #h.dump()
  73. dprint("-----")
  74. h = utimeq(10)
  75. add(h, 0)
  76. add(h, MAX)
  77. add(h, MAX - 1)
  78. add(h, 101)
  79. add(h, 100)
  80. add(h, MAX - 2)
  81. dprint(h)
  82. l = pop_all(h)
  83. for i in range(len(l) - 1):
  84. diff = ticks_diff(l[i + 1][0], l[i][0])
  85. assert diff > 0
  86. def edge_case(edge, offset):
  87. h = utimeq(10)
  88. add(h, ticks_add(0, offset))
  89. add(h, ticks_add(edge, offset))
  90. dprint(h)
  91. l = pop_all(h)
  92. diff = ticks_diff(l[1][0], l[0][0])
  93. dprint(diff, diff > 0)
  94. return diff
  95. dprint("===")
  96. diff = edge_case(MODULO_HALF - 1, 0)
  97. assert diff == MODULO_HALF - 1
  98. assert edge_case(MODULO_HALF - 1, 100) == diff
  99. assert edge_case(MODULO_HALF - 1, -100) == diff
  100. # We expect diff to be always positive, per the definition of heappop() which should return
  101. # the smallest value.
  102. # This is the edge case where this invariant breaks, due to assymetry of two's-complement
  103. # range - there's one more negative integer than positive, so heappushing values like below
  104. # will then make ticks_diff() return the minimum negative value. We could make heappop
  105. # return them in a different order, but ticks_diff() result would be the same. Conclusion:
  106. # never add to a heap values where (a - b) == MODULO_HALF (and which are >= MODULO_HALF
  107. # ticks apart in real time of course).
  108. dprint("===")
  109. diff = edge_case(MODULO_HALF, 0)
  110. assert diff == -MODULO_HALF
  111. assert edge_case(MODULO_HALF, 100) == diff
  112. assert edge_case(MODULO_HALF, -100) == diff
  113. dprint("===")
  114. diff = edge_case(MODULO_HALF + 1, 0)
  115. assert diff == MODULO_HALF - 1
  116. assert edge_case(MODULO_HALF + 1, 100) == diff
  117. assert edge_case(MODULO_HALF + 1, -100) == diff
  118. print("OK")