| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296 |
- # Copyright 2016,2017 by MPI-SWS and Data-Ken Research.
- # Licensed under the Apache 2.0 License.
- """
- Base functionality for ThingFlow. All the core abstractions
- are defined here. Everything else is just subclassing or using
- these abstractions.
- The key abstractions are:
- * Thing - a unit of computation in the data flow graph. Things can be
- Filters (with inputs and outputs) or Adapters (with only inputs
- or only outputs).
- * OutputThing - Base class and interface for things that emit event streams on
- output ports.
- * Sensor - an object that is (indirectly) connected to the physical world.
- It can provide its current value through a sample() method.
- Sensors can be turned into Things by wrapping them with
- the SensorAsInputThing class.
- * InputThing - interface for things that receive a stream of events on one or
- more input ports.
- * Filter - a thing that is both an InputThing and an OutputThing, with one
- input and one output. Filters transform data streams.
- * Scheduler - The scheduler wraps an event loop. It provides periodic and
- one-time scheduling of OutputThings that originate events.
- * event - ThingFlow largely does not care about the particulars of the
- events it processes. However, we define a generic SensorEvent
- datatype that can be used when the details of the event matter
- to a thing.
- See the README.rst file for more details.
- """
- from collections import namedtuple
- import threading
- import time
- import queue
- import traceback as tb
- import logging
- logger = logging.getLogger(__name__)
- from thingflow.internal import noop
- class InputThing:
- """This is the interface for the default input
- port of a Thing. Other (named) input ports will
- define similar methods with the names as
- on_PORT_next(), on_PORT_error(), and
- on_PORT_completed().
- """
- def on_next(self, x):
- pass
-
- def on_error(self, e):
- pass
-
- def on_completed(self):
- pass
- def _on_next_name(port):
- if port==None or port=='default':
- return 'on_next'
- else:
- return 'on_%s_next' % port
- def _on_error_name(port):
- if port==None or port=='default':
- return 'on_error'
- else:
- return 'on_%s_error' % port
- def _on_completed_name(port):
- if port==None or port=='default':
- return 'on_completed'
- else:
- return 'on_%s_completed' % port
- class CallableAsInputThing:
- """Wrap any callable with the InputThing interface.
- We only pass it the on_next() calls. on_error and on_completed
- can be passed in or default to noops.
- """
- def __init__(self, on_next=None, on_error=None, on_completed=None,
- port=None):
- setattr(self, _on_next_name(port), on_next or noop)
- if on_error:
- setattr(self, _on_error_name(port), on_error)
- else:
- def default_error(err):
- if isinstance(err, FatalError):
- raise err.with_traceback(err.__traceback__)
- else:
- logger.error("%s: Received on_error(%s)" %
- (self, err))
- setattr(self, _on_error_name(port), default_error)
- setattr(self, _on_completed_name(port), on_completed or noop)
-
- def __str__(self):
- return 'CallableAsInputThing(%s)' % str(self.on_next)
- def __repr__(self):
- return 'CallableAsInputThing(on_next=%s, on_error=%s, on_completed=%s)' % \
- (repr(self.on_next), repr(self.on_error), repr(self.on_completed))
- class FatalError(Exception):
- """This is the base class for exceptions that should terminate the event
- loop. This should be for out-of-bound errors, not for normal errors in
- the data stream. Examples of out-of-bound errors include an exception
- in the infrastructure or an error in configuring or dispatching an event
- stream (e.g. publishing to a non-existant port).
- """
- pass
- class InvalidPortError(FatalError):
- pass
- class UnknownPortError(FatalError):
- pass
- class PortAlreadyClosed(FatalError):
- pass
- class ExcInDispatch(FatalError):
- """Dispatching an event should not raise an error, other than a
- fatal error.
- """
- pass
- # Internal representation of a connection. The first three fields
- # are functions which dispatch to the InputThing. The InputThing and input_port
- # fields are not needed at runtime, but helpful in debugging.
- # We use a class with slots instead of a named tuple because we want to
- # change the values of the on_next, etc. functions when tracing (tuples
- # are read-only. The attribute access of a named tuple by name is no
- # faster than slots. If we need to spead this up at some point, use a
- # named tuple but access via the index values (at a cost to readability
- # of the code).
- class _Connection:
- __slots__ = ('on_next', 'on_completed', 'on_error', 'input_thing',
- 'input_port')
- def __init__(self, on_next, on_completed, on_error, input_thing,
- input_port):
- self.on_next = on_next
- self.on_completed = on_completed
- self.on_error = on_error
- self.input_thing = input_thing
- self.input_port = input_port
- def __repr__(self):
- return '_Connection(%s,%s,%s,%s,%s)' % \
- (repr(self.on_next), repr(self.on_completed), repr(self.on_error),
- repr(self.input_thing), repr(self.input_port))
-
- def __str__(self):
- return '_Connection(%s,%s)' % \
- (str(self.input_thing), str(self.input_port))
-
- class OutputThing:
- """Base class for event generators (output things). The non-underscore
- methods are the public end-user interface. The methods starting with
- underscores are for interactions with the scheduler.
- """
- def __init__(self, ports=None):
- self.__connections__ = {} # map from port to InputThing set
- if ports is None:
- self.__ports__ = set(['default',])
- else:
- self.__ports__ = set(ports)
- for port in self.__ports__:
- self.__connections__[port] = []
- self.__enqueue_fn__ = None
- self.__closed_ports__ = []
- def connect(self, input_thing, port_mapping=None):
- """Connect the InputThing to events on a specific port. The port
- mapping is a tuple of the OutputThing's port name and InputThing's port
- name. It defaults to (default, default).
- This returns a fuction that can be called to remove the connection.
- """
- if port_mapping==None:
- output_port = 'default'
- input_port = 'default'
- else:
- (output_port, input_port) = port_mapping
- if output_port not in self.__ports__:
- raise InvalidPortError("Invalid publish port '%s', valid ports are %s" %
- (output_port,
- ', '.join([str(s) for s in self.__ports__])))
- if not hasattr(input_thing, _on_next_name(input_port)) and callable(input_thing):
- input_thing = CallableAsInputThing(input_thing, port=input_port)
- try:
- connection = \
- _Connection(on_next=getattr(input_thing, _on_next_name(input_port)),
- on_completed=getattr(input_thing, _on_completed_name(input_port)),
- on_error=getattr(input_thing, _on_error_name(input_port)),
- input_thing=input_thing,
- input_port=input_port)
- except AttributeError:
- raise InvalidPortError("Invalid input port '%s', missing method(s) on InputThing %s" %
- (input_port, input_thing))
- new_connections = self.__connections__[output_port].copy()
- new_connections.append(connection)
- self.__connections__[output_port] = new_connections
- def disconnect():
- # To remove the connection, we replace the entire list with a copy
- # that is missing the connection. This allows disconnect() to be
- # called within a _dispatch method. Otherwise, we get an error if
- # we attempt to change the list of connections while iterating over
- # it.
- new_connections = self.__connections__[output_port].copy()
- #new_connections.remove(connection)
- # we look for a connection to the same port and thing rather than
- # the same object - the object may have changed due to tracing
- found = False
- for c in self.__connections__[output_port]:
- if c.input_thing==input_thing and c.input_port==input_port:
- new_connections.remove(c)
- found = True
- break
- assert found
- self.__connections__[output_port] = new_connections
- return disconnect
- def _has_connections(self):
- """Used by the scheduler to see the thing has any more outgoing connections.
- If a scheduled thing no longer has output connections, it is descheduled.
- """
- for (port, conns) in self.__connections__.items():
- if len(conns)>0:
- return True
- return False
-
- def _schedule(self, enqueue_fn):
- """This method is used by the scheduler to specify an enqueue function
- to be called
- when dispatching events to the connections. This is used when the
- OutputThing runs in a separate thread from the main event loop. If
- that is not the case, the enqueue function should be None.
- """
- self.__enqueue_fn__ = enqueue_fn
- def _close_port(self, port):
- """Port will receive no more messages. Remove the port from
- this OutputThing.
- """
- #print("Closing port %s on %s" % (port, self)) # XXX
- del self.__connections__[port]
- self.__ports__.remove(port)
- self.__closed_ports__.append(port)
- def _dispatch_next(self, x, port=None):
- #print("Dispatch next called on %s, port %s, msg %s" % (self, port, str(x)))
- if port==None:
- port = 'default'
- try:
- connections = self.__connections__[port]
- except KeyError as e:
- if port in self.__closed_ports__:
- raise PortAlreadyClosed("Port '%s' on OutputThing %s already had an on_completed or on_error_event" %
- (port, self))
- else:
- raise UnknownPortError("Unknown port '%s' in OutputThing %s" %
- (port, self)) from e
- if len(connections) == 0:
- return
- enq = self.__enqueue_fn__
- if enq:
- for s in connections:
- enq(s.on_next, x)
- else:
- try:
- for s in connections:
- s.on_next(x)
- except FatalError:
- raise
- except Exception as e:
- raise ExcInDispatch("Unexpected exception when dispatching event '%s' to InputThing %s from OutputThing %s" %
- (repr(x), s.input_thing, self)) from e
- def _dispatch_completed(self, port=None):
- if port==None:
- port = 'default'
- try:
- connections = self.__connections__[port]
- except KeyError as e:
- if port in self.__closed_ports__:
- raise PortAlreadyClosed("Port '%s' on OutputThing %s already had an on_completed or on_error_event" %
- (port, self))
- else:
- raise UnknownPortError("Unknown port '%s' in OutputThing %s" % (port, self)) from e
- enq = self.__enqueue_fn__
- if enq:
- for s in connections:
- enq(s.on_completed)
- else:
- try:
- for s in connections:
- s.on_completed()
- except FatalError:
- raise
- except Exception as e:
- raise ExcInDispatch("Unexpected exception when dispatching completed to InputThing %s from OutputThing %s" %
- (s.input_thing, self)) from e
- self._close_port(port)
- def _dispatch_error(self, e, port=None):
- if port==None:
- port = 'default'
- try:
- connections = self.__connections__[port]
- except KeyError as e:
- if port in self.__closed_ports__:
- raise PortAlreadyClosed("Port '%s' on OutputThing %s already had an on_completed or on_error_event" %
- (port, self))
- else:
- raise UnknownPortError("Unknown port '%s' in OutputThing %s" % (port, self)) from e
- enq = self.__enqueue_fn__
- if enq:
- for s in connections:
- enq(s.on_error, e)
- else:
- try:
- for s in connections:
- s.on_error(e)
- except FatalError:
- raise
- except Exception as e:
- raise ExcInDispatch("Unexpected exception when dispatching error '%s' to InputThing %s from OutputThing %s" %
- (repr(e), s.input_thing, self)) from e
- self._close_port(port)
- def print_downstream(self):
- """Recursively print all the downstream paths. This is for debugging.
- """
- def has_connections(thing):
- if not hasattr(thing, '__connections__'):
- return False
- return thing._has_connections()
- def print_from(current_seq, thing):
- if has_connections(thing):
- for (port, connections) in thing.__connections__.items():
- for connection in connections:
- if port=='default' and \
- connection.input_port=='default':
- next_seq = " => %s" % connection.input_thing
- else:
- next_seq = " [%s]=>[%s] %s" % \
- (port, connection.input_port,
- connection.input_thing)
- print_from(current_seq + next_seq,
- connection.input_thing)
- else:
- print(current_seq)
- print("***** Dump of all paths from %s *****" % self.__str__())
- print_from(" " + self.__str__(), self)
- print("*"*(12+len(self.__str__())))
- def trace_downstream(self):
- """Install wrappers that print a trace message for each
- event on this thing and all downsteam things.
- """
- def has_connections(thing):
- if not hasattr(thing, '__connections__'):
- return False
- return thing._has_connections()
- def fmt(thing, port):
- return '%s.%s' % (str(thing), port) if port!='default' \
- else str(thing)
- def trace_on_next(thing, output_port, connection, x):
- print(" %s => (%s) => %s" %
- (fmt(thing, output_port), str(x),
- fmt(connection.input_thing,
- connection.input_port)))
- connection.on_next(x)
- def trace_on_error(thing, output_port, connection, error):
- print(" %s => on_error(%s) => %s" %
- (fmt(thing, output_port), str(error),
- fmt(connection.input_thing,
- connection.input_port)))
- connection.on_error(error)
- def trace_on_completed(thing, output_port, connection):
- print(" %s => on_completed => %s" %
- (fmt(thing, output_port),
- fmt(connection.input_thing,
- connection.input_port)))
- connection.on_completed()
-
- def make_trace_connection(src_thing, output_port, old_connection):
- return _Connection(
- on_next=lambda x: trace_on_next(src_thing, output_port,
- old_connection, x),
- on_error=lambda e: trace_on_error(src_thing, output_port,
- old_connection, e),
- on_completed=lambda : trace_on_completed(src_thing,
- output_port,
- old_connection),
- input_thing=old_connection.input_thing,
- input_port=old_connection.input_port)
- def trace_from(thing):
- if has_connections(thing):
- new_connections = {}
- for (port, connections) in thing.__connections__.items():
- connections_for_port = []
- for connection in connections:
- trace_from(connection.input_thing)
- connections_for_port.append(make_trace_connection(thing,
- port,
- connection))
- new_connections[port] = connections_for_port
- thing.__connections__ = new_connections
- trace_from(self)
- print("***** installed tracing in all paths starting from %s" %
- str(self))
-
- def pp_connections(self):
- """pretty print the set of connections"""
- h1 = "***** InputThings for %s *****" % self
- print(h1)
- for port in sorted(self.__connections__.keys()):
- print(" Port %s" % port)
- for s in self.__connections__[port]:
- print(" [%s] => %s" % (s.input_port, s.input_thing))
- print(" on_next: %s" % s.on_next)
- print(" on_completed: %s" % s.on_completed)
- print(" on_error: %s" % s.on_error)
- print("*"*len(h1))
- def __str__(self):
- return self.__class__.__name__ + '()'
-
- class Filter(OutputThing, InputThing):
- """A filter has a default input port and a default output port. It is
- used for data transformations. The default implementations of on_next(),
- on_completed(), and on_error() just pass the event on to the downstream
- connection.
- """
- def __init__(self, previous_in_chain):
- super().__init__()
- # connect to the previous filter
- self.disconnect_from_upstream = previous_in_chain.connect(self)
- def on_next(self, x):
- self._dispatch_next(x)
- def on_error(self, e):
- self._dispatch_error(e)
- def on_completed(self):
- self._dispatch_completed()
- def __str__(self):
- return self.__class__.__name__ + '()'
-
- class XformOrDropFilter(Filter):
- """Implements a slightly more complex filter protocol where events may be
- transformed or dropped. Subclasses just need to implement the _filter() and
- _complete() methods.
- """
- def __init__(self, previous_in_chain):
- super().__init__(previous_in_chain)
- def on_next(self, x):
- """Calls _filter(x) to process
- the event. If _filter() returns None, nothing futher is done. Otherwise,
- the return value is passed to the downstream connection. This allows you
- to both transform as well as send only selected events.
- Errors other than FatalError are handled gracefully by calling
- self.on_error() and then disconnecing from the upstream OutputThing.
- """
- try:
- x_prime = self._filter(x)
- except FatalError:
- raise
- except Exception as e:
- logger.exception("Got an exception on %s._filter(%s)" %
- (self, x))
- self.on_error(e)
- self.disconnect_from_upstream()
- else:
- if x_prime is not None:
- self._dispatch_next(x_prime)
- def _filter(self, x):
- """Filtering method to be implemented by subclasses.
- """
- return x
- def _complete(self):
- """Method to be overridden by subclasses. It is called as a part of
- on_error() and on_completed() to give a chance to pass down a held-back
- event. Return None if there is no such event.
- You can also clean up any state in this method (e.g. close connections).
- Shold not throw any exceptions other than FatalError.
- """
- return None
-
- def on_error(self, e):
- """Passes on any final event and then passes the notification to the
- next Thing.
- If you need to clean up any state, do it in _complete().
- """
- x = self._complete()
- if x is not None:
- self._dispatch_next(x)
- self._dispatch_error(e)
- def on_completed(self):
- """Passes on any final event and then passes the notification to the
- next Thing.
- If you need to clean up any state, do it in _complete().
- """
- x = self._complete()
- if x is not None:
- self._dispatch_next(x)
- self._dispatch_completed()
- class FunctionFilter(Filter):
- """Implement a filter by providing functions that implement the
- on_next, on_completed, and one_error logic. This is useful
- when the logic is really simple or when a more functional programming
- style is more convenient.
- Each function takes a "self" parameter, so it works almost like it was
- defined as a bound method. The signatures are then::
- on_next(self, x)
- on_completed(self)
- on_error(self, e)
- If a function is not provided to __init__, we just dispatch the call downstream.
- """
- def __init__(self, previous_in_chain,
- on_next=None, on_completed=None,
- on_error=None, name=None):
- """name is an option name to be used in __str__() calls.
- """
- super().__init__(previous_in_chain)
- self._on_next = on_next
- self._on_error = on_error
- self._on_completed = on_completed
- if name:
- self.name = name
- def on_next(self, x):
- try:
- if self._on_next:
- # we pass in an extra "self" since this is a function, not a method
- self._on_next(self, x)
- else:
- self._dispatch_next(x)
- except FatalError:
- raise
- except Exception as e:
- logger.exception("Got an exception on %s.on_next(%s)" %
- (self, x))
- self.on_error(e)
- self.disconnect_from_upstream() # stop from getting upstream events
- def on_error(self, e):
- if self._on_error:
- self._on_error(self, e)
- else:
- self._dispatch_error(e)
-
- def on_completed(self):
- if self._on_completed:
- self._on_completed(self)
- else:
- self._dispatch_completed()
- def __str__(self):
- if hasattr(self, 'name'):
- return self.name
- else:
- return self.__class__.__name__ + '()'
- def _is_thunk(t):
- return hasattr(t, '__thunk__')
- def _make_thunk(t):
- setattr(t, '__thunk__', True)
- class _ThunkBuilder:
- """This is used to create a thunk from a linq-style
- method.
- """
- def __init__(self, func):
- self.func = func
- self.__name__ = func.__name__
- def __call__(self, *args, **kwargs):
- if len(args)==0 and len(kwargs)==0:
- _make_thunk(self.func)
- return self.func
- def apply(this):
- return self.func(this, *args, **kwargs)
- apply.__name__ = self.__name__
- _make_thunk(apply)
- return apply
- def __repr__(self):
- return "_ThunkBuilder(%s)" % self.__name__
- def _connect_thunk(prev, thunk):
- """Connect the thunk to the previous in the chain. Handles
- all the cases where we might be given a filter, a thunk,
- a thunk builder (unevaluated linq function), or a bare callable."""
- if callable(thunk):
- if _is_thunk(thunk):
- return thunk(prev)
- elif isinstance(thunk, _ThunkBuilder):
- real_thunk = thunk()
- assert _is_thunk(real_thunk)
- return real_thunk(prev)
- else: # bare callable, will be wrapped by the connect() method
- prev.connect(thunk)
- return None
- else:
- return prev.connect(thunk) # assumed to be a filter
-
- def filtermethod(base, alias=None):
- """Function decorator that creates a linq-style filter out of the
- specified function. As described in the thingflow.linq documentation,
- it should take a OutputThing as its first argument (the source of events)
- and return a OutputThing (representing the end the filter sequence once
- the filter is included. The returned OutputThing is typically an instance
- of thingflow.base.Filter.
- The specified function is used in two places:
- 1. A method with the specified name is added to the specified class
- (usually the OutputThing base class). This is for the fluent (method
- chaining) API.
- 2. A function is created in the local namespace for use in the functional API.
- This function does not take the OutputThing as an argument. Instead,
- it takes the remaining arguments and then returns a function which,
- when passed a OutputThing, connects to it and returns a filter.
- Decorator arguments:
- * param T base: Base class to extend with method
- (usually thingflow.base.OutputThing)
- * param string alias: an alias for this function or list of aliases
- (e.g. map for select, etc.).
- * returns: A function that takes the class to be decorated.
- * rtype: func -> func
- This was adapted from the RxPy extensionmethod decorator.
- """
- def inner(func):
- """This function is returned by the outer filtermethod()
- :param types.FunctionType func: Function to be decorated
- """
- func_names = [func.__name__,]
- if alias:
- aliases = alias if isinstance(alias, list) else [alias]
- func_names += aliases
- _thunk = _ThunkBuilder(func)
- # For the primary name and all aliases, set the name on the
- # base class as well as in the local namespace.
- for func_name in func_names:
- setattr(base, func_name, func)
- func.__globals__[func_name] = _thunk
- return _thunk
- return inner
- class DirectOutputThingMixin:
- """This is the interface for OutputThings that should be directly
- scheduled by the scheduler (e.g. through schedule_recurring(),
- schedule_periodic(), or schedule_periodic_on_separate_thread).
- """
- def _observe(self):
- """Get an event and call the appropriate dispatch function.
- """
- raise NotImplemented
-
- class EventLoopOutputThingMixin:
- """OutputThing that gets messages from an event loop, either the same
- loop as the scheduler or a separate one.
- """
- def _observe_event_loop(self):
- """Call the event OutputThing's event loop. When
- an event occurs, the appropriate _dispatch method should
- be called.
- """
- raise NotImplemented
- def _stop_loop(self):
- """When this method is called, the OutputThing should exit the
- event loop as soon as possible.
- """
- raise NotImplemented
- class IterableAsOutputThing(OutputThing, DirectOutputThingMixin):
- """Convert any interable to an OutputThing. This can be
- used with the schedule_recurring() and schedule_periodic()
- methods of the scheduler.
- """
- def __init__(self, iterable, name=None):
- super().__init__()
- self.iterable = iterable
- self.name = name
-
- def _observe(self):
- try:
- event = self.iterable.__next__()
- except StopIteration:
- self._close()
- self._dispatch_completed()
- except FatalError:
- self._close()
- raise
- except Exception as e:
- # If the iterable throws an exception, we treat it as non-fatal.
- # The error is dispatched downstream and the connection closed.
- # If other sensors are running, things will continue.
- tb.print_exc()
- self._close()
- self._dispatch_error(e)
- else:
- self._dispatch_next(event)
- def _close(self):
- """This method is called when we stop the iteration, either due to
- reaching the end of the sequence or an error. It can be overridden by
- subclasses to clean up any state and release resources (e.g. closing
- open files/connections).
- """
- pass
-
- def __str__(self):
- if hasattr(self, 'name') and self.name:
- return self.name
- else:
- return super().__str__()
-
- def from_iterable(i):
- return IterableAsOutputThing(i)
- def from_list(l):
- return IterableAsOutputThing(iter(l))
- # XXX Move this out of base.py
- class FunctionIteratorAsOutputThing(OutputThing, DirectOutputThingMixin):
- """Generates an OutputThing sequence by running a state-driven loop
- producing the sequence's elements. Example::
- res = GenerateOutputThing(0,
- lambda x: x < 10,
- lambda x: x + 1,
- lambda x: x)
- initial_state: Initial state.
- condition: Condition to terminate generation (upon returning False).
- iterate: Iteration step function.
- result_selector: Selector function for results produced in the sequence.
- Returns the generated sequence.
- """
- def __init__(self, initial_state, condition, iterate, result_selector):
- super().__init__()
- self.value = initial_state
- self.condition = condition
- self.iterate = iterate
- self.result_selector = result_selector
- self.first = True
- def _observe(self):
- try:
- if self.first: # first time: just send the value
- self.first = False
- if self.condition(self.value):
- r = self.result_selector(self.value)
- self._dispatch_next(r)
- else:
- self._dispatch_completed()
- else:
- if self.condition(self.value):
- self.value = self.iterate(self.value)
- r = self.result_selector(self.value)
- self._dispatch_next(r)
- else:
- self._dispatch_completed()
- except Exception as e:
- self._dispatch_error(e)
- def from_func(init, cond, iter, selector):
- return FunctionIteratorAsOutputThing(init, cond, iter, selector)
- # Define a default sensor event as a tuple of sensor id, timestamp, and value.
- SensorEvent = namedtuple('SensorEvent', ['sensor_id', 'ts', 'val'])
- def make_sensor_event(sensor, sample):
- """Given a sensor object and a sample taken from that sensor,
- return a SensorEvent tuple."""
- return SensorEvent(sensor_id=sensor.sensor_id, ts=time.time(),
- val=sample)
- class SensorAsOutputThing(OutputThing, DirectOutputThingMixin):
- """OutputThing that samples a sensor upon its observe call, creates
- an event from the sample, and dispatches it forward. A sensor is just
- an object that has a sensor_id property and a sample() method. If the
- sensor wants to complete the stream, it should throw a StopIteration
- exception.
- By default, it generates SensorEvent instances. This behavior can be
- changed by passing in a different function for make_event_fn.
- """
- def __init__(self, sensor, make_event_fn=make_sensor_event):
- super().__init__()
- self.sensor = sensor
- self.make_event_fn = make_event_fn
-
- def _observe(self):
- try:
- self._dispatch_next(self.make_event_fn(self.sensor,
- self.sensor.sample()))
- except FatalError:
- raise
- except StopIteration:
- self._dispatch_completed()
- except Exception as e:
- self._dispatch_error(e)
-
- def __repr__(self):
- return 'SensorAsOutputThing(%s)' % repr(self.sensor)
- class BlockingInputThing:
- """This implements a InputThing which may potential block when sending an
- event outside the system. The InputThing is run on a separate thread. We
- create proxy methods for each port that can be called directly - these
- methods just queue up the call to run in the worker thread.
- The actual implementation of the InputThing goes in the _on_next,
- _on_completed, and _on_error methods. Note that we don't dispatch to separate
- methods for each port. This is because the port is likely to end up as
- just a message field rather than as a separate destination in the lower
- layers.
- """
- def __init__(self, scheduler, ports=None):
- if ports==None:
- self.ports = ['default',]
- else:
- self.ports = ports
- self.num_closed_ports = 0
- # create local proxy methods for each port
- for port in self.ports:
- setattr(self, _on_next_name(port),
- lambda x: self.__queue__.put((self._on_next, False,
- [port, x]),))
- setattr(self, _on_completed_name(port),
- lambda: self.__queue__.put((self._on_completed, True,
- [port]),))
- setattr(self, _on_error_name(port),
- lambda e: self.__queue__.put((self._on_error, True,
- [port, e]),))
- self.__queue__ = queue.Queue()
- self.scheduler = scheduler
- self.thread = _ThreadForBlockingInputThing(self, scheduler)
- self.scheduler.active_schedules[self] = self.request_stop
- def start():
- self.thread.start()
- self.scheduler.event_loop.call_soon(start)
- def request_stop(self):
- """This can be called to stop the thread before it is automatically
- stopped when all ports are closed. The close() method will be
- called and the InputThing cannot be restarted later.
- """
- if self.thread==None:
- return # no thread to stop
- self.__queue__.put(None) # special stop token
- def _wait_and_dispatch(self):
- """Called by main loop of blocking thread to block for a request
- and then dispatch it. Returns True if it processed a normal request
- and False if it got a stop message or there is no more events possible.
- """
- action = self.__queue__.get()
- if action is not None:
- (method, closing_port, args) = action
- method(*args)
- if closing_port:
- self.num_closed_ports += 1
- if self.num_closed_ports==len(self.ports):
- # no more ports can receive events, treat this
- # as a stop.
- print("Stopping blocking InputThing %s" % self)
- return False
- return True # more work possible
- else:
- return False # stop requested
-
-
- def _on_next(self, port, x):
- """Process the on_next event. Called in blocking thread."""
- pass
- def _on_completed(self, port):
- """Process the on_completed event. Called in blocking thread."""
- pass
- def _on_error(self, port, e):
- """Process the on_error event. Called in blocking thread."""
- pass
- def _close(self):
- """This is called when all ports have been closed. This can be used
- to close any connections, etc.
- """
- pass
-
- class _ThreadForBlockingInputThing(threading.Thread):
- """Background thread for a InputThing that passes events to the
- external world and might block.
- """
- def __init__(self, input_thing, scheduler):
- self.input_thing = input_thing
- self.scheduler= scheduler
- self.stop_requested = False
- super().__init__()
- def run(self):
- try:
- more = True
- while more:
- more = self.input_thing._wait_and_dispatch()
- except Exception as e:
- msg = "_wait_and_dispatch for %s exited with error: %s" % \
- (self.input_thing, e)
- logger.exception(msg)
- self.input_thing._close()
- self.input_thing.thread = None # disassociate this thread
- def die(): # need to stop the scheduler in the main loop
- del self.scheduler.active_schedules[self.input_thing]
- raise ScheduleError(msg) from e
- self.scheduler.event_loop.call_soon_threadsafe(die)
- else:
- self.input_thing._close()
- self.input_thing.thread = None # disassociate this thread
- def done():
- self.scheduler._remove_from_active_schedules(self.input_thing)
- self.scheduler.event_loop.call_soon_threadsafe(done)
-
- class _ThreadForBlockingOutputThing(threading.Thread):
- """Background thread for OutputThings that might block.
- """
- def __init__(self, output_thing, interval, scheduler):
- self.output_thing = output_thing
- self.interval = interval
- self.scheduler = scheduler
- self.stop_requested = False
- super().__init__()
- def _stop_loop(self):
- self.stop_requested = True
- def run(self):
- def enqueue_fn(fn, *args):
- self.scheduler.event_loop.call_soon_threadsafe(fn, *args)
- self.output_thing._schedule(enqueue_fn=enqueue_fn)
-
- try:
- while True:
- if self.stop_requested:
- break
- start = time.time()
- self.output_thing._observe()
- if self.output_thing._has_connections():
- break
- time_left = self.interval - (time.time() - start)
- if time_left > 0 and (not self.stop_requested):
- time.sleep(time_left)
- except Exception as e:
- msg = "_observe for %s exited with error" % self.output_thing
- logger.exception(msg)
- def die(): # need to stop the scheduler in the main loop
- del self.scheduler.active_schedules[self.output_thing]
- raise ScheduleError(msg) from e
- self.scheduler.event_loop.call_soon_threadsafe(die)
- else:
- def done():
- self.scheduler._remove_from_active_schedules(self.output_thing)
- self.scheduler.event_loop.call_soon_threadsafe(done)
-
-
- class ScheduleError(FatalError):
- pass
- class Scheduler:
- """Wrap an asyncio event loop and provide methods for various kinds of
- periodic scheduling.
- """
- def __init__(self, event_loop):
- self.event_loop = event_loop
- self.active_schedules = {} # mapping from task to schedule handle
- self.pending_futures = {}
- self.next_future_id = 1
- # Set the following to an exception if we are exiting the loop due to
- # an exception. We will then raise a SchedulerError when the event loop
- # exits.
- self.fatal_error = None
- # we set the exception handler to stop all active schedules and
- # break out of the event loop if we get an unexpected error.
- def exception_handler(loop, context):
- assert loop==self.event_loop
- self.fatal_error = context['exception']
- self.stop()
- self.event_loop.set_exception_handler(exception_handler)
- def _remove_from_active_schedules(self, output_thing):
- """Remove the specified OutputThing from the active_schedules map.
- If there are no more active schedules, we will request exiting of
- the event loop. This method must be run from the main thread.
- """
- del self.active_schedules[output_thing]
- if len(self.active_schedules)==0:
- print("No more active schedules, will exit event loop")
- self.stop()
- def schedule_periodic(self, output_thing, interval):
- """Returns a callable that can be used to remove the OutputThing from the
- scheduler.
- """
- def cancel():
- try:
- handle = self.active_schedules[output_thing]
- except KeyError:
- raise ScheduleError("Attempt to de-schedule OutputThing %s, which does not have an active schedule" %
- output_thing)
- handle.cancel()
- self._remove_from_active_schedules(output_thing)
- def run():
- assert output_thing in self.active_schedules
- output_thing._observe()
- more = output_thing._has_connections()
- if not more and output_thing in self.active_schedules:
- self._remove_from_active_schedules(output_thing)
- elif output_thing in self.active_schedules:
- handle = self.event_loop.call_later(interval, run)
- self.active_schedules[output_thing] = handle
- output_thing._schedule(enqueue_fn=None)
- handle = self.event_loop.call_later(interval, run)
- self.active_schedules[output_thing] = handle
- output_thing._schedule(enqueue_fn=None)
- return cancel
- def schedule_sensor(self, sensor, interval, *input_thing_sequence,
- make_event_fn=make_sensor_event,
- print_downstream=False):
- """Create a OutputThing wrapper for the sensor and schedule it at the
- specified interval. Compose the specified connections (and/or thunks)
- into a sequence and connect the sequence to the sensor's OutputThing.
- Returns a thunk that can be used to remove the OutputThing from the
- scheduler.
- """
- output_thing = SensorAsOutputThing(sensor, make_event_fn=make_event_fn)
- prev = output_thing
- for s in input_thing_sequence:
- assert prev,\
- "attempted to compose a terminal InputThing/thunk in a non-final position"
- prev = _connect_thunk(prev, s)
- if print_downstream:
- output_thing.print_downstream() # just for debugging
- return self.schedule_periodic(output_thing, interval)
-
- def schedule_recurring(self, output_thing):
- """Takes a DirectOutputThingMixin and calls _observe() to get events. If,
- after the call, there are no downstream connections, the scheduler will
- deschedule the output thing.
- This variant is useful for something like an iterable. If the call to get
- the next event would block, don't use this! Instead, one of the calls
- that runs in a separate thread (e.g. schedule_recuring_separate_thread()
- or schedule_periodic_separate_thread()).
- Returns a callable that can be used to remove the OutputThing from the
- scheduler.
- """
- def cancel():
- print("canceling schedule of %s" % output_thing)
- try:
- handle = self.active_schedules[output_thing]
- except KeyError:
- raise ScheduleError("Attempt to de-schedule OutputThing %s, which does not have an active schedule" %
- output_thing)
- handle.cancel()
- self._remove_from_active_schedules(output_thing)
- def run():
- assert output_thing in self.active_schedules
- output_thing._observe()
- more = output_thing._has_connections()
- if not more and output_thing in self.active_schedules:
- self._remove_from_active_schedules(output_thing)
- elif output_thing in self.active_schedules:
- handle = self.event_loop.call_soon(run)
- self.active_schedules[output_thing] = handle
- output_thing._schedule(enqueue_fn=None)
- handle = self.event_loop.call_soon(run)
- self.active_schedules[output_thing] = handle
- output_thing._schedule(enqueue_fn=None)
- return cancel
- def schedule_on_main_event_loop(self, output_thing):
- """Schedule an OutputThing that runs on the main event loop.
- The OutputThing is assumed to implement EventLoopOutputThingMixin.
- Returns a callable that can be used to unschedule the OutputThing.
- """
- def stop():
- # tell the OutputThing to stop. When the OutputThing has finished
- # processing any messages, it MUST call
- # _remove_from_active_schedules() on the scheduler.
- output_thing._stop_loop()
- self.active_schedules[output_thing] = stop
- self.event_loop.call_soon(output_thing._observe_event_loop)
- return stop
-
- def schedule_on_private_event_loop(self, output_thing):
- """Schedule an OutputThing that has its own event loop on another thread.
- The OutputThing is assumed to implement EventLoopOutputThingMixin.
- Returns a callable that can be used to unschedule the OutputThing, by
- requesting that the event loop stop.
- """
- def enqueue_fn(fn, *args):
- self.event_loop.call_soon_threadsafe(fn, *args)
- def thread_main():
- try:
- output_thing._schedule(enqueue_fn=enqueue_fn)
- # ok, lets run the event loop
- output_thing._observe_event_loop()
- except Exception as e:
- msg = "Event loop for %s exited with error" % output_thing
- logger.exception(msg)
- def die(): # need to stop the scheduler in the main loop
- del self.active_schedules[output_thing]
- raise ScheduleError(msg) from e
- self.event_loop.call_soon_threadsafe(die)
- else:
- def loop_done():
- self._remove_from_active_schedules(output_thing)
- self.event_loop.call_soon_threadsafe(loop_done)
-
- t = threading.Thread(target=thread_main)
- self.active_schedules[output_thing] = output_thing._stop_loop
- self.event_loop.call_soon(t.start)
- return output_thing._stop_loop
- def schedule_periodic_on_separate_thread(self, output_thing, interval):
- """Schedule an OutputThing to run in a separate thread. It should
- implement the DirectOutputThingMixin.
- Returns a callable that can be used to unschedule the OutputThing, by
- requesting that the child thread stop.
- """
- t = _ThreadForBlockingOutputThing(output_thing, interval, self)
- self.active_schedules[output_thing] = t._stop_loop
- self.event_loop.call_soon(t.start)
- return t._stop_loop
- def schedule_sensor_on_separate_thread(self, sensor, interval, *input_thing_sequence,
- make_event_fn=make_sensor_event):
- """Create a OutputThing wrapper for the sensor and schedule it at the
- specified interval. Compose the specified connections (and/or thunks)
- into a sequence and connect the sequence to the sensor's OutputThing.
- Returns a thunk that can be used to remove the OutputThing from the
- scheduler.
- """
- output_thing = SensorAsOutputThing(sensor, make_event_fn=make_event_fn)
- prev = output_thing
- for s in input_thing_sequence:
- assert prev,\
- "attempted to compose a terminal InputThing/thunk in a non-final position"
- prev = _connect_thunk(prev, s)
- return self.schedule_periodic_on_separate_thread(output_thing, interval)
-
- def schedule_later_one_time(self, output_thing, interval):
- def cancel():
- print("canceling schedule of %s" % output_thing)
- try:
- handle = self.active_schedules[output_thing]
- except KeyError:
- raise ScheduleError("Attempt to de-schedule OutputThing %s, which does not have an active schedule" %
- output_thing)
- handle.cancel()
- self._remove_from_active_schedules(output_thing)
- def run():
- assert output_thing in self.active_schedules
- # Remove from the active schedules since this was a one-time schedule.
- # Note that the _observe() call could potentially reschedule the
- # OutputThing through another call to the scheduler.
- self._remove_from_active_schedules(output_thing)
- output_thing._observe()
- handle = self.event_loop.call_later(interval, run)
- self.active_schedules[output_thing] = handle
- output_thing._schedule(enqueue_fn=None)
- return cancel
-
- def run_forever(self):
- """Call the event loop's run_forever(). We don't really run forever:
- the event loop is exited if we run out of scheduled events or if stop()
- is called.
- """
- try:
- self.event_loop.run_forever()
- except KeyboardInterrupt:
- # If someone hit Control-C to break out of the loop,
- # they might be trying to diagonose a hang. Print the
- # active OutputThings here before passing on the interrupt.
- print("Active OutputThings: %s" %
- ', '.join([('%s'%o) for o in self.active_schedules.keys()]))
- raise
- if self.fatal_error is not None:
- raise ScheduleError("Scheduler aborted due to fatal error") \
- from self.fatal_error
- def _schedule_coroutine(self, coro, done_callback):
- """This is for low-level components that deal directly with
- the event loop to to schedule a coroutine. We
- track them so we can either wait for or cancel them when stop()
- is called.
- """
- fid = self.next_future_id
- future = self.event_loop.create_task(coro)
- # the combined callback. To avoid race conditions, always
- # call the provided done callback before we remove the future.
- def cb(f):
- done_callback(f)
- del self.pending_futures[fid]
- self.pending_futures[fid] = future
- future.add_done_callback(cb)
- self.next_future_id += 1
- return future
-
- def stop(self):
- """Stop any active schedules for output things and then call stop() on
- the event loop.
- """
- for (task, handle) in self.active_schedules.items():
- #print("Stopping %s" % task)
- # The handles are either event scheduler handles (with a cancel
- # method) or just callables to be called directly.
- if hasattr(handle, 'cancel'):
- handle.cancel()
- else:
- handle()
- self.active_schedules = {}
- # go through the pending futures. We don't stop the
- # event loop until all the pending futures have been
- # completed or stopped by their callers.
- for (fid, f) in self.pending_futures.items():
- if f.done() == False:
- # if we still have pending futures, we try the
- # stop again after the first one we see has
- # completed.
- #print("Waiting for future %d (%s)" % (fid, repr(f)))
- def recheck_stop(f):
- exc = f.exception()
- if exc:
- raise FatalError("Exception in coroutine %s" % repr(f)) from exc
- else:
- self.stop()
- f.add_done_callback(recheck_stop)
- return
- elif f.exception():
- raise FatalError("Exception in coroutine %s" % repr(f)) from exc
- self.event_loop.stop()
|