flowno.core.flow.flow
Flow execution and graph resolution module for Flowno.
This module contains the Flow class, which is the core execution engine for dataflow graphs. It manages node scheduling, dependency resolution, cycle breaking, and concurrent execution.
- Key components:
Flow: The main dataflow graph execution engine
FlowEventLoop: A custom event loop for handling Flow-specific commands
NodeTaskStatus: State tracking for node execution
- class flowno.core.flow.flow.Flow(is_finalized: bool = True)
Dataflow graph execution engine.
The Flow class manages the execution of a dataflow graph, handling dependency resolution, node scheduling, and cycle breaking. It uses a custom event loop to execute nodes concurrently while respecting data dependencies.
- Key features:
Automatic dependency-based scheduling
Cycle detection and resolution
Support for streaming data (run levels)
Concurrency management
- unvisited
List of nodes that have not yet been visited during execution
- Type:
list[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]]]
- visited
Set of nodes that have been visited
- Type:
set[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]]]
- node_tasks
Dictionary mapping nodes to their tasks and status
- Type:
dict[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]], flowno.core.flow.flow.NodeTaskAndStatus]
- running_nodes
Set of nodes currently running
- Type:
set[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]]]
- resolution_queue
Queue of nodes waiting to be resolved
- Type:
flowno.core.event_loop.queues.AsyncSetQueue[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]]]
- _condensed_tree(head: FinalizedNode[Unpack, tuple[object, ...]]) → SuperNode
Build a condensed graph of strongly connected components (SCCs) from stale connections.
This method implements Tarjan’s algorithm to find strongly connected components (cycles) in the dependency graph. It follows only connections that are “stale” (where the input’s generation is <= the node’s generation).
- Parameters:
head – The starting point for building the condensed graph
- Returns:
A SuperNode representing the root of the condensed graph
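The following is a minimal sketch of the technique described above. It shows Tarjan's SCC algorithm run only over stale edges, followed by condensation into a DAG of supernodes. Several details are assumptions and not Flowno's internals. Nodes are plain strings. Edges point from a node to the nodes it reads inputs from. Generations are tuples compared with Python's ordinary tuple ordering. A supernode is represented as a `frozenset` of member names. The helper names `stale_subgraph`, `tarjan_scc` and `condense` are hypothetical.

```python
from typing import Dict, FrozenSet, List, Set, Tuple

Graph = Dict[str, List[str]]  # node -> nodes it reads inputs from (hypothetical representation)


def stale_subgraph(deps: Graph, gen: Dict[str, Tuple[int, ...]]) -> Graph:
    """Keep only "stale" edges: the input's generation is <= the consumer's."""
    return {n: [d for d in ds if gen[d] <= gen[n]] for n, ds in deps.items()}


def tarjan_scc(graph: Graph, head: str) -> List[FrozenSet[str]]:
    """Tarjan's algorithm, restricted to nodes reachable from `head`."""
    index: Dict[str, int] = {}
    low: Dict[str, int] = {}
    stack: List[str] = []
    on_stack: Set[str] = set()
    sccs: List[FrozenSet[str]] = []

    def visit(v: str) -> None:
        index[v] = low[v] = len(index)  # discovery order
        stack.append(v)
        on_stack.add(v)
        for w in graph.get(v, []):
            if w not in index:
                visit(w)
                low[v] = min(low[v], low[w])
            elif w in on_stack:
                low[v] = min(low[v], index[w])
        if low[v] == index[v]:  # v is the root of an SCC: pop its members
            component: Set[str] = set()
            while True:
                w = stack.pop()
                on_stack.discard(w)
                component.add(w)
                if w == v:
                    break
            sccs.append(frozenset(component))

    visit(head)
    return sccs


def condense(graph: Graph, sccs: List[FrozenSet[str]]) -> Dict[FrozenSet[str], Set[FrozenSet[str]]]:
    """Map each SCC (supernode) to the other SCCs it depends on."""
    owner = {n: scc for scc in sccs for n in scc}
    return {
        scc: {owner[w] for n in scc for w in graph.get(n, []) if owner[w] != scc}
        for scc in sccs
    }


# a <-> b form a cycle; b also reads from c
deps = {"a": ["b"], "b": ["a", "c"], "c": []}
gen = {"a": (0,), "b": (0,), "c": (0,)}
stale = stale_subgraph(deps, gen)
supernodes = condense(stale, tarjan_scc(stale, "a"))
```

With these generations every edge is stale. The result is two supernodes: `{a, b}`, which depends on `{c}`. If `b` were one generation ahead of `a`, the `a -> b` edge would be dropped and the search from `a` would stop at `a` alone.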
- async _enqueue_node(node: FinalizedNode[Unpack, tuple[object, ...]])
Enqueue a single node for resolution.
- Parameters:
node – The node to enqueue
- async _enqueue_output_nodes(out_node: FinalizedNode[Unpack, tuple[object, ...]])
Enqueue all nodes that depend on the given node.
- Parameters:
out_node – The node whose dependents should be enqueued
- _find_leaf_supernodes(root: SuperNode) → list[SuperNode]
Identify all leaf supernodes in the condensed DAG. Leaf supernodes are those with no dependencies.
- _find_node_solution(node: FinalizedNode[Unpack, tuple[object, ...]]) → list[FinalizedNode[Unpack, tuple[object, ...]]]
Find the nodes that are ultimately preventing the given node from running.
- This method is key to Flowno’s cycle resolution algorithm. It:
Builds a condensed graph of strongly connected components (SCCs)
Finds the leaf SCCs in this condensed graph
For each leaf SCC, picks a node to force evaluate based on default values
- Parameters:
node – The node whose dependencies need to be resolved
- Returns:
A list of nodes that should be forced to evaluate to unblock the given node
- Raises:
MissingDefaultError – If a cycle is detected with no default values to break it
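The last two steps (find the leaf supernodes, then pick one node per leaf) can be sketched as follows. This sketch rests on assumptions. The condensed graph is a dict from `frozenset` supernodes to the supernodes they depend on. A leaf that is a single node with no self-loop is not a cycle, so it is simply returned to run. The "heuristic" for a real cycle is a stand-in: take the first member, in sorted order, that has a default value. Flowno's actual heuristic is not specified here. `MissingDefaultError` below is a local hypothetical class, not an import from Flowno.

```python
from typing import Dict, FrozenSet, List, Set

SuperNode = FrozenSet[str]


class MissingDefaultError(Exception):
    """Local stand-in for the error documented above."""


def find_leaves(condensed: Dict[SuperNode, Set[SuperNode]]) -> List[SuperNode]:
    # A leaf supernode depends on no other supernode.
    return [scc for scc, deps in condensed.items() if not deps]


def pick_node(scc: SuperNode, graph: Dict[str, List[str]], has_default: Dict[str, bool]) -> str:
    members = sorted(scc)
    is_cycle = len(members) > 1 or members[0] in graph.get(members[0], [])
    if not is_cycle:
        return members[0]  # nothing to break: this node can simply run
    for n in members:  # stand-in heuristic: first member that has defaults
        if has_default.get(n, False):
            return n  # force it, feeding its cyclic inputs with defaults
    raise MissingDefaultError(f"cycle {members} has no default values to break it")


def find_node_solution(
    condensed: Dict[SuperNode, Set[SuperNode]],
    graph: Dict[str, List[str]],
    has_default: Dict[str, bool],
) -> List[str]:
    """One node to force-evaluate per leaf supernode."""
    return [pick_node(scc, graph, has_default) for scc in find_leaves(condensed)]


# c depends on the a <-> b cycle; only b has a default value
condensed = {frozenset({"a", "b"}): set(), frozenset({"c"}): {frozenset({"a", "b"})}}
graph = {"a": ["b"], "b": ["a"], "c": ["a"]}
solution = find_node_solution(condensed, graph, {"b": True})
```

Here `solution` is `["b"]`. If no member of the cycle had a default, `MissingDefaultError` would be raised instead.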
- async _handle_async_generator_node(node: FinalizedNode[Unpack, tuple[object, ...]], returned: AsyncGenerator[tuple[object, ...], None])
Handle a node that returns an async generator (streaming output).
This processes each item yielded by the generator, storing it as run level 1 data, and accumulates the items into the final run level 0 result when the generator completes.
- Parameters:
node – The node to handle
returned – The async generator returned by the node’s call
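Here is a minimal sketch of the run level split for a streaming node. It uses plain `asyncio` rather than Flowno's event loop. Each yielded chunk is recorded as run level 1 data, and the accumulated value becomes the run level 0 result. The chunks here are strings rather than `tuple[object, ...]`. The accumulation rule (string concatenation) is an assumption for illustration. `handle_async_generator` and `stream_words` are hypothetical names.

```python
import asyncio
from typing import AsyncGenerator, List, Tuple


async def handle_async_generator(gen: AsyncGenerator[str, None]) -> Tuple[List[str], str]:
    """Record streamed chunks (run level 1) and combine them (run level 0)."""
    run_level_1: List[str] = []
    async for chunk in gen:
        # In Flowno, this is where run level 1 data would be published;
        # here it is only recorded.
        run_level_1.append(chunk)
    # Assumed accumulation rule: concatenate once the generator is exhausted.
    run_level_0 = "".join(run_level_1)
    return run_level_1, run_level_0


async def stream_words() -> AsyncGenerator[str, None]:
    for part in ("Hel", "lo"):
        yield part


chunks, final = asyncio.run(handle_async_generator(stream_words()))
```

After this runs, `chunks` is `["Hel", "lo"]` and `final` is `"Hello"`.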
- async _handle_coroutine_node(node: FinalizedNode[Unpack, tuple[object, ...]], returned: Awaitable[tuple[object, ...]])
Handle a node that returns a coroutine (single output).
This awaits the node’s coroutine and stores its result in the node’s data.
- Parameters:
node – The node to handle
returned – The coroutine returned by the node’s call
- _mark_node_as_visited(node: FinalizedNode[Unpack, tuple[object, ...]])
Mark a node as visited during the resolution process.
- Parameters:
node – The node to mark as visited
- _node_resolve_loop(stop_at_node_generation: dict[FinalizedNode[Unpack, tuple[object, ...]], tuple[int, ...] | None] | tuple[int, ...] | None, terminate_on_node_error: bool)
Main resolution loop for the flow.
This function implements the core algorithm for resolving node dependencies and executing nodes in the correct order. It:
Picks an initial node
- For each node in the resolution queue:
Finds the set of nodes that must be executed first
Marks those nodes as visited
Resumes their execution
Continues until the resolution queue is empty
- Parameters:
stop_at_node_generation – Generation limit for nodes, either as a global limit or as a dict mapping nodes to their individual limits
terminate_on_node_error – Whether to terminate on node errors
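The loop above is essentially a worklist algorithm. The sketch below is synchronous and uses hypothetical callables. `find_solution(node)` stands in for `_find_node_solution`. `resume(node)` stands in for running a node and returns the dependents it enqueues. A `set` next to the `deque` gives the no-duplicates behaviour of a set-queue such as `AsyncSetQueue`. Generation limits and error handling are left out.

```python
from collections import deque
from typing import Callable, Deque, Iterable, List, Set


def resolve_loop(
    initial: str,
    find_solution: Callable[[str], List[str]],
    resume: Callable[[str], Iterable[str]],
) -> List[str]:
    """Worklist sketch; returns nodes in the order they were visited and resumed."""
    queue: Deque[str] = deque([initial])
    queued: Set[str] = {initial}  # set-queue semantics: a node is queued at most once
    visited_order: List[str] = []
    while queue:
        node = queue.popleft()
        queued.discard(node)
        for blocker in find_solution(node):  # nodes that must execute first
            visited_order.append(blocker)  # mark as visited
            for dependent in resume(blocker):  # running it may enqueue dependents
                if dependent not in queued:
                    queue.append(dependent)
                    queued.add(dependent)
    return visited_order


# Toy chain a -> b -> c where nothing blocks any node
downstream = {"a": ["b"], "b": ["c"], "c": []}
order = resolve_loop("a", lambda n: [n], lambda n: downstream[n])
```

The loop ends when the queue is empty. For the toy chain, `order` is `["a", "b", "c"]`.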
- _pick_node_to_force_evaluate(leaf_supernode: SuperNode) → FinalizedNode[Unpack, tuple[object, ...]]
Pick a node to force evaluate according to the cycle breaking heuristic.
- Parameters:
leaf_supernode (SuperNode) – The leaf supernode of the condensed subgraph.
- Returns:
The node to force evaluate.
- Return type:
FinalizedNode[Unpack[tuple[object, …]], tuple[object, …]]
- Undefined Behavior:
If the argument is not a leaf in the condensed graph, the behavior is undefined.
- _register_node(node: FinalizedNode[Unpack, tuple[object, ...]])
Register a node’s task with the flow.
This creates the persistent task for the node and adds it to the node_tasks dictionary.
- Parameters:
node – The node to register
- async _terminate_if_reached_limit(node: FinalizedNode[Unpack, tuple[object, ...]])
Check if a node has reached its generation limit and terminate if so.
- Parameters:
node – The node to check
- Raises:
TerminateLimitReached – If the node reached its generation limit
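A rough sketch of the limit check follows. It assumes that "reached" means the node's generation is greater than or equal to its limit under plain lexicographic tuple ordering. Flowno's actual comparison for multi-level (streaming) generations may differ. It also assumes that `None` means "no limit" for the limit and "not run yet" for the generation. `TerminateLimitReached` is redefined locally so that the snippet runs on its own.

```python
from typing import Optional, Tuple

Generation = Tuple[int, ...]


class TerminateLimitReached(Exception):
    """Local stand-in mirroring the exception documented above."""


def terminate_if_reached_limit(generation: Optional[Generation], limit: Optional[Generation]) -> None:
    if limit is None or generation is None:
        return  # no limit configured, or the node has not produced data yet
    if generation >= limit:  # assumption: lexicographic tuple comparison
        raise TerminateLimitReached(f"generation {generation} reached limit {limit}")
```

For example, `terminate_if_reached_limit((1,), (3,))` returns quietly, while `terminate_if_reached_limit((3,), (3,))` raises.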
- add_node(node: FinalizedNode[Unpack, tuple[Any, ...]])
Add a node to the flow.
- Parameters:
node – The node to add
- add_nodes(nodes: list[FinalizedNode[Unpack, tuple[Any, ...]]])
Add multiple nodes to the flow.
- Parameters:
nodes – The nodes to add
- clear_defaulted_inputs(node: FinalizedNode[Unpack, tuple[object, ...]]) → None
Remove defaulted input information for a node.
- Parameters:
node – The node to clear defaulted inputs for
- evaluate_node(node: FinalizedNode[Unpack, tuple[object, ...]]) → Never
The persistent task that evaluates a node.
- This is the main execution function for a node. It:
Waits for the node to be ready to run
Gathers inputs and handles defaulted values
Calls the node with its inputs
Processes the result (either coroutine or async generator)
Propagates outputs to dependent nodes
Repeats
- Parameters:
node – The node to evaluate
- Returns:
Never returns; runs as a persistent coroutine
- Raises:
NotImplementedError – If the node does not return a coroutine or async generator
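The "process the result" step is a type dispatch on whatever the node call returns. This minimal sketch shows that dispatch with plain `asyncio` and the standard `inspect` predicates. `call_and_dispatch` is a hypothetical name. It drains async generators into a list instead of publishing per-item run level 1 data, and it covers only a single call, not the persistent loop.

```python
import asyncio
import inspect
from typing import Any, AsyncGenerator, Callable


async def call_and_dispatch(fn: Callable[..., Any], *args: Any) -> Any:
    """Call a node function and handle its result according to its type."""
    returned = fn(*args)
    if inspect.isasyncgen(returned):
        # streaming node: drain every yielded item
        return [item async for item in returned]
    if inspect.iscoroutine(returned):
        # single-output node: await the one result
        return await returned
    raise NotImplementedError(f"{fn.__name__} returned {type(returned).__name__}")


async def add_one(x: int) -> int:
    return x + 1


async def count_up(x: int) -> AsyncGenerator[int, None]:
    yield x
    yield x + 1


print(asyncio.run(call_and_dispatch(add_one, 1)))   # prints 2
print(asyncio.run(call_and_dispatch(count_up, 5)))  # prints [5, 6]
```

A plain synchronous function falls through both checks and raises `NotImplementedError`, which matches the behaviour documented above.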
- is_input_defaulted(node: FinalizedNode[Unpack, tuple[object, ...]], input_port: InputPortIndex) → bool
Check if a specific input port is using a default value.
- Parameters:
node – The node to check
input_port – The input port index to check
- Returns:
True if the input port is using a default value, False otherwise
- run_until_complete(stop_at_node_generation: dict[FinalizedNode[Unpack, tuple[Any, ...]] | DraftNode[Unpack, tuple[Any, ...]], tuple[int, ...] | None] | tuple[int, ...] | None = (), terminate_on_node_error: bool = False, _debug_max_wait_time: float | None = None)
Execute the flow until completion or until a termination condition is met.
This is the main entry point for running a flow. It starts the resolution process and runs until all nodes have completed or a termination condition (like reaching a generation limit or an error) is met.
- Parameters:
stop_at_node_generation – Generation limit for nodes, either as a global limit or as a dict mapping nodes to their individual limits
terminate_on_node_error – Whether to terminate the flow if a node raises an exception
_debug_max_wait_time – Maximum time in seconds to wait for I/O operations (useful for debugging)
- Raises:
Exception – Any exception raised by nodes and not caught
TerminateLimitReached – When a node reaches its generation limit
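`stop_at_node_generation` accepts either one global limit or a per-node dict. The sketch below shows one way such an argument can be normalised into a per-node lookup. `limit_for` is a hypothetical helper and nodes are plain strings. It assumes that nodes missing from the dict get no limit. It makes no claim about what the default value `()` means in Flowno.

```python
from typing import Dict, Optional, Tuple, Union

Generation = Tuple[int, ...]
StopAt = Union[Dict[str, Optional[Generation]], Generation, None]


def limit_for(node: str, stop_at: StopAt) -> Optional[Generation]:
    """Resolve the generation limit that applies to one node."""
    if isinstance(stop_at, dict):
        # per-node limits; nodes absent from the dict get no limit (assumption)
        return stop_at.get(node)
    # a bare tuple (or None) is a global limit shared by every node
    return stop_at
```

With a dict such as `{"a": (3,)}`, only `a` is limited. With a bare tuple such as `(2,)`, every node shares that limit.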
- set_defaulted_inputs(node: FinalizedNode[Unpack, tuple[object, ...]], defaulted_inputs: list[InputPortIndex]) → None
Mark specific inputs of a node as using default values.
When a node uses default values for inputs that are part of a cycle, this method records that information and increments the stitch level to prevent infinite recursion.
- Parameters:
node – The node with defaulted inputs
defaulted_inputs – List of input port indices using default values
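The three defaulted-input methods (`set_defaulted_inputs`, `is_input_defaulted`, `clear_defaulted_inputs`) amount to a small piece of bookkeeping. The class below is a hypothetical sketch of that bookkeeping, not Flowno's implementation. Where the stitch level lives, and exactly when it changes, are assumptions. Here it is a per-node counter that is bumped once per `set_defaulted_inputs` call and is left alone by `clear_defaulted_inputs`.

```python
from collections import defaultdict
from typing import DefaultDict, Dict, Iterable, Set


class DefaultedInputs:
    """Hypothetical record of which input ports are running on default values."""

    def __init__(self) -> None:
        self.defaulted: Dict[str, Set[int]] = {}
        self.stitch_level: DefaultDict[str, int] = defaultdict(int)

    def set_defaulted_inputs(self, node: str, ports: Iterable[int]) -> None:
        self.defaulted[node] = set(ports)
        self.stitch_level[node] += 1  # assumed: one increment per cycle break

    def is_input_defaulted(self, node: str, port: int) -> bool:
        return port in self.defaulted.get(node, set())

    def clear_defaulted_inputs(self, node: str) -> None:
        self.defaulted.pop(node, None)  # stitch level left untouched (assumption)


book = DefaultedInputs()
book.set_defaulted_inputs("a", [0, 2])
```

After this call, ports 0 and 2 of `a` report as defaulted and port 1 does not. Clearing removes the defaulted ports but keeps the stitch count.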
- exception flowno.core.flow.flow.NodeExecutionError(node: FinalizedNode[Unpack, tuple[object, ...]])
Exception raised when a node execution fails.
- class flowno.core.flow.flow.NodeTaskAndStatus(task: Coroutine[Command, object, Never], status: Ready | Running | Error | Stalled)
Container for a node’s task and its current status.
- _asdict()
Return a new dict which maps field names to their values.
- classmethod _make(iterable)
Make a new NodeTaskAndStatus object from a sequence or iterable
- _replace(**kwds)
Return a new NodeTaskAndStatus object replacing specified fields with new values
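`_asdict`, `_make` and `_replace` are the standard helpers that every `typing.NamedTuple` / `collections.namedtuple` class provides. The snippet below uses a hypothetical stand-in, `TaskAndStatus`, with the same two field names. Strings replace the real coroutine and status objects so that it runs on its own.

```python
from typing import NamedTuple


class TaskAndStatus(NamedTuple):
    """Hypothetical stand-in with the same fields as NodeTaskAndStatus."""

    task: str    # simplified: a label instead of the node's coroutine
    status: str  # simplified: a string instead of a NodeTaskStatus instance


entry = TaskAndStatus(task="evaluate_node(a)", status="Ready")
running = entry._replace(status="Running")  # new tuple; `entry` is unchanged
as_dict = running._asdict()                  # plain dict keyed by field name
rebuilt = TaskAndStatus._make(["evaluate_node(b)", "Stalled"])
```

Tuples are immutable, so a status change produces a new container instead of mutating the old one.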
- class flowno.core.flow.flow.NodeTaskStatus
Represents the possible states of a node’s task within the flow execution.
- States:
Running: The node is currently executing.
Ready: The node is ready to execute but not yet running.
Error: The node encountered an error during execution.
Stalled: The node is blocked waiting on input data.
- class Stalled(stalling_input: FinalizedInputPortRef[object])
Node is stalled waiting for input data.
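The four states can be modelled as a tagged union in which only `Stalled` carries data. This sketch uses frozen dataclasses as hypothetical stand-ins for the nested classes. `Stalled.stalling_input` is simplified to a string instead of a `FinalizedInputPortRef`.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Ready:
    pass


@dataclass(frozen=True)
class Running:
    pass


@dataclass(frozen=True)
class Error:
    pass


@dataclass(frozen=True)
class Stalled:
    stalling_input: str  # simplified: the input port the node is waiting on


Status = Union[Ready, Running, Error, Stalled]


def describe(status: Status) -> str:
    """Only Stalled carries extra information."""
    if isinstance(status, Stalled):
        return f"stalled on {status.stalling_input}"
    return type(status).__name__.lower()
```

For example, `describe(Ready())` returns `"ready"` and `describe(Stalled("b.in0"))` returns `"stalled on b.in0"`.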
- class flowno.core.flow.flow.ResumeNodeCommand(node: FinalizedNode[Unpack, tuple[object, ...]])
Command to resume a node’s execution.
- exception flowno.core.flow.flow.TerminateLimitReached
Exception raised when a node reaches its generation limit.
- class flowno.core.flow.flow.TerminateReachedLimitCommand
Command to terminate the flow because a node reached its generation limit.
- class flowno.core.flow.flow.TerminateWithExceptionCommand(node: FinalizedNode[Unpack, tuple[object, ...]], exception: Exception)
Command to terminate the flow with an exception.
- class flowno.core.flow.flow.WaitForStartNextGenerationCommand(node: FinalizedNode[Unpack, tuple[object, ...]], run_level: int = 0)
Command to wait for a node to start its next generation.
- flowno.core.flow.flow._resume_node(node: FinalizedNode[Unpack, tuple[object, ...]]) → Generator[ResumeNodeCommand, None, None]
Resume the node’s concurrent task. This does not guarantee that the node resumes if it is already running.
- flowno.core.flow.flow._terminate_reached_limit() → Generator[TerminateReachedLimitCommand, None, None]
Coroutine that yields a command to terminate when a generation limit is reached.
- flowno.core.flow.flow._terminate_with_exception(node: FinalizedNode[Unpack, tuple[object, ...]], exception: Exception) → Generator[TerminateWithExceptionCommand, None, None]
Coroutine that yields a command to terminate with an exception.
- flowno.core.flow.flow._wait_for_start_next_generation(node: FinalizedNode[Unpack, tuple[object, ...]], run_level: int = 0) → Generator[WaitForStartNextGenerationCommand, None, None]
Coroutine that yields a command to wait for a node’s next generation.
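The helpers above are generators that yield a command object for the event loop to interpret. The sketch below shows that pattern in isolation. It assumes the helpers are made awaitable with `types.coroutine`, or something equivalent, so that `async def` code can `await` them. A tiny driver stands in for `FlowEventLoop`. `resume_node`, `resolver` and `drive` are hypothetical names, and nodes are plain strings.

```python
import types
from dataclasses import dataclass
from typing import Any, Coroutine, List, Tuple


@dataclass
class ResumeNodeCommand:
    node: str  # simplified: a node name instead of a FinalizedNode


@types.coroutine
def resume_node(node: str):
    """Hand a command to whatever loop is driving this coroutine."""
    yield ResumeNodeCommand(node)


async def resolver() -> str:
    await resume_node("a")
    await resume_node("b")
    return "done"


def drive(coro: Coroutine[Any, None, Any]) -> Tuple[List[str], Any]:
    """Minimal loop: step the coroutine and record every command it yields."""
    resumed: List[str] = []
    try:
        while True:
            command = coro.send(None)
            if isinstance(command, ResumeNodeCommand):
                resumed.append(command.node)  # a real loop would wake that node's task
    except StopIteration as stop:
        return resumed, stop.value


resumed, result = drive(resolver())
```

The driver sees the `ResumeNodeCommand` for `a` and then the one for `b`, and it gets `"done"` as the coroutine's return value. A real event loop would dispatch each command type to a handler instead of just recording it.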