async_with2.py 854 B

12345678910111213141516171819202122232425262728293031323334353637
  1. # test waiting within async with enter/exit functions
  2. import sys
  3. if sys.implementation.name == 'micropython':
  4. # uPy allows normal generators to be awaitables
  5. coroutine = lambda f: f
  6. else:
  7. import types
  8. coroutine = types.coroutine
  9. @coroutine
  10. def f(x):
  11. print('f start:', x)
  12. yield x + 1
  13. yield x + 2
  14. return x + 3
  15. class AContext:
  16. async def __aenter__(self):
  17. print('enter')
  18. print('f returned:', await f(10))
  19. async def __aexit__(self, exc_type, exc, tb):
  20. print('exit', exc_type, exc)
  21. print('f returned:', await f(20))
  22. async def coro():
  23. async with AContext():
  24. print('body start')
  25. print('body f returned:', await f(30))
  26. print('body end')
  27. o = coro()
  28. try:
  29. while True:
  30. print('coro yielded:', o.send(None))
  31. except StopIteration:
  32. print('finished')