# can.py: test script for the pyb.CAN class
  1. try:
  2. from pyb import CAN
  3. except ImportError:
  4. print('SKIP')
  5. raise SystemExit
  6. from array import array
  7. import micropython
  8. import pyb
  9. # test we can correctly create by id or name
  10. for bus in (-1, 0, 1, 2, 3, "YA", "YB", "YC"):
  11. try:
  12. CAN(bus, CAN.LOOPBACK)
  13. print("CAN", bus)
  14. except ValueError:
  15. print("ValueError", bus)
  16. CAN(1).deinit()
  17. CAN.initfilterbanks(14)
  18. can = CAN(1)
  19. print(can)
  20. # Test state when de-init'd
  21. print(can.state() == can.STOPPED)
  22. can.init(CAN.LOOPBACK)
  23. print(can)
  24. print(can.any(0))
  25. # Test state when freshly created
  26. print(can.state() == can.ERROR_ACTIVE)
  27. # Test that restart can be called
  28. can.restart()
  29. # Test info returns a sensible value
  30. print(can.info())
  31. # Catch all filter
  32. can.setfilter(0, CAN.MASK16, 0, (0, 0, 0, 0))
  33. can.send('abcd', 123, timeout=5000)
  34. print(can.any(0), can.info())
  35. print(can.recv(0))
  36. can.send('abcd', -1, timeout=5000)
  37. print(can.recv(0))
  38. can.send('abcd', 0x7FF + 1, timeout=5000)
  39. print(can.recv(0))
  40. # Test too long message
  41. try:
  42. can.send('abcdefghi', 0x7FF, timeout=5000)
  43. except ValueError:
  44. print('passed')
  45. else:
  46. print('failed')
  47. # Test that recv can work without allocating memory on the heap
  48. buf = bytearray(10)
  49. l = [0, 0, 0, memoryview(buf)]
  50. l2 = None
  51. micropython.heap_lock()
  52. can.send('', 42)
  53. l2 = can.recv(0, l)
  54. assert l is l2
  55. print(l, len(l[3]), buf)
  56. can.send('1234', 42)
  57. l2 = can.recv(0, l)
  58. assert l is l2
  59. print(l, len(l[3]), buf)
  60. can.send('01234567', 42)
  61. l2 = can.recv(0, l)
  62. assert l is l2
  63. print(l, len(l[3]), buf)
  64. can.send('abc', 42)
  65. l2 = can.recv(0, l)
  66. assert l is l2
  67. print(l, len(l[3]), buf)
  68. micropython.heap_unlock()
  69. # Test that recv can work with different arrays behind the memoryview
  70. can.send('abc', 1)
  71. print(bytes(can.recv(0, [0, 0, 0, memoryview(array('B', range(8)))])[3]))
  72. can.send('def', 1)
  73. print(bytes(can.recv(0, [0, 0, 0, memoryview(array('b', range(8)))])[3]))
  74. # Test for non-list passed as second arg to recv
  75. can.send('abc', 1)
  76. try:
  77. can.recv(0, 1)
  78. except TypeError:
  79. print('TypeError')
  80. # Test for too-short-list passed as second arg to recv
  81. can.send('abc', 1)
  82. try:
  83. can.recv(0, [0, 0, 0])
  84. except ValueError:
  85. print('ValueError')
  86. # Test for non-memoryview passed as 4th element to recv
  87. can.send('abc', 1)
  88. try:
  89. can.recv(0, [0, 0, 0, 0])
  90. except TypeError:
  91. print('TypeError')
  92. # Test for read-only-memoryview passed as 4th element to recv
  93. can.send('abc', 1)
  94. try:
  95. can.recv(0, [0, 0, 0, memoryview(bytes(8))])
  96. except ValueError:
  97. print('ValueError')
  98. # Test for bad-typecode-memoryview passed as 4th element to recv
  99. can.send('abc', 1)
  100. try:
  101. can.recv(0, [0, 0, 0, memoryview(array('i', range(8)))])
  102. except ValueError:
  103. print('ValueError')
  104. del can
  105. # Testing extended IDs
  106. can = CAN(1, CAN.LOOPBACK, extframe = True)
  107. # Catch all filter
  108. can.setfilter(0, CAN.MASK32, 0, (0, 0))
  109. print(can)
  110. try:
  111. can.send('abcde', 0x7FF + 1, timeout=5000)
  112. except ValueError:
  113. print('failed')
  114. else:
  115. r = can.recv(0)
  116. if r[0] == 0x7FF+1 and r[3] == b'abcde':
  117. print('passed')
  118. else:
  119. print('failed, wrong data received')
  120. # Test filters
  121. for n in [0, 8, 16, 24]:
  122. filter_id = 0b00001000 << n
  123. filter_mask = 0b00011100 << n
  124. id_ok = 0b00001010 << n
  125. id_fail = 0b00011010 << n
  126. can.clearfilter(0)
  127. can.setfilter(0, pyb.CAN.MASK32, 0, (filter_id, filter_mask))
  128. can.send('ok', id_ok, timeout=3)
  129. if can.any(0):
  130. msg = can.recv(0)
  131. print((hex(filter_id), hex(filter_mask), hex(msg[0]), msg[3]))
  132. can.send("fail", id_fail, timeout=3)
  133. if can.any(0):
  134. msg = can.recv(0)
  135. print((hex(filter_id), hex(filter_mask), hex(msg[0]), msg[3]))
  136. del can
  137. # Test RxCallbacks
  138. can = CAN(1, CAN.LOOPBACK)
  139. can.setfilter(0, CAN.LIST16, 0, (1, 2, 3, 4))
  140. can.setfilter(1, CAN.LIST16, 1, (5, 6, 7, 8))
  141. def cb0(bus, reason):
  142. print('cb0')
  143. if reason == 0:
  144. print('pending')
  145. if reason == 1:
  146. print('full')
  147. if reason == 2:
  148. print('overflow')
  149. def cb1(bus, reason):
  150. print('cb1')
  151. if reason == 0:
  152. print('pending')
  153. if reason == 1:
  154. print('full')
  155. if reason == 2:
  156. print('overflow')
  157. def cb0a(bus, reason):
  158. print('cb0a')
  159. if reason == 0:
  160. print('pending')
  161. if reason == 1:
  162. print('full')
  163. if reason == 2:
  164. print('overflow')
  165. def cb1a(bus, reason):
  166. print('cb1a')
  167. if reason == 0:
  168. print('pending')
  169. if reason == 1:
  170. print('full')
  171. if reason == 2:
  172. print('overflow')
  173. can.rxcallback(0, cb0)
  174. can.rxcallback(1, cb1)
  175. can.send('11111111',1, timeout=5000)
  176. can.send('22222222',2, timeout=5000)
  177. can.send('33333333',3, timeout=5000)
  178. can.rxcallback(0, cb0a)
  179. can.send('44444444',4, timeout=5000)
  180. can.send('55555555',5, timeout=5000)
  181. can.send('66666666',6, timeout=5000)
  182. can.send('77777777',7, timeout=5000)
  183. can.rxcallback(1, cb1a)
  184. can.send('88888888',8, timeout=5000)
  185. print(can.recv(0))
  186. print(can.recv(0))
  187. print(can.recv(0))
  188. print(can.recv(1))
  189. print(can.recv(1))
  190. print(can.recv(1))
  191. can.send('11111111',1, timeout=5000)
  192. can.send('55555555',5, timeout=5000)
  193. print(can.recv(0))
  194. print(can.recv(1))
  195. del can
  196. # Testing asynchronous send
  197. can = CAN(1, CAN.LOOPBACK)
  198. can.setfilter(0, CAN.MASK16, 0, (0, 0, 0, 0))
  199. while can.any(0):
  200. can.recv(0)
  201. can.send('abcde', 1, timeout=0)
  202. print(can.any(0))
  203. while not can.any(0):
  204. pass
  205. print(can.recv(0))
  206. try:
  207. can.send('abcde', 2, timeout=0)
  208. can.send('abcde', 3, timeout=0)
  209. can.send('abcde', 4, timeout=0)
  210. can.send('abcde', 5, timeout=0)
  211. except OSError as e:
  212. if str(e) == '16':
  213. print('passed')
  214. else:
  215. print('failed')
  216. pyb.delay(500)
  217. while can.any(0):
  218. print(can.recv(0))
  219. # Testing rtr messages
  220. bus1 = CAN(1, CAN.LOOPBACK)
  221. bus2 = CAN(2, CAN.LOOPBACK, extframe = True)
  222. while bus1.any(0):
  223. bus1.recv(0)
  224. while bus2.any(0):
  225. bus2.recv(0)
  226. bus1.setfilter(0, CAN.LIST16, 0, (1, 2, 3, 4))
  227. bus1.setfilter(1, CAN.LIST16, 0, (5, 6, 7, 8), rtr=(True, True, True, True))
  228. bus1.setfilter(2, CAN.MASK16, 0, (64, 64, 32, 32), rtr=(False, True))
  229. bus2.setfilter(0, CAN.LIST32, 0, (1, 2), rtr=(True, True))
  230. bus2.setfilter(1, CAN.LIST32, 0, (3, 4), rtr=(True, False))
  231. bus2.setfilter(2, CAN.MASK32, 0, (16, 16), rtr=(False,))
  232. bus2.setfilter(2, CAN.MASK32, 0, (32, 32), rtr=(True,))
  233. bus1.send('',1,rtr=True)
  234. print(bus1.any(0))
  235. bus1.send('',5,rtr=True)
  236. print(bus1.recv(0))
  237. bus1.send('',6,rtr=True)
  238. print(bus1.recv(0))
  239. bus1.send('',7,rtr=True)
  240. print(bus1.recv(0))
  241. bus1.send('',16,rtr=True)
  242. print(bus1.any(0))
  243. bus1.send('',32,rtr=True)
  244. print(bus1.recv(0))
  245. bus2.send('',1,rtr=True)
  246. print(bus2.recv(0))
  247. bus2.send('',2,rtr=True)
  248. print(bus2.recv(0))
  249. bus2.send('',3,rtr=True)
  250. print(bus2.recv(0))
  251. bus2.send('',4,rtr=True)
  252. print(bus2.any(0))