base.py 51 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296
  1. # Copyright 2016,2017 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. """
  4. Base functionality for ThingFlow. All the core abstractions
  5. are defined here. Everything else is just subclassing or using
  6. these abstractions.
  7. The key abstractions are:
  8. * Thing - a unit of computation in the data flow graph. Things can be
  9. Filters (with inputs and outputs) or Adapters (with only inputs
  10. or only outputs).
  11. * OutputThing - Base class and interface for things that emit event streams on
  12. output ports.
  13. * Sensor - an object that is (indirectly) connected to the physical world.
  14. It can provide its current value through a sample() method.
  15. Sensors can be turned into Things by wrapping them with
  16. the SensorAsInputThing class.
  17. * InputThing - interface for things that receive a stream of events on one or
  18. more input ports.
  19. * Filter - a thing that is both an InputThing and an OutputThing, with one
  20. input and one output. Filters transform data streams.
  21. * Scheduler - The scheduler wraps an event loop. It provides periodic and
  22. one-time scheduling of OutputThings that originate events.
  23. * event - ThingFlow largely does not care about the particulars of the
  24. events it processes. However, we define a generic SensorEvent
  25. datatype that can be used when the details of the event matter
  26. to a thing.
  27. See the README.rst file for more details.
  28. """
  29. from collections import namedtuple
  30. import threading
  31. import time
  32. import queue
  33. import traceback as tb
  34. import logging
  35. logger = logging.getLogger(__name__)
  36. from thingflow.internal import noop
  37. class InputThing:
  38. """This is the interface for the default input
  39. port of a Thing. Other (named) input ports will
  40. define similar methods with the names as
  41. on_PORT_next(), on_PORT_error(), and
  42. on_PORT_completed().
  43. """
  44. def on_next(self, x):
  45. pass
  46. def on_error(self, e):
  47. pass
  48. def on_completed(self):
  49. pass
  50. def _on_next_name(port):
  51. if port==None or port=='default':
  52. return 'on_next'
  53. else:
  54. return 'on_%s_next' % port
  55. def _on_error_name(port):
  56. if port==None or port=='default':
  57. return 'on_error'
  58. else:
  59. return 'on_%s_error' % port
  60. def _on_completed_name(port):
  61. if port==None or port=='default':
  62. return 'on_completed'
  63. else:
  64. return 'on_%s_completed' % port
  65. class CallableAsInputThing:
  66. """Wrap any callable with the InputThing interface.
  67. We only pass it the on_next() calls. on_error and on_completed
  68. can be passed in or default to noops.
  69. """
  70. def __init__(self, on_next=None, on_error=None, on_completed=None,
  71. port=None):
  72. setattr(self, _on_next_name(port), on_next or noop)
  73. if on_error:
  74. setattr(self, _on_error_name(port), on_error)
  75. else:
  76. def default_error(err):
  77. if isinstance(err, FatalError):
  78. raise err.with_traceback(err.__traceback__)
  79. else:
  80. logger.error("%s: Received on_error(%s)" %
  81. (self, err))
  82. setattr(self, _on_error_name(port), default_error)
  83. setattr(self, _on_completed_name(port), on_completed or noop)
  84. def __str__(self):
  85. return 'CallableAsInputThing(%s)' % str(self.on_next)
  86. def __repr__(self):
  87. return 'CallableAsInputThing(on_next=%s, on_error=%s, on_completed=%s)' % \
  88. (repr(self.on_next), repr(self.on_error), repr(self.on_completed))
  89. class FatalError(Exception):
  90. """This is the base class for exceptions that should terminate the event
  91. loop. This should be for out-of-bound errors, not for normal errors in
  92. the data stream. Examples of out-of-bound errors include an exception
  93. in the infrastructure or an error in configuring or dispatching an event
  94. stream (e.g. publishing to a non-existant port).
  95. """
  96. pass
  97. class InvalidPortError(FatalError):
  98. pass
  99. class UnknownPortError(FatalError):
  100. pass
  101. class PortAlreadyClosed(FatalError):
  102. pass
  103. class ExcInDispatch(FatalError):
  104. """Dispatching an event should not raise an error, other than a
  105. fatal error.
  106. """
  107. pass
  108. # Internal representation of a connection. The first three fields
  109. # are functions which dispatch to the InputThing. The InputThing and input_port
  110. # fields are not needed at runtime, but helpful in debugging.
  111. # We use a class with slots instead of a named tuple because we want to
  112. # change the values of the on_next, etc. functions when tracing (tuples
  113. # are read-only. The attribute access of a named tuple by name is no
  114. # faster than slots. If we need to spead this up at some point, use a
  115. # named tuple but access via the index values (at a cost to readability
  116. # of the code).
  117. class _Connection:
  118. __slots__ = ('on_next', 'on_completed', 'on_error', 'input_thing',
  119. 'input_port')
  120. def __init__(self, on_next, on_completed, on_error, input_thing,
  121. input_port):
  122. self.on_next = on_next
  123. self.on_completed = on_completed
  124. self.on_error = on_error
  125. self.input_thing = input_thing
  126. self.input_port = input_port
  127. def __repr__(self):
  128. return '_Connection(%s,%s,%s,%s,%s)' % \
  129. (repr(self.on_next), repr(self.on_completed), repr(self.on_error),
  130. repr(self.input_thing), repr(self.input_port))
  131. def __str__(self):
  132. return '_Connection(%s,%s)' % \
  133. (str(self.input_thing), str(self.input_port))
  134. class OutputThing:
  135. """Base class for event generators (output things). The non-underscore
  136. methods are the public end-user interface. The methods starting with
  137. underscores are for interactions with the scheduler.
  138. """
  139. def __init__(self, ports=None):
  140. self.__connections__ = {} # map from port to InputThing set
  141. if ports is None:
  142. self.__ports__ = set(['default',])
  143. else:
  144. self.__ports__ = set(ports)
  145. for port in self.__ports__:
  146. self.__connections__[port] = []
  147. self.__enqueue_fn__ = None
  148. self.__closed_ports__ = []
  149. def connect(self, input_thing, port_mapping=None):
  150. """Connect the InputThing to events on a specific port. The port
  151. mapping is a tuple of the OutputThing's port name and InputThing's port
  152. name. It defaults to (default, default).
  153. This returns a fuction that can be called to remove the connection.
  154. """
  155. if port_mapping==None:
  156. output_port = 'default'
  157. input_port = 'default'
  158. else:
  159. (output_port, input_port) = port_mapping
  160. if output_port not in self.__ports__:
  161. raise InvalidPortError("Invalid publish port '%s', valid ports are %s" %
  162. (output_port,
  163. ', '.join([str(s) for s in self.__ports__])))
  164. if not hasattr(input_thing, _on_next_name(input_port)) and callable(input_thing):
  165. input_thing = CallableAsInputThing(input_thing, port=input_port)
  166. try:
  167. connection = \
  168. _Connection(on_next=getattr(input_thing, _on_next_name(input_port)),
  169. on_completed=getattr(input_thing, _on_completed_name(input_port)),
  170. on_error=getattr(input_thing, _on_error_name(input_port)),
  171. input_thing=input_thing,
  172. input_port=input_port)
  173. except AttributeError:
  174. raise InvalidPortError("Invalid input port '%s', missing method(s) on InputThing %s" %
  175. (input_port, input_thing))
  176. new_connections = self.__connections__[output_port].copy()
  177. new_connections.append(connection)
  178. self.__connections__[output_port] = new_connections
  179. def disconnect():
  180. # To remove the connection, we replace the entire list with a copy
  181. # that is missing the connection. This allows disconnect() to be
  182. # called within a _dispatch method. Otherwise, we get an error if
  183. # we attempt to change the list of connections while iterating over
  184. # it.
  185. new_connections = self.__connections__[output_port].copy()
  186. #new_connections.remove(connection)
  187. # we look for a connection to the same port and thing rather than
  188. # the same object - the object may have changed due to tracing
  189. found = False
  190. for c in self.__connections__[output_port]:
  191. if c.input_thing==input_thing and c.input_port==input_port:
  192. new_connections.remove(c)
  193. found = True
  194. break
  195. assert found
  196. self.__connections__[output_port] = new_connections
  197. return disconnect
  198. def _has_connections(self):
  199. """Used by the scheduler to see the thing has any more outgoing connections.
  200. If a scheduled thing no longer has output connections, it is descheduled.
  201. """
  202. for (port, conns) in self.__connections__.items():
  203. if len(conns)>0:
  204. return True
  205. return False
  206. def _schedule(self, enqueue_fn):
  207. """This method is used by the scheduler to specify an enqueue function
  208. to be called
  209. when dispatching events to the connections. This is used when the
  210. OutputThing runs in a separate thread from the main event loop. If
  211. that is not the case, the enqueue function should be None.
  212. """
  213. self.__enqueue_fn__ = enqueue_fn
  214. def _close_port(self, port):
  215. """Port will receive no more messages. Remove the port from
  216. this OutputThing.
  217. """
  218. #print("Closing port %s on %s" % (port, self)) # XXX
  219. del self.__connections__[port]
  220. self.__ports__.remove(port)
  221. self.__closed_ports__.append(port)
  222. def _dispatch_next(self, x, port=None):
  223. #print("Dispatch next called on %s, port %s, msg %s" % (self, port, str(x)))
  224. if port==None:
  225. port = 'default'
  226. try:
  227. connections = self.__connections__[port]
  228. except KeyError as e:
  229. if port in self.__closed_ports__:
  230. raise PortAlreadyClosed("Port '%s' on OutputThing %s already had an on_completed or on_error_event" %
  231. (port, self))
  232. else:
  233. raise UnknownPortError("Unknown port '%s' in OutputThing %s" %
  234. (port, self)) from e
  235. if len(connections) == 0:
  236. return
  237. enq = self.__enqueue_fn__
  238. if enq:
  239. for s in connections:
  240. enq(s.on_next, x)
  241. else:
  242. try:
  243. for s in connections:
  244. s.on_next(x)
  245. except FatalError:
  246. raise
  247. except Exception as e:
  248. raise ExcInDispatch("Unexpected exception when dispatching event '%s' to InputThing %s from OutputThing %s" %
  249. (repr(x), s.input_thing, self)) from e
  250. def _dispatch_completed(self, port=None):
  251. if port==None:
  252. port = 'default'
  253. try:
  254. connections = self.__connections__[port]
  255. except KeyError as e:
  256. if port in self.__closed_ports__:
  257. raise PortAlreadyClosed("Port '%s' on OutputThing %s already had an on_completed or on_error_event" %
  258. (port, self))
  259. else:
  260. raise UnknownPortError("Unknown port '%s' in OutputThing %s" % (port, self)) from e
  261. enq = self.__enqueue_fn__
  262. if enq:
  263. for s in connections:
  264. enq(s.on_completed)
  265. else:
  266. try:
  267. for s in connections:
  268. s.on_completed()
  269. except FatalError:
  270. raise
  271. except Exception as e:
  272. raise ExcInDispatch("Unexpected exception when dispatching completed to InputThing %s from OutputThing %s" %
  273. (s.input_thing, self)) from e
  274. self._close_port(port)
  275. def _dispatch_error(self, e, port=None):
  276. if port==None:
  277. port = 'default'
  278. try:
  279. connections = self.__connections__[port]
  280. except KeyError as e:
  281. if port in self.__closed_ports__:
  282. raise PortAlreadyClosed("Port '%s' on OutputThing %s already had an on_completed or on_error_event" %
  283. (port, self))
  284. else:
  285. raise UnknownPortError("Unknown port '%s' in OutputThing %s" % (port, self)) from e
  286. enq = self.__enqueue_fn__
  287. if enq:
  288. for s in connections:
  289. enq(s.on_error, e)
  290. else:
  291. try:
  292. for s in connections:
  293. s.on_error(e)
  294. except FatalError:
  295. raise
  296. except Exception as e:
  297. raise ExcInDispatch("Unexpected exception when dispatching error '%s' to InputThing %s from OutputThing %s" %
  298. (repr(e), s.input_thing, self)) from e
  299. self._close_port(port)
  300. def print_downstream(self):
  301. """Recursively print all the downstream paths. This is for debugging.
  302. """
  303. def has_connections(thing):
  304. if not hasattr(thing, '__connections__'):
  305. return False
  306. return thing._has_connections()
  307. def print_from(current_seq, thing):
  308. if has_connections(thing):
  309. for (port, connections) in thing.__connections__.items():
  310. for connection in connections:
  311. if port=='default' and \
  312. connection.input_port=='default':
  313. next_seq = " => %s" % connection.input_thing
  314. else:
  315. next_seq = " [%s]=>[%s] %s" % \
  316. (port, connection.input_port,
  317. connection.input_thing)
  318. print_from(current_seq + next_seq,
  319. connection.input_thing)
  320. else:
  321. print(current_seq)
  322. print("***** Dump of all paths from %s *****" % self.__str__())
  323. print_from(" " + self.__str__(), self)
  324. print("*"*(12+len(self.__str__())))
  325. def trace_downstream(self):
  326. """Install wrappers that print a trace message for each
  327. event on this thing and all downsteam things.
  328. """
  329. def has_connections(thing):
  330. if not hasattr(thing, '__connections__'):
  331. return False
  332. return thing._has_connections()
  333. def fmt(thing, port):
  334. return '%s.%s' % (str(thing), port) if port!='default' \
  335. else str(thing)
  336. def trace_on_next(thing, output_port, connection, x):
  337. print(" %s => (%s) => %s" %
  338. (fmt(thing, output_port), str(x),
  339. fmt(connection.input_thing,
  340. connection.input_port)))
  341. connection.on_next(x)
  342. def trace_on_error(thing, output_port, connection, error):
  343. print(" %s => on_error(%s) => %s" %
  344. (fmt(thing, output_port), str(error),
  345. fmt(connection.input_thing,
  346. connection.input_port)))
  347. connection.on_error(error)
  348. def trace_on_completed(thing, output_port, connection):
  349. print(" %s => on_completed => %s" %
  350. (fmt(thing, output_port),
  351. fmt(connection.input_thing,
  352. connection.input_port)))
  353. connection.on_completed()
  354. def make_trace_connection(src_thing, output_port, old_connection):
  355. return _Connection(
  356. on_next=lambda x: trace_on_next(src_thing, output_port,
  357. old_connection, x),
  358. on_error=lambda e: trace_on_error(src_thing, output_port,
  359. old_connection, e),
  360. on_completed=lambda : trace_on_completed(src_thing,
  361. output_port,
  362. old_connection),
  363. input_thing=old_connection.input_thing,
  364. input_port=old_connection.input_port)
  365. def trace_from(thing):
  366. if has_connections(thing):
  367. new_connections = {}
  368. for (port, connections) in thing.__connections__.items():
  369. connections_for_port = []
  370. for connection in connections:
  371. trace_from(connection.input_thing)
  372. connections_for_port.append(make_trace_connection(thing,
  373. port,
  374. connection))
  375. new_connections[port] = connections_for_port
  376. thing.__connections__ = new_connections
  377. trace_from(self)
  378. print("***** installed tracing in all paths starting from %s" %
  379. str(self))
  380. def pp_connections(self):
  381. """pretty print the set of connections"""
  382. h1 = "***** InputThings for %s *****" % self
  383. print(h1)
  384. for port in sorted(self.__connections__.keys()):
  385. print(" Port %s" % port)
  386. for s in self.__connections__[port]:
  387. print(" [%s] => %s" % (s.input_port, s.input_thing))
  388. print(" on_next: %s" % s.on_next)
  389. print(" on_completed: %s" % s.on_completed)
  390. print(" on_error: %s" % s.on_error)
  391. print("*"*len(h1))
  392. def __str__(self):
  393. return self.__class__.__name__ + '()'
  394. class Filter(OutputThing, InputThing):
  395. """A filter has a default input port and a default output port. It is
  396. used for data transformations. The default implementations of on_next(),
  397. on_completed(), and on_error() just pass the event on to the downstream
  398. connection.
  399. """
  400. def __init__(self, previous_in_chain):
  401. super().__init__()
  402. # connect to the previous filter
  403. self.disconnect_from_upstream = previous_in_chain.connect(self)
  404. def on_next(self, x):
  405. self._dispatch_next(x)
  406. def on_error(self, e):
  407. self._dispatch_error(e)
  408. def on_completed(self):
  409. self._dispatch_completed()
  410. def __str__(self):
  411. return self.__class__.__name__ + '()'
  412. class XformOrDropFilter(Filter):
  413. """Implements a slightly more complex filter protocol where events may be
  414. transformed or dropped. Subclasses just need to implement the _filter() and
  415. _complete() methods.
  416. """
  417. def __init__(self, previous_in_chain):
  418. super().__init__(previous_in_chain)
  419. def on_next(self, x):
  420. """Calls _filter(x) to process
  421. the event. If _filter() returns None, nothing futher is done. Otherwise,
  422. the return value is passed to the downstream connection. This allows you
  423. to both transform as well as send only selected events.
  424. Errors other than FatalError are handled gracefully by calling
  425. self.on_error() and then disconnecing from the upstream OutputThing.
  426. """
  427. try:
  428. x_prime = self._filter(x)
  429. except FatalError:
  430. raise
  431. except Exception as e:
  432. logger.exception("Got an exception on %s._filter(%s)" %
  433. (self, x))
  434. self.on_error(e)
  435. self.disconnect_from_upstream()
  436. else:
  437. if x_prime is not None:
  438. self._dispatch_next(x_prime)
  439. def _filter(self, x):
  440. """Filtering method to be implemented by subclasses.
  441. """
  442. return x
  443. def _complete(self):
  444. """Method to be overridden by subclasses. It is called as a part of
  445. on_error() and on_completed() to give a chance to pass down a held-back
  446. event. Return None if there is no such event.
  447. You can also clean up any state in this method (e.g. close connections).
  448. Shold not throw any exceptions other than FatalError.
  449. """
  450. return None
  451. def on_error(self, e):
  452. """Passes on any final event and then passes the notification to the
  453. next Thing.
  454. If you need to clean up any state, do it in _complete().
  455. """
  456. x = self._complete()
  457. if x is not None:
  458. self._dispatch_next(x)
  459. self._dispatch_error(e)
  460. def on_completed(self):
  461. """Passes on any final event and then passes the notification to the
  462. next Thing.
  463. If you need to clean up any state, do it in _complete().
  464. """
  465. x = self._complete()
  466. if x is not None:
  467. self._dispatch_next(x)
  468. self._dispatch_completed()
  469. class FunctionFilter(Filter):
  470. """Implement a filter by providing functions that implement the
  471. on_next, on_completed, and one_error logic. This is useful
  472. when the logic is really simple or when a more functional programming
  473. style is more convenient.
  474. Each function takes a "self" parameter, so it works almost like it was
  475. defined as a bound method. The signatures are then::
  476. on_next(self, x)
  477. on_completed(self)
  478. on_error(self, e)
  479. If a function is not provided to __init__, we just dispatch the call downstream.
  480. """
  481. def __init__(self, previous_in_chain,
  482. on_next=None, on_completed=None,
  483. on_error=None, name=None):
  484. """name is an option name to be used in __str__() calls.
  485. """
  486. super().__init__(previous_in_chain)
  487. self._on_next = on_next
  488. self._on_error = on_error
  489. self._on_completed = on_completed
  490. if name:
  491. self.name = name
  492. def on_next(self, x):
  493. try:
  494. if self._on_next:
  495. # we pass in an extra "self" since this is a function, not a method
  496. self._on_next(self, x)
  497. else:
  498. self._dispatch_next(x)
  499. except FatalError:
  500. raise
  501. except Exception as e:
  502. logger.exception("Got an exception on %s.on_next(%s)" %
  503. (self, x))
  504. self.on_error(e)
  505. self.disconnect_from_upstream() # stop from getting upstream events
  506. def on_error(self, e):
  507. if self._on_error:
  508. self._on_error(self, e)
  509. else:
  510. self._dispatch_error(e)
  511. def on_completed(self):
  512. if self._on_completed:
  513. self._on_completed(self)
  514. else:
  515. self._dispatch_completed()
  516. def __str__(self):
  517. if hasattr(self, 'name'):
  518. return self.name
  519. else:
  520. return self.__class__.__name__ + '()'
  521. def _is_thunk(t):
  522. return hasattr(t, '__thunk__')
  523. def _make_thunk(t):
  524. setattr(t, '__thunk__', True)
  525. class _ThunkBuilder:
  526. """This is used to create a thunk from a linq-style
  527. method.
  528. """
  529. def __init__(self, func):
  530. self.func = func
  531. self.__name__ = func.__name__
  532. def __call__(self, *args, **kwargs):
  533. if len(args)==0 and len(kwargs)==0:
  534. _make_thunk(self.func)
  535. return self.func
  536. def apply(this):
  537. return self.func(this, *args, **kwargs)
  538. apply.__name__ = self.__name__
  539. _make_thunk(apply)
  540. return apply
  541. def __repr__(self):
  542. return "_ThunkBuilder(%s)" % self.__name__
  543. def _connect_thunk(prev, thunk):
  544. """Connect the thunk to the previous in the chain. Handles
  545. all the cases where we might be given a filter, a thunk,
  546. a thunk builder (unevaluated linq function), or a bare callable."""
  547. if callable(thunk):
  548. if _is_thunk(thunk):
  549. return thunk(prev)
  550. elif isinstance(thunk, _ThunkBuilder):
  551. real_thunk = thunk()
  552. assert _is_thunk(real_thunk)
  553. return real_thunk(prev)
  554. else: # bare callable, will be wrapped by the connect() method
  555. prev.connect(thunk)
  556. return None
  557. else:
  558. return prev.connect(thunk) # assumed to be a filter
  559. def filtermethod(base, alias=None):
  560. """Function decorator that creates a linq-style filter out of the
  561. specified function. As described in the thingflow.linq documentation,
  562. it should take a OutputThing as its first argument (the source of events)
  563. and return a OutputThing (representing the end the filter sequence once
  564. the filter is included. The returned OutputThing is typically an instance
  565. of thingflow.base.Filter.
  566. The specified function is used in two places:
  567. 1. A method with the specified name is added to the specified class
  568. (usually the OutputThing base class). This is for the fluent (method
  569. chaining) API.
  570. 2. A function is created in the local namespace for use in the functional API.
  571. This function does not take the OutputThing as an argument. Instead,
  572. it takes the remaining arguments and then returns a function which,
  573. when passed a OutputThing, connects to it and returns a filter.
  574. Decorator arguments:
  575. * param T base: Base class to extend with method
  576. (usually thingflow.base.OutputThing)
  577. * param string alias: an alias for this function or list of aliases
  578. (e.g. map for select, etc.).
  579. * returns: A function that takes the class to be decorated.
  580. * rtype: func -> func
  581. This was adapted from the RxPy extensionmethod decorator.
  582. """
  583. def inner(func):
  584. """This function is returned by the outer filtermethod()
  585. :param types.FunctionType func: Function to be decorated
  586. """
  587. func_names = [func.__name__,]
  588. if alias:
  589. aliases = alias if isinstance(alias, list) else [alias]
  590. func_names += aliases
  591. _thunk = _ThunkBuilder(func)
  592. # For the primary name and all aliases, set the name on the
  593. # base class as well as in the local namespace.
  594. for func_name in func_names:
  595. setattr(base, func_name, func)
  596. func.__globals__[func_name] = _thunk
  597. return _thunk
  598. return inner
  599. class DirectOutputThingMixin:
  600. """This is the interface for OutputThings that should be directly
  601. scheduled by the scheduler (e.g. through schedule_recurring(),
  602. schedule_periodic(), or schedule_periodic_on_separate_thread).
  603. """
  604. def _observe(self):
  605. """Get an event and call the appropriate dispatch function.
  606. """
  607. raise NotImplemented
  608. class EventLoopOutputThingMixin:
  609. """OutputThing that gets messages from an event loop, either the same
  610. loop as the scheduler or a separate one.
  611. """
  612. def _observe_event_loop(self):
  613. """Call the event OutputThing's event loop. When
  614. an event occurs, the appropriate _dispatch method should
  615. be called.
  616. """
  617. raise NotImplemented
  618. def _stop_loop(self):
  619. """When this method is called, the OutputThing should exit the
  620. event loop as soon as possible.
  621. """
  622. raise NotImplemented
  623. class IterableAsOutputThing(OutputThing, DirectOutputThingMixin):
  624. """Convert any interable to an OutputThing. This can be
  625. used with the schedule_recurring() and schedule_periodic()
  626. methods of the scheduler.
  627. """
  628. def __init__(self, iterable, name=None):
  629. super().__init__()
  630. self.iterable = iterable
  631. self.name = name
  632. def _observe(self):
  633. try:
  634. event = self.iterable.__next__()
  635. except StopIteration:
  636. self._close()
  637. self._dispatch_completed()
  638. except FatalError:
  639. self._close()
  640. raise
  641. except Exception as e:
  642. # If the iterable throws an exception, we treat it as non-fatal.
  643. # The error is dispatched downstream and the connection closed.
  644. # If other sensors are running, things will continue.
  645. tb.print_exc()
  646. self._close()
  647. self._dispatch_error(e)
  648. else:
  649. self._dispatch_next(event)
  650. def _close(self):
  651. """This method is called when we stop the iteration, either due to
  652. reaching the end of the sequence or an error. It can be overridden by
  653. subclasses to clean up any state and release resources (e.g. closing
  654. open files/connections).
  655. """
  656. pass
  657. def __str__(self):
  658. if hasattr(self, 'name') and self.name:
  659. return self.name
  660. else:
  661. return super().__str__()
  662. def from_iterable(i):
  663. return IterableAsOutputThing(i)
  664. def from_list(l):
  665. return IterableAsOutputThing(iter(l))
  666. # XXX Move this out of base.py
  667. class FunctionIteratorAsOutputThing(OutputThing, DirectOutputThingMixin):
  668. """Generates an OutputThing sequence by running a state-driven loop
  669. producing the sequence's elements. Example::
  670. res = GenerateOutputThing(0,
  671. lambda x: x < 10,
  672. lambda x: x + 1,
  673. lambda x: x)
  674. initial_state: Initial state.
  675. condition: Condition to terminate generation (upon returning False).
  676. iterate: Iteration step function.
  677. result_selector: Selector function for results produced in the sequence.
  678. Returns the generated sequence.
  679. """
  680. def __init__(self, initial_state, condition, iterate, result_selector):
  681. super().__init__()
  682. self.value = initial_state
  683. self.condition = condition
  684. self.iterate = iterate
  685. self.result_selector = result_selector
  686. self.first = True
  687. def _observe(self):
  688. try:
  689. if self.first: # first time: just send the value
  690. self.first = False
  691. if self.condition(self.value):
  692. r = self.result_selector(self.value)
  693. self._dispatch_next(r)
  694. else:
  695. self._dispatch_completed()
  696. else:
  697. if self.condition(self.value):
  698. self.value = self.iterate(self.value)
  699. r = self.result_selector(self.value)
  700. self._dispatch_next(r)
  701. else:
  702. self._dispatch_completed()
  703. except Exception as e:
  704. self._dispatch_error(e)
  705. def from_func(init, cond, iter, selector):
  706. return FunctionIteratorAsOutputThing(init, cond, iter, selector)
  707. # Define a default sensor event as a tuple of sensor id, timestamp, and value.
  708. SensorEvent = namedtuple('SensorEvent', ['sensor_id', 'ts', 'val'])
  709. def make_sensor_event(sensor, sample):
  710. """Given a sensor object and a sample taken from that sensor,
  711. return a SensorEvent tuple."""
  712. return SensorEvent(sensor_id=sensor.sensor_id, ts=time.time(),
  713. val=sample)
  714. class SensorAsOutputThing(OutputThing, DirectOutputThingMixin):
  715. """OutputThing that samples a sensor upon its observe call, creates
  716. an event from the sample, and dispatches it forward. A sensor is just
  717. an object that has a sensor_id property and a sample() method. If the
  718. sensor wants to complete the stream, it should throw a StopIteration
  719. exception.
  720. By default, it generates SensorEvent instances. This behavior can be
  721. changed by passing in a different function for make_event_fn.
  722. """
  723. def __init__(self, sensor, make_event_fn=make_sensor_event):
  724. super().__init__()
  725. self.sensor = sensor
  726. self.make_event_fn = make_event_fn
  727. def _observe(self):
  728. try:
  729. self._dispatch_next(self.make_event_fn(self.sensor,
  730. self.sensor.sample()))
  731. except FatalError:
  732. raise
  733. except StopIteration:
  734. self._dispatch_completed()
  735. except Exception as e:
  736. self._dispatch_error(e)
  737. def __repr__(self):
  738. return 'SensorAsOutputThing(%s)' % repr(self.sensor)
  739. class BlockingInputThing:
  740. """This implements a InputThing which may potential block when sending an
  741. event outside the system. The InputThing is run on a separate thread. We
  742. create proxy methods for each port that can be called directly - these
  743. methods just queue up the call to run in the worker thread.
  744. The actual implementation of the InputThing goes in the _on_next,
  745. _on_completed, and _on_error methods. Note that we don't dispatch to separate
  746. methods for each port. This is because the port is likely to end up as
  747. just a message field rather than as a separate destination in the lower
  748. layers.
  749. """
  750. def __init__(self, scheduler, ports=None):
  751. if ports==None:
  752. self.ports = ['default',]
  753. else:
  754. self.ports = ports
  755. self.num_closed_ports = 0
  756. # create local proxy methods for each port
  757. for port in self.ports:
  758. setattr(self, _on_next_name(port),
  759. lambda x: self.__queue__.put((self._on_next, False,
  760. [port, x]),))
  761. setattr(self, _on_completed_name(port),
  762. lambda: self.__queue__.put((self._on_completed, True,
  763. [port]),))
  764. setattr(self, _on_error_name(port),
  765. lambda e: self.__queue__.put((self._on_error, True,
  766. [port, e]),))
  767. self.__queue__ = queue.Queue()
  768. self.scheduler = scheduler
  769. self.thread = _ThreadForBlockingInputThing(self, scheduler)
  770. self.scheduler.active_schedules[self] = self.request_stop
  771. def start():
  772. self.thread.start()
  773. self.scheduler.event_loop.call_soon(start)
  774. def request_stop(self):
  775. """This can be called to stop the thread before it is automatically
  776. stopped when all ports are closed. The close() method will be
  777. called and the InputThing cannot be restarted later.
  778. """
  779. if self.thread==None:
  780. return # no thread to stop
  781. self.__queue__.put(None) # special stop token
  782. def _wait_and_dispatch(self):
  783. """Called by main loop of blocking thread to block for a request
  784. and then dispatch it. Returns True if it processed a normal request
  785. and False if it got a stop message or there is no more events possible.
  786. """
  787. action = self.__queue__.get()
  788. if action is not None:
  789. (method, closing_port, args) = action
  790. method(*args)
  791. if closing_port:
  792. self.num_closed_ports += 1
  793. if self.num_closed_ports==len(self.ports):
  794. # no more ports can receive events, treat this
  795. # as a stop.
  796. print("Stopping blocking InputThing %s" % self)
  797. return False
  798. return True # more work possible
  799. else:
  800. return False # stop requested
  801. def _on_next(self, port, x):
  802. """Process the on_next event. Called in blocking thread."""
  803. pass
  804. def _on_completed(self, port):
  805. """Process the on_completed event. Called in blocking thread."""
  806. pass
  807. def _on_error(self, port, e):
  808. """Process the on_error event. Called in blocking thread."""
  809. pass
  810. def _close(self):
  811. """This is called when all ports have been closed. This can be used
  812. to close any connections, etc.
  813. """
  814. pass
  815. class _ThreadForBlockingInputThing(threading.Thread):
  816. """Background thread for a InputThing that passes events to the
  817. external world and might block.
  818. """
  819. def __init__(self, input_thing, scheduler):
  820. self.input_thing = input_thing
  821. self.scheduler= scheduler
  822. self.stop_requested = False
  823. super().__init__()
  824. def run(self):
  825. try:
  826. more = True
  827. while more:
  828. more = self.input_thing._wait_and_dispatch()
  829. except Exception as e:
  830. msg = "_wait_and_dispatch for %s exited with error: %s" % \
  831. (self.input_thing, e)
  832. logger.exception(msg)
  833. self.input_thing._close()
  834. self.input_thing.thread = None # disassociate this thread
  835. def die(): # need to stop the scheduler in the main loop
  836. del self.scheduler.active_schedules[self.input_thing]
  837. raise ScheduleError(msg) from e
  838. self.scheduler.event_loop.call_soon_threadsafe(die)
  839. else:
  840. self.input_thing._close()
  841. self.input_thing.thread = None # disassociate this thread
  842. def done():
  843. self.scheduler._remove_from_active_schedules(self.input_thing)
  844. self.scheduler.event_loop.call_soon_threadsafe(done)
  845. class _ThreadForBlockingOutputThing(threading.Thread):
  846. """Background thread for OutputThings that might block.
  847. """
  848. def __init__(self, output_thing, interval, scheduler):
  849. self.output_thing = output_thing
  850. self.interval = interval
  851. self.scheduler = scheduler
  852. self.stop_requested = False
  853. super().__init__()
  854. def _stop_loop(self):
  855. self.stop_requested = True
  856. def run(self):
  857. def enqueue_fn(fn, *args):
  858. self.scheduler.event_loop.call_soon_threadsafe(fn, *args)
  859. self.output_thing._schedule(enqueue_fn=enqueue_fn)
  860. try:
  861. while True:
  862. if self.stop_requested:
  863. break
  864. start = time.time()
  865. self.output_thing._observe()
  866. if self.output_thing._has_connections():
  867. break
  868. time_left = self.interval - (time.time() - start)
  869. if time_left > 0 and (not self.stop_requested):
  870. time.sleep(time_left)
  871. except Exception as e:
  872. msg = "_observe for %s exited with error" % self.output_thing
  873. logger.exception(msg)
  874. def die(): # need to stop the scheduler in the main loop
  875. del self.scheduler.active_schedules[self.output_thing]
  876. raise ScheduleError(msg) from e
  877. self.scheduler.event_loop.call_soon_threadsafe(die)
  878. else:
  879. def done():
  880. self.scheduler._remove_from_active_schedules(self.output_thing)
  881. self.scheduler.event_loop.call_soon_threadsafe(done)
  882. class ScheduleError(FatalError):
  883. pass
  884. class Scheduler:
  885. """Wrap an asyncio event loop and provide methods for various kinds of
  886. periodic scheduling.
  887. """
  888. def __init__(self, event_loop):
  889. self.event_loop = event_loop
  890. self.active_schedules = {} # mapping from task to schedule handle
  891. self.pending_futures = {}
  892. self.next_future_id = 1
  893. # Set the following to an exception if we are exiting the loop due to
  894. # an exception. We will then raise a SchedulerError when the event loop
  895. # exits.
  896. self.fatal_error = None
  897. # we set the exception handler to stop all active schedules and
  898. # break out of the event loop if we get an unexpected error.
  899. def exception_handler(loop, context):
  900. assert loop==self.event_loop
  901. self.fatal_error = context['exception']
  902. self.stop()
  903. self.event_loop.set_exception_handler(exception_handler)
  904. def _remove_from_active_schedules(self, output_thing):
  905. """Remove the specified OutputThing from the active_schedules map.
  906. If there are no more active schedules, we will request exiting of
  907. the event loop. This method must be run from the main thread.
  908. """
  909. del self.active_schedules[output_thing]
  910. if len(self.active_schedules)==0:
  911. print("No more active schedules, will exit event loop")
  912. self.stop()
  913. def schedule_periodic(self, output_thing, interval):
  914. """Returns a callable that can be used to remove the OutputThing from the
  915. scheduler.
  916. """
  917. def cancel():
  918. try:
  919. handle = self.active_schedules[output_thing]
  920. except KeyError:
  921. raise ScheduleError("Attempt to de-schedule OutputThing %s, which does not have an active schedule" %
  922. output_thing)
  923. handle.cancel()
  924. self._remove_from_active_schedules(output_thing)
  925. def run():
  926. assert output_thing in self.active_schedules
  927. output_thing._observe()
  928. more = output_thing._has_connections()
  929. if not more and output_thing in self.active_schedules:
  930. self._remove_from_active_schedules(output_thing)
  931. elif output_thing in self.active_schedules:
  932. handle = self.event_loop.call_later(interval, run)
  933. self.active_schedules[output_thing] = handle
  934. output_thing._schedule(enqueue_fn=None)
  935. handle = self.event_loop.call_later(interval, run)
  936. self.active_schedules[output_thing] = handle
  937. output_thing._schedule(enqueue_fn=None)
  938. return cancel
  939. def schedule_sensor(self, sensor, interval, *input_thing_sequence,
  940. make_event_fn=make_sensor_event,
  941. print_downstream=False):
  942. """Create a OutputThing wrapper for the sensor and schedule it at the
  943. specified interval. Compose the specified connections (and/or thunks)
  944. into a sequence and connect the sequence to the sensor's OutputThing.
  945. Returns a thunk that can be used to remove the OutputThing from the
  946. scheduler.
  947. """
  948. output_thing = SensorAsOutputThing(sensor, make_event_fn=make_event_fn)
  949. prev = output_thing
  950. for s in input_thing_sequence:
  951. assert prev,\
  952. "attempted to compose a terminal InputThing/thunk in a non-final position"
  953. prev = _connect_thunk(prev, s)
  954. if print_downstream:
  955. output_thing.print_downstream() # just for debugging
  956. return self.schedule_periodic(output_thing, interval)
  957. def schedule_recurring(self, output_thing):
  958. """Takes a DirectOutputThingMixin and calls _observe() to get events. If,
  959. after the call, there are no downstream connections, the scheduler will
  960. deschedule the output thing.
  961. This variant is useful for something like an iterable. If the call to get
  962. the next event would block, don't use this! Instead, one of the calls
  963. that runs in a separate thread (e.g. schedule_recuring_separate_thread()
  964. or schedule_periodic_separate_thread()).
  965. Returns a callable that can be used to remove the OutputThing from the
  966. scheduler.
  967. """
  968. def cancel():
  969. print("canceling schedule of %s" % output_thing)
  970. try:
  971. handle = self.active_schedules[output_thing]
  972. except KeyError:
  973. raise ScheduleError("Attempt to de-schedule OutputThing %s, which does not have an active schedule" %
  974. output_thing)
  975. handle.cancel()
  976. self._remove_from_active_schedules(output_thing)
  977. def run():
  978. assert output_thing in self.active_schedules
  979. output_thing._observe()
  980. more = output_thing._has_connections()
  981. if not more and output_thing in self.active_schedules:
  982. self._remove_from_active_schedules(output_thing)
  983. elif output_thing in self.active_schedules:
  984. handle = self.event_loop.call_soon(run)
  985. self.active_schedules[output_thing] = handle
  986. output_thing._schedule(enqueue_fn=None)
  987. handle = self.event_loop.call_soon(run)
  988. self.active_schedules[output_thing] = handle
  989. output_thing._schedule(enqueue_fn=None)
  990. return cancel
  991. def schedule_on_main_event_loop(self, output_thing):
  992. """Schedule an OutputThing that runs on the main event loop.
  993. The OutputThing is assumed to implement EventLoopOutputThingMixin.
  994. Returns a callable that can be used to unschedule the OutputThing.
  995. """
  996. def stop():
  997. # tell the OutputThing to stop. When the OutputThing has finished
  998. # processing any messages, it MUST call
  999. # _remove_from_active_schedules() on the scheduler.
  1000. output_thing._stop_loop()
  1001. self.active_schedules[output_thing] = stop
  1002. self.event_loop.call_soon(output_thing._observe_event_loop)
  1003. return stop
  1004. def schedule_on_private_event_loop(self, output_thing):
  1005. """Schedule an OutputThing that has its own event loop on another thread.
  1006. The OutputThing is assumed to implement EventLoopOutputThingMixin.
  1007. Returns a callable that can be used to unschedule the OutputThing, by
  1008. requesting that the event loop stop.
  1009. """
  1010. def enqueue_fn(fn, *args):
  1011. self.event_loop.call_soon_threadsafe(fn, *args)
  1012. def thread_main():
  1013. try:
  1014. output_thing._schedule(enqueue_fn=enqueue_fn)
  1015. # ok, lets run the event loop
  1016. output_thing._observe_event_loop()
  1017. except Exception as e:
  1018. msg = "Event loop for %s exited with error" % output_thing
  1019. logger.exception(msg)
  1020. def die(): # need to stop the scheduler in the main loop
  1021. del self.active_schedules[output_thing]
  1022. raise ScheduleError(msg) from e
  1023. self.event_loop.call_soon_threadsafe(die)
  1024. else:
  1025. def loop_done():
  1026. self._remove_from_active_schedules(output_thing)
  1027. self.event_loop.call_soon_threadsafe(loop_done)
  1028. t = threading.Thread(target=thread_main)
  1029. self.active_schedules[output_thing] = output_thing._stop_loop
  1030. self.event_loop.call_soon(t.start)
  1031. return output_thing._stop_loop
  1032. def schedule_periodic_on_separate_thread(self, output_thing, interval):
  1033. """Schedule an OutputThing to run in a separate thread. It should
  1034. implement the DirectOutputThingMixin.
  1035. Returns a callable that can be used to unschedule the OutputThing, by
  1036. requesting that the child thread stop.
  1037. """
  1038. t = _ThreadForBlockingOutputThing(output_thing, interval, self)
  1039. self.active_schedules[output_thing] = t._stop_loop
  1040. self.event_loop.call_soon(t.start)
  1041. return t._stop_loop
  1042. def schedule_sensor_on_separate_thread(self, sensor, interval, *input_thing_sequence,
  1043. make_event_fn=make_sensor_event):
  1044. """Create a OutputThing wrapper for the sensor and schedule it at the
  1045. specified interval. Compose the specified connections (and/or thunks)
  1046. into a sequence and connect the sequence to the sensor's OutputThing.
  1047. Returns a thunk that can be used to remove the OutputThing from the
  1048. scheduler.
  1049. """
  1050. output_thing = SensorAsOutputThing(sensor, make_event_fn=make_event_fn)
  1051. prev = output_thing
  1052. for s in input_thing_sequence:
  1053. assert prev,\
  1054. "attempted to compose a terminal InputThing/thunk in a non-final position"
  1055. prev = _connect_thunk(prev, s)
  1056. return self.schedule_periodic_on_separate_thread(output_thing, interval)
  1057. def schedule_later_one_time(self, output_thing, interval):
  1058. def cancel():
  1059. print("canceling schedule of %s" % output_thing)
  1060. try:
  1061. handle = self.active_schedules[output_thing]
  1062. except KeyError:
  1063. raise ScheduleError("Attempt to de-schedule OutputThing %s, which does not have an active schedule" %
  1064. output_thing)
  1065. handle.cancel()
  1066. self._remove_from_active_schedules(output_thing)
  1067. def run():
  1068. assert output_thing in self.active_schedules
  1069. # Remove from the active schedules since this was a one-time schedule.
  1070. # Note that the _observe() call could potentially reschedule the
  1071. # OutputThing through another call to the scheduler.
  1072. self._remove_from_active_schedules(output_thing)
  1073. output_thing._observe()
  1074. handle = self.event_loop.call_later(interval, run)
  1075. self.active_schedules[output_thing] = handle
  1076. output_thing._schedule(enqueue_fn=None)
  1077. return cancel
  1078. def run_forever(self):
  1079. """Call the event loop's run_forever(). We don't really run forever:
  1080. the event loop is exited if we run out of scheduled events or if stop()
  1081. is called.
  1082. """
  1083. try:
  1084. self.event_loop.run_forever()
  1085. except KeyboardInterrupt:
  1086. # If someone hit Control-C to break out of the loop,
  1087. # they might be trying to diagonose a hang. Print the
  1088. # active OutputThings here before passing on the interrupt.
  1089. print("Active OutputThings: %s" %
  1090. ', '.join([('%s'%o) for o in self.active_schedules.keys()]))
  1091. raise
  1092. if self.fatal_error is not None:
  1093. raise ScheduleError("Scheduler aborted due to fatal error") \
  1094. from self.fatal_error
  1095. def _schedule_coroutine(self, coro, done_callback):
  1096. """This is for low-level components that deal directly with
  1097. the event loop to to schedule a coroutine. We
  1098. track them so we can either wait for or cancel them when stop()
  1099. is called.
  1100. """
  1101. fid = self.next_future_id
  1102. future = self.event_loop.create_task(coro)
  1103. # the combined callback. To avoid race conditions, always
  1104. # call the provided done callback before we remove the future.
  1105. def cb(f):
  1106. done_callback(f)
  1107. del self.pending_futures[fid]
  1108. self.pending_futures[fid] = future
  1109. future.add_done_callback(cb)
  1110. self.next_future_id += 1
  1111. return future
  1112. def stop(self):
  1113. """Stop any active schedules for output things and then call stop() on
  1114. the event loop.
  1115. """
  1116. for (task, handle) in self.active_schedules.items():
  1117. #print("Stopping %s" % task)
  1118. # The handles are either event scheduler handles (with a cancel
  1119. # method) or just callables to be called directly.
  1120. if hasattr(handle, 'cancel'):
  1121. handle.cancel()
  1122. else:
  1123. handle()
  1124. self.active_schedules = {}
  1125. # go through the pending futures. We don't stop the
  1126. # event loop until all the pending futures have been
  1127. # completed or stopped by their callers.
  1128. for (fid, f) in self.pending_futures.items():
  1129. if f.done() == False:
  1130. # if we still have pending futures, we try the
  1131. # stop again after the first one we see has
  1132. # completed.
  1133. #print("Waiting for future %d (%s)" % (fid, repr(f)))
  1134. def recheck_stop(f):
  1135. exc = f.exception()
  1136. if exc:
  1137. raise FatalError("Exception in coroutine %s" % repr(f)) from exc
  1138. else:
  1139. self.stop()
  1140. f.add_done_callback(recheck_stop)
  1141. return
  1142. elif f.exception():
  1143. raise FatalError("Exception in coroutine %s" % repr(f)) from exc
  1144. self.event_loop.stop()