# mboot.py
# Driver for Mboot, the MicroPython boot loader
# MIT license; Copyright (c) 2018 Damien P. George
import hashlib
import os
import struct
import time
  4. I2C_CMD_ECHO = 1
  5. I2C_CMD_GETID = 2
  6. I2C_CMD_GETCAPS = 3
  7. I2C_CMD_RESET = 4
  8. I2C_CMD_CONFIG = 5
  9. I2C_CMD_GETLAYOUT = 6
  10. I2C_CMD_MASSERASE = 7
  11. I2C_CMD_PAGEERASE = 8
  12. I2C_CMD_SETRDADDR = 9
  13. I2C_CMD_SETWRADDR = 10
  14. I2C_CMD_READ = 11
  15. I2C_CMD_WRITE = 12
  16. I2C_CMD_COPY = 13
  17. I2C_CMD_CALCHASH = 14
  18. I2C_CMD_MARKVALID = 15
  19. class Bootloader:
  20. def __init__(self, i2c, addr):
  21. self.i2c = i2c
  22. self.addr = addr
  23. self.buf1 = bytearray(1)
  24. try:
  25. self.i2c.writeto(addr, b'')
  26. except OSError:
  27. raise Exception('no I2C mboot device found')
  28. def wait_response(self):
  29. start = time.ticks_ms()
  30. while 1:
  31. try:
  32. self.i2c.readfrom_into(self.addr, self.buf1)
  33. n = self.buf1[0]
  34. break
  35. except OSError as er:
  36. time.sleep_us(500)
  37. if time.ticks_diff(time.ticks_ms(), start) > 5000:
  38. raise Exception('timeout')
  39. if n >= 129:
  40. raise Exception(n)
  41. if n == 0:
  42. return b''
  43. else:
  44. return self.i2c.readfrom(self.addr, n)
  45. def wait_empty_response(self):
  46. ret = self.wait_response()
  47. if ret:
  48. raise Exception('expected empty response got %r' % ret)
  49. else:
  50. return None
  51. def echo(self, data):
  52. self.i2c.writeto(self.addr, struct.pack('<B', I2C_CMD_ECHO) + data)
  53. return self.wait_response()
  54. def getid(self):
  55. self.i2c.writeto(self.addr, struct.pack('<B', I2C_CMD_GETID))
  56. ret = self.wait_response()
  57. unique_id = ret[:12]
  58. mcu_name, board_name = ret[12:].split(b'\x00')
  59. return unique_id, str(mcu_name, 'ascii'), str(board_name, 'ascii')
  60. def reset(self):
  61. self.i2c.writeto(self.addr, struct.pack('<B', I2C_CMD_RESET))
  62. # we don't expect any response
  63. def getlayout(self):
  64. self.i2c.writeto(self.addr, struct.pack('<B', I2C_CMD_GETLAYOUT))
  65. layout = self.wait_response()
  66. id, flash_addr, layout = layout.split(b'/')
  67. assert id == b'@Internal Flash '
  68. flash_addr = int(flash_addr, 16)
  69. pages = []
  70. for chunk in layout.split(b','):
  71. n, sz = chunk.split(b'*')
  72. n = int(n)
  73. assert sz.endswith(b'Kg')
  74. sz = int(sz[:-2]) * 1024
  75. for i in range(n):
  76. pages.append((flash_addr, sz))
  77. flash_addr += sz
  78. return pages
  79. def pageerase(self, addr):
  80. self.i2c.writeto(self.addr, struct.pack('<BI', I2C_CMD_PAGEERASE, addr))
  81. self.wait_empty_response()
  82. def setrdaddr(self, addr):
  83. self.i2c.writeto(self.addr, struct.pack('<BI', I2C_CMD_SETRDADDR, addr))
  84. self.wait_empty_response()
  85. def setwraddr(self, addr):
  86. self.i2c.writeto(self.addr, struct.pack('<BI', I2C_CMD_SETWRADDR, addr))
  87. self.wait_empty_response()
  88. def read(self, n):
  89. self.i2c.writeto(self.addr, struct.pack('<BB', I2C_CMD_READ, n))
  90. return self.wait_response()
  91. def write(self, buf):
  92. self.i2c.writeto(self.addr, struct.pack('<B', I2C_CMD_WRITE) + buf)
  93. self.wait_empty_response()
  94. def calchash(self, n):
  95. self.i2c.writeto(self.addr, struct.pack('<BI', I2C_CMD_CALCHASH, n))
  96. return self.wait_response()
  97. def markvalid(self):
  98. self.i2c.writeto(self.addr, struct.pack('<B', I2C_CMD_MARKVALID))
  99. self.wait_empty_response()
  100. def deployfile(self, filename, addr):
  101. pages = self.getlayout()
  102. page_erased = [False] * len(pages)
  103. buf = bytearray(128) # maximum payload supported by I2C protocol
  104. start_addr = addr
  105. self.setwraddr(addr)
  106. fsize = os.stat(filename)[6]
  107. local_sha = hashlib.sha256()
  108. print('Deploying %s to location 0x%08x' % (filename, addr))
  109. with open(filename, 'rb') as f:
  110. t0 = time.ticks_ms()
  111. while True:
  112. n = f.readinto(buf)
  113. if n == 0:
  114. break
  115. # check if we need to erase the page
  116. for i, p in enumerate(pages):
  117. if p[0] <= addr < p[0] + p[1]:
  118. # found page
  119. if not page_erased[i]:
  120. print('\r% 3u%% erase 0x%08x' % (100 * (addr - start_addr) // fsize, addr), end='')
  121. self.pageerase(addr)
  122. page_erased[i] = True
  123. break
  124. else:
  125. raise Exception('address 0x%08x not valid' % addr)
  126. # write the data
  127. self.write(buf)
  128. # update local SHA256, with validity bits set
  129. if addr == start_addr:
  130. buf[0] |= 3
  131. if n == len(buf):
  132. local_sha.update(buf)
  133. else:
  134. local_sha.update(buf[:n])
  135. addr += n
  136. ntotal = addr - start_addr
  137. if ntotal % 2048 == 0 or ntotal == fsize:
  138. print('\r% 3u%% % 7u bytes ' % (100 * ntotal // fsize, ntotal), end='')
  139. t1 = time.ticks_ms()
  140. print()
  141. print('rate: %.2f KiB/sec' % (1024 * ntotal / (t1 - t0) / 1000))
  142. local_sha = local_sha.digest()
  143. print('Local SHA256: ', ''.join('%02x' % x for x in local_sha))
  144. self.setrdaddr(start_addr)
  145. remote_sha = self.calchash(ntotal)
  146. print('Remote SHA256:', ''.join('%02x' % x for x in remote_sha))
  147. if local_sha == remote_sha:
  148. print('Marking app firmware as valid')
  149. self.markvalid()
  150. self.reset()