Source code for seamonsters.generators

__author__ = "seamonsters"

import itertools, logging

logger = logging.getLogger("seamonsters")

class ParallelSignal:
    """
    A signal that can be returned from a generator in a parallel group to
    trigger an action on the group.
    """

class StopParallelSignal(ParallelSignal):
    """
    Value to signal that a group of parallel commands should be stopped.
    """

[docs] def __init__(self, value=None): self.value = value
class AddParallelSignal(ParallelSignal): """ Value to signal that a new generator should be added to the group of parallel commands. """
[docs] def __init__(self, iterable): self.iterable = iterable
def sequence(*iterables): """ Run a set of iterables sequentially """ return itertools.chain(*iterables) def parallel(*iterables): """ Run a group of iterables in parallel. Ends when none are left running. An iterable can yield or return a ParallelSignal to trigger an action. """ iterables = list(iterables) try: while len(iterables) != 0: toRemove = [ ] for iter in iterables: try: result = next(iter) except StopIteration as e: result = e.value toRemove.append(iter) except BaseException as e: logger.exception("A parallel action crashed", exc_info=e) result = None toRemove.append(iter) if isinstance(result, ParallelSignal): if isinstance(result, StopParallelSignal): return result.value elif isinstance(result, AddParallelSignal): iterables.append(result.iterable) for iter in toRemove: iterables.remove(iter) yield finally: for iter in iterables: iter.close() def wait(time): """ Wait for a certain number of iterations. """ for _ in range(int(time)): yield def forever(): """ Iterate forever. """ while True: yield def timeLimit(iterable, time): """ Run the iterable until it finishes or the given time limit has passed. Return the value of the iterable if it ends early, None otherwise. """ return itertools.islice(iterable, time) def untilTrue(iterable): """ Run the iterable until it yields True, then stop. """ return itertools.takewhile(lambda x: not x, iterable) def _ensureBool(iterable, requiredCount, b): try: count = 0 while True: try: value = next(iterable) except StopIteration as e: return e.value yield value if value == b: count += 1 else: count = 0 if count >= requiredCount: return b finally: iterable.close() def ensureTrue(iterable, requiredCount): """ Wait until the iterable yields True for a certain number of consecutive iterations before finishing. :return: the return value of the iterable if it exits early, True otherwise """ return (yield from _ensureBool(iterable, requiredCount, True)) def ensureFalse(iterable, requiredCount): """ Wait until the iterable yields False for a certain number of consecutive iterations before finishing. :return: the return value of the iterable if it exits early, False otherwise (note difference from ``ensureTrue``) """ return (yield from _ensureBool(iterable, requiredCount, False)) def returnValue(iterable, value): """ Run an iterable but change the return value. :return: value """ yield from iterable return value def stopAllWhenDone(iterable): """ If run in a ``sea.parallel`` block, when the iterable completes all parallel commands will be stopped. """ value = yield from iterable return StopParallelSignal(value) class State: """ An action to run in a StateMachine. """
[docs] def __init__(self, function, subMachine=None): """ :param function: A function with no arguments that returns a generator. If the generator returns another State, that State will be pushed to the stack. Otherwise the State will be popped when it completes. :param subMachine: Optional, another StateMachine to run in parallel with this State. Allows States to have nested sub-States. """ self.function = function self.subMachine = subMachine
[docs] def runState(self): if self.subMachine is None: yield from self.function() else: yield from parallel( stopAllWhenDone(self.function()), self.subMachine.updateGenerator())
IDLE_STATE = State(forever) class StateMachine: """ Implementation of a Pushdown Automaton. Has one state always running at a time, and keeps track of a stack of states. """
[docs] def __init__(self): self.stateStack = [] self._cancelState = False
[docs] def currentState(self): """ Get the current running state. If the state stack is empty, IDLE_STATE is the current state. """ if len(self.stateStack) == 0: return IDLE_STATE return self.stateStack[-1]
[docs] def updateGenerator(self): """ Generator to update the state machine. """ while True: self._cancelState = False yield from parallel( self._watchForCancelGenerator(), self._runCurrentState())
def _runCurrentState(self): ret = yield from self.currentState().runState() if isinstance(ret, State): self.stateStack.append(ret) else: self.stateStack.pop() return StopParallelSignal()
[docs] def clear(self): """ Cancel the current state and clear the stack. """ self.stateStack.clear() self._cancelState = True
[docs] def push(self, state): """ Cancel the current running State and push a new State to the stack. """ self.stateStack.append(state) self._cancelState = True
[docs] def pop(self): """ Cancel the current running State and pop it. Run the State below it on the stack. """ if len(self.stateStack) != 0: self.stateStack.pop() self._cancelState = True
[docs] def replace(self, state): """ Cancel the current running state and replace it with a new one. """ self.pop() self.push(state)
[docs] def runUntilStopped(self, state): """ Push a state and wait until the state is either cancelled by pushing or popping, or it exits normally. """ self.push(state) while self.currentState() == state: yield
[docs] def runUntilPopped(self, state): """ Push a state and wait until the state is popped from the stack. """ self.push(state) stackLen = len(self.stateStack) while len(self.stateStack) >= stackLen: yield
def _watchForCancelGenerator(self): while not self._cancelState: yield return StopParallelSignal()