websocket_basic.py 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. try:
  2. import uio
  3. import uerrno
  4. import websocket
  5. except ImportError:
  6. print("SKIP")
  7. raise SystemExit
  8. # put raw data in the stream and do a websocket read
  9. def ws_read(msg, sz):
  10. ws = websocket.websocket(uio.BytesIO(msg))
  11. return ws.read(sz)
  12. # do a websocket write and then return the raw data from the stream
  13. def ws_write(msg, sz):
  14. s = uio.BytesIO()
  15. ws = websocket.websocket(s)
  16. ws.write(msg)
  17. s.seek(0)
  18. return s.read(sz)
  19. # basic frame
  20. print(ws_read(b"\x81\x04ping", 4))
  21. print(ws_read(b"\x80\x04ping", 4)) # FRAME_CONT
  22. print(ws_write(b"pong", 6))
  23. # split frames are not supported
  24. # print(ws_read(b"\x01\x04ping", 4))
  25. # extended payloads
  26. print(ws_read(b'\x81~\x00\x80' + b'ping' * 32, 128))
  27. print(ws_write(b"pong" * 32, 132))
  28. # mask (returned data will be 'mask' ^ 'mask')
  29. print(ws_read(b"\x81\x84maskmask", 4))
  30. # close control frame
  31. s = uio.BytesIO(b'\x88\x00') # FRAME_CLOSE
  32. ws = websocket.websocket(s)
  33. print(ws.read(1))
  34. s.seek(2)
  35. print(s.read(4))
  36. # misc control frames
  37. print(ws_read(b"\x89\x00\x81\x04ping", 4)) # FRAME_PING
  38. print(ws_read(b"\x8a\x00\x81\x04pong", 4)) # FRAME_PONG
  39. # close method
  40. ws = websocket.websocket(uio.BytesIO())
  41. ws.close()
  42. # ioctl
  43. ws = websocket.websocket(uio.BytesIO())
  44. print(ws.ioctl(8)) # GET_DATA_OPTS
  45. print(ws.ioctl(9, 2)) # SET_DATA_OPTS
  46. print(ws.ioctl(9))
  47. try:
  48. ws.ioctl(-1)
  49. except OSError as e:
  50. print("ioctl: EINVAL:", e.args[0] == uerrno.EINVAL)