flowno.core.flow.flow

Flow execution and graph resolution module for Flowno.

This module contains the Flow class, which is the core execution engine for dataflow graphs. It manages node scheduling, dependency resolution, cycle breaking, and concurrent execution.

Key components:
  • Flow: The main dataflow graph execution engine

  • FlowEventLoop: A custom event loop for handling Flow-specific commands

  • NodeTaskStatus: State tracking for node execution

class flowno.core.flow.flow.Flow(is_finalized: bool = True)[source]

Dataflow graph execution engine.

The Flow class manages the execution of a dataflow graph, handling dependency resolution, node scheduling, and cycle breaking. It uses a custom event loop to execute nodes concurrently while respecting data dependencies.

Key features:
  • Automatic dependency-based scheduling

  • Cycle detection and resolution

  • Support for streaming data (run levels)

  • Concurrency management

unvisited

List of nodes that have not yet been visited during execution

Type:

list[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]]]

visited

Set of nodes that have been visited

Type:

set[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]]]

node_tasks

Dictionary mapping nodes to their tasks and status

Type:

dict[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]], flowno.core.flow.flow.NodeTaskAndStatus]

running_nodes

Set of nodes currently running

Type:

set[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]]]

resolution_queue

Queue of nodes waiting to be resolved

Type:

flowno.core.event_loop.queues.AsyncSetQueue[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]]]

_condensed_tree(head: FinalizedNode[Unpack, tuple[object, ...]]) SuperNode[source]

Build a condensed graph of strongly connected components (SCCs) from stale connections.

This method implements Tarjan’s algorithm to find strongly connected components (cycles) in the dependency graph, but only following connections that are “stale” (where the input’s generation is <= the node’s generation).

Parameters:

head – The starting point for building the condensed graph

Returns:

A SuperNode representing the root of the condensed graph
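
As an illustration of the idea only, the following sketch runs Tarjan's algorithm over a plain dictionary graph and follows only stale edges. The `inputs` and `generation` mappings, the integer generations, and the function name `stale_sccs` are assumptions made for this example and are not part of Flowno's API; the real method returns a SuperNode rather than a list.

```python
def stale_sccs(head, inputs, generation):
    """Return the SCCs reachable from head, following only stale edges.

    inputs: hypothetical dict mapping a node to the upstream nodes it reads.
    generation: hypothetical dict mapping a node to an int generation.
    An edge node -> upstream counts as stale when
    generation[upstream] <= generation[node].
    """
    index = {}
    lowlink = {}
    stack = []
    on_stack = set()
    sccs = []
    counter = 0

    def visit(v):
        nonlocal counter
        index[v] = lowlink[v] = counter
        counter += 1
        stack.append(v)
        on_stack.add(v)
        for w in inputs.get(v, []):
            if generation[w] > generation[v]:
                continue  # fresh input, so this edge is not followed
            if w not in index:
                visit(w)
                lowlink[v] = min(lowlink[v], lowlink[w])
            elif w in on_stack:
                lowlink[v] = min(lowlink[v], index[w])
        if lowlink[v] == index[v]:
            # v is the root of a component: pop it off the stack
            component = set()
            while True:
                w = stack.pop()
                on_stack.discard(w)
                component.add(w)
                if w == v:
                    break
            sccs.append(frozenset(component))

    visit(head)
    return sccs


# "c" reads "b"; "a" and "b" read each other and form a cycle.
example_inputs = {"c": ["b"], "b": ["a"], "a": ["b"]}
all_stale = stale_sccs("c", example_inputs, {"a": 0, "b": 0, "c": 0})
b_is_fresh = stale_sccs("c", example_inputs, {"a": 0, "b": 1, "c": 0})
```

With every generation equal, the cycle between "a" and "b" is reported as one component ahead of "c". When "b" is newer than "c", the edge is not stale and only "c" is visited.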

async _enqueue_node(node: FinalizedNode[Unpack, tuple[object, ...]])[source]

Enqueue a single node for resolution.

Parameters:

node – The node to enqueue

async _enqueue_output_nodes(out_node: FinalizedNode[Unpack, tuple[object, ...]])[source]

Enqueue all nodes that depend on the given node.

Parameters:

out_node – The node whose dependents should be enqueued

_find_leaf_supernodes(root: SuperNode) list[SuperNode][source]

Identify all leaf supernodes in the condensed DAG. Leaf supernodes are those with no dependencies.

Returns:

A list of all leaf supernodes in the graph.

Return type:

list[SuperNode]
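
A minimal sketch of leaf discovery in a condensed DAG follows. `SuperNodeSketch` and its `members` and `dependencies` fields are hypothetical stand-ins; the attributes of the real SuperNode class are not documented here.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)
class SuperNodeSketch:
    """Hypothetical stand-in for SuperNode: one SCC plus its dependencies."""
    members: frozenset
    dependencies: list = field(default_factory=list)


def find_leaf_supernodes(root):
    """Collect every supernode reachable from root that has no dependencies."""
    leaves = []
    seen = set()
    stack = [root]
    while stack:
        current = stack.pop()
        if id(current) in seen:
            continue  # shared dependencies are visited only once
        seen.add(id(current))
        if not current.dependencies:
            leaves.append(current)
        stack.extend(current.dependencies)
    return leaves


# Diamond shape: root depends on two supernodes that share a single leaf.
leaf = SuperNodeSketch(frozenset({"a", "b"}))
left = SuperNodeSketch(frozenset({"c"}), [leaf])
right = SuperNodeSketch(frozenset({"d"}), [leaf])
root = SuperNodeSketch(frozenset({"e"}), [left, right])
leaves = find_leaf_supernodes(root)
```

The `seen` set keeps the shared leaf from being reported twice.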

_find_node_solution(node: FinalizedNode[Unpack, tuple[object, ...]]) list[FinalizedNode[Unpack, tuple[object, ...]]][source]

Find the nodes that are ultimately preventing the given node from running.

This method is key to Flowno’s cycle resolution algorithm. It:
  1. Builds a condensed graph of strongly connected components (SCCs)

  2. Finds the leaf SCCs in this condensed graph

  3. For each leaf SCC, picks a node to force evaluate based on default values

Parameters:

node – The node whose dependencies need to be resolved

Returns:

A list of nodes that should be forced to evaluate to unblock the given node

Raises:

MissingDefaultError – If a cycle is detected with no default values to break it
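
Step 3 can be pictured with the sketch below. The selection rule (first member, in sorted order, that has a default), the `has_default` mapping, and the exception class `MissingDefaultSketchError` are assumptions for illustration. The actual heuristic is not described in this reference.

```python
class MissingDefaultSketchError(Exception):
    """Hypothetical stand-in for the MissingDefaultError named above."""


def pick_node_to_force(cycle_members, has_default):
    """Pick one member of a cycle whose cyclic input has a default value.

    cycle_members: set of node names forming one leaf SCC (assumed shape).
    has_default: dict mapping node name -> bool (assumed shape).
    """
    # Sorting only makes the choice deterministic in this sketch.
    for name in sorted(cycle_members):
        if has_default.get(name, False):
            return name
    raise MissingDefaultSketchError(
        f"cycle {sorted(cycle_members)} has no default value to break it"
    )


chosen = pick_node_to_force({"a", "b"}, {"b": True})
```

When no member of the cycle has a default, the sketch raises instead of guessing, mirroring the documented error case.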

async _handle_async_generator_node(node: FinalizedNode[Unpack, tuple[object, ...]], returned: AsyncGenerator[tuple[object, ...], None])[source]

Handle a node that returns an async generator (streaming output).

This processes each yielded item from the generator, storing them as run level 1 data, and accumulates them for the final run level 0 result when the generator completes.

Parameters:
  • node – The node to handle

  • returned – The async generator returned by the node’s call
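
The streaming behavior described above can be sketched with plain asyncio. The `on_partial` callback and the choice to accumulate chunks into a list are assumptions for this example; how Flowno stores and combines run level data is not specified here.

```python
import asyncio


async def consume_stream(stream, on_partial):
    """Forward each yielded item, then return everything accumulated."""
    accumulated = []
    async for item in stream:
        on_partial(item)          # stands in for storing run level 1 data
        accumulated.append(item)  # kept for the final result
    return accumulated            # stands in for the run level 0 result


async def words():
    # A streaming node body: yields one output tuple per chunk.
    for word in ("Hello", ", ", "world"):
        yield (word,)


partials = []
final = asyncio.run(consume_stream(words(), partials.append))
```

Consumers of the partial data see each chunk as it arrives, while consumers of the final result see a value only after the generator is exhausted.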

async _handle_coroutine_node(node: FinalizedNode[Unpack, tuple[object, ...]], returned: Awaitable[tuple[object, ...]])[source]

Handle a node that returns a coroutine (single output).

This awaits the result of the node’s coroutine and stores the result in the node’s data.

Parameters:
  • node – The node to handle

  • returned – The coroutine returned by the node’s call

_mark_node_as_visited(node: FinalizedNode[Unpack, tuple[object, ...]])[source]

Mark a node as visited during the resolution process.

Parameters:

node – The node to mark as visited

_node_resolve_loop(stop_at_node_generation: dict[FinalizedNode[Unpack, tuple[object, ...]], tuple[int, ...] | None] | tuple[int, ...] | None, terminate_on_node_error: bool)[source]

Main resolution loop for the flow.

This function implements the core algorithm for resolving node dependencies and executing nodes in the correct order. It:

  1. Picks an initial node

  2. For each node in the resolution queue:
    1. Finds the set of nodes that must be executed first

    2. Marks those nodes as visited

    3. Resumes their execution

  3. Continues until the resolution queue is empty

Parameters:
  • stop_at_node_generation – Generation limit for nodes, either as a global limit or as a dict mapping nodes to their individual limits

  • terminate_on_node_error – Whether to terminate on node errors
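
The loop structure can be sketched synchronously over a small acyclic chain. Everything in this example is a simplification made for illustration: the dictionaries, the `find_solution` and `resume` helpers, and the omission of cycles, generations, and concurrency are all assumptions, not Flowno behavior.

```python
from collections import deque

# Hypothetical chain a -> b -> c, described from both directions.
upstream = {"a": [], "b": ["a"], "c": ["b"]}
downstream = {"a": ["b"], "b": ["c"], "c": []}
done = set()
order = []


def find_solution(node):
    """Return the nodes that must run before (or instead of) node."""
    pending = [u for u in upstream[node] if u not in done]
    if not pending:
        return [node]  # nothing blocks this node, so run it
    blockers = []
    for u in pending:
        blockers.extend(find_solution(u))
    return blockers


def resume(node):
    """Pretend to run node and return the dependents to enqueue."""
    done.add(node)
    order.append(node)
    return downstream[node]


def resolve_loop(initial):
    queue = deque([initial])
    visited = set()
    while queue:
        node = queue.popleft()
        for blocker in find_solution(node):
            visited.add(blocker)            # step 2.2: mark as visited
            queue.extend(resume(blocker))   # step 2.3: resume execution
    return visited


visited = resolve_loop("c")
```

Starting from "c", the sketch walks upstream to "a", runs it, and lets each completed node enqueue its dependents until the queue is empty.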

_pick_node_to_force_evaluate(leaf_supernode: SuperNode) FinalizedNode[Unpack, tuple[object, ...]][source]

Pick a node to force evaluate according to the cycle breaking heuristic.

Parameters:

leaf_supernode (SuperNode) – The leaf supernode of the condensed graph.

Returns:

The node to force evaluate.

Return type:

FinalizedNode[Unpack[tuple[object, …]], tuple[object, …]]

Undefined Behavior:

If the argument is not a leaf in the condensed graph, the behavior is undefined.

_register_node(node: FinalizedNode[Unpack, tuple[object, ...]])[source]

Register a node’s task with the flow.

This creates the persistent task for the node and adds it to the node_tasks dictionary.

Parameters:

node – The node to register

async _terminate_if_reached_limit(node: FinalizedNode[Unpack, tuple[object, ...]])[source]

Check if a node has reached its generation limit and terminate if so.

Parameters:

node – The node to check

Raises:

TerminateLimitReached – If the node reached its generation limit

add_node(node: FinalizedNode[Unpack, tuple[Any, ...]])[source]

Add a node to the flow.

Parameters:

node – The node to add

add_nodes(nodes: list[FinalizedNode[Unpack, tuple[Any, ...]]])[source]

Add multiple nodes to the flow.

Parameters:

nodes – The nodes to add

clear_defaulted_inputs(node: FinalizedNode[Unpack, tuple[object, ...]]) None[source]

Remove defaulted input information for a node.

Parameters:

node – The node to clear defaulted inputs for

evaluate_node(node: FinalizedNode[Unpack, tuple[object, ...]]) Never[source]

The persistent task that evaluates a node.

This is the main execution function for a node. It:
  1. Waits for the node to be ready to run

  2. Gathers inputs and handles defaulted values

  3. Calls the node with its inputs

  4. Processes the result (either coroutine or async generator)

  5. Propagates outputs to dependent nodes

  6. Repeats

Parameters:

node – The node to evaluate

Returns:

Never returns; runs as a persistent coroutine

Raises:

NotImplementedError – If the node does not return a coroutine or async generator
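
Step 4, together with the documented NotImplementedError, can be illustrated with the standard `inspect` module. The function `run_once` and the example node bodies are hypothetical; the real task also waits, gathers inputs, propagates outputs, and repeats.

```python
import asyncio
import inspect


async def run_once(node_callable, *inputs):
    """Call a node body once and process whatever kind of object it returns."""
    returned = node_callable(*inputs)
    if inspect.isasyncgen(returned):
        # Streaming node: drain the async generator.
        return [item async for item in returned]
    if inspect.isawaitable(returned):
        # Single-output node: await the coroutine.
        return await returned
    raise NotImplementedError(
        "node must return a coroutine or an async generator"
    )


async def add(a, b):
    return (a + b,)


async def count(n):
    for i in range(n):
        yield (i,)


def plain():
    return 1


single = asyncio.run(run_once(add, 1, 2))
streamed = asyncio.run(run_once(count, 3))
```

Calling `run_once(plain)` takes neither branch and raises NotImplementedError.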

is_input_defaulted(node: FinalizedNode[Unpack, tuple[object, ...]], input_port: InputPortIndex) bool[source]

Check if a specific input port is using a default value.

Parameters:
  • node – The node to check

  • input_port – The input port index to check

Returns:

True if the input port is using a default value, False otherwise

run_until_complete(stop_at_node_generation: dict[FinalizedNode[Unpack, tuple[Any, ...]] | DraftNode[Unpack, tuple[Any, ...]], tuple[int, ...] | None] | tuple[int, ...] | None = (), terminate_on_node_error: bool = False, _debug_max_wait_time: float | None = None)[source]

Execute the flow until completion or until a termination condition is met.

This is the main entry point for running a flow. It starts the resolution process and runs until all nodes have completed or a termination condition (like reaching a generation limit or an error) is met.

Parameters:
  • stop_at_node_generation – Generation limit for nodes, either as a global limit or as a dict mapping nodes to their individual limits

  • terminate_on_node_error – Whether to terminate the flow if a node raises an exception

  • _debug_max_wait_time – Maximum time in seconds to wait for I/O operations (useful for debugging)

Raises:

set_defaulted_inputs(node: FinalizedNode[Unpack, tuple[object, ...]], defaulted_inputs: list[InputPortIndex]) None[source]

Mark specific inputs of a node as using default values.

When a node uses default values for inputs that are part of a cycle, this method records that information and increments the stitch level to prevent infinite recursion.

Parameters:
  • node – The node with defaulted inputs

  • defaulted_inputs – List of input port indices using default values

set_node_status(node: FinalizedNode[Unpack, tuple[object, ...]], status: Ready | Running | Error | Stalled) None[source]

Update the status of a node and notify instrumentation.

Parameters:
  • node – The node whose status is being updated

  • status – The new status to set

class flowno.core.flow.flow.FlowEventLoop(flow: Flow)[source]

_handle_command(current_task_packet: tuple[Coroutine[Command, Any, Any], Any, Exception | None], command: Command) bool[source]

Handle the command yielded by the current task.

Returns True if the command was successfully handled.

exception flowno.core.flow.flow.NodeExecutionError(node: FinalizedNode[Unpack, tuple[object, ...]])[source]

Exception raised when a node execution fails.

class flowno.core.flow.flow.NodeTaskAndStatus(task: Coroutine[Command, object, Never], status: Ready | Running | Error | Stalled)[source]

Container for a node’s task and its current status.

_asdict()

Return a new dict which maps field names to their values.

classmethod _make(iterable)

Make a new NodeTaskAndStatus object from a sequence or iterable

_replace(**kwds)

Return a new NodeTaskAndStatus object replacing specified fields with new values

status: Ready | Running | Error | Stalled

Alias for field number 1

task: Coroutine[Command, object, Never]

Alias for field number 0
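
The `_asdict`, `_make`, and `_replace` helpers are the standard named tuple methods. The sketch below demonstrates them on a stand-in class, `TaskAndStatusSketch`, with placeholder string values; it does not import or construct the real NodeTaskAndStatus.

```python
from typing import NamedTuple


class TaskAndStatusSketch(NamedTuple):
    """Hypothetical stand-in with the same field order: task, then status."""
    task: object
    status: str


entry = TaskAndStatusSketch(task="task-placeholder", status="Ready")

as_dict = entry._asdict()                    # field names mapped to values
running = entry._replace(status="Running")   # new tuple; entry is unchanged
rebuilt = TaskAndStatusSketch._make(["other-task", "Stalled"])
```

Because named tuples are immutable, `_replace` returns a new object, so a status change means storing the new tuple rather than mutating the old one.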

class flowno.core.flow.flow.NodeTaskStatus[source]

Represents the possible states of a node’s task within the flow execution.

States:
  • Running: The node is currently executing.

  • Ready: The node is ready to execute but not yet running.

  • Error: The node encountered an error during execution.

  • Stalled: The node is blocked waiting on input data.

class Error[source]

Node encountered an error during execution.

class Ready[source]

Node is ready to be executed.

class Running[source]

Node is actively executing.

class Stalled(stalling_input: FinalizedInputPortRef[object])[source]

Node is stalled waiting for input data.

class flowno.core.flow.flow.ResumeNodeCommand(node: FinalizedNode[Unpack, tuple[object, ...]])[source]

Command to resume a node’s execution.

exception flowno.core.flow.flow.TerminateLimitReached[source]

Exception raised when a node reaches its generation limit.

class flowno.core.flow.flow.TerminateReachedLimitCommand[source]

Command to terminate the flow because a node reached its generation limit.

class flowno.core.flow.flow.TerminateWithExceptionCommand(node: FinalizedNode[Unpack, tuple[object, ...]], exception: Exception)[source]

Command to terminate the flow with an exception.

class flowno.core.flow.flow.WaitForStartNextGenerationCommand(node: FinalizedNode[Unpack, tuple[object, ...]], run_level: int = 0)[source]

Command to wait for a node to start its next generation.

flowno.core.flow.flow._resume_node(node: FinalizedNode[Unpack, tuple[object, ...]]) Generator[ResumeNodeCommand, None, None][source]

Resume the node's concurrent task. There is no guarantee that the node resumes if it is already running.

flowno.core.flow.flow._terminate_reached_limit() Generator[TerminateReachedLimitCommand, None, None][source]

Coroutine that yields a command to terminate when a generation limit is reached.

flowno.core.flow.flow._terminate_with_exception(node: FinalizedNode[Unpack, tuple[object, ...]], exception: Exception) Generator[TerminateWithExceptionCommand, None, None][source]

Coroutine that yields a command to terminate with an exception.

flowno.core.flow.flow._wait_for_start_next_generation(node: FinalizedNode[Unpack, tuple[object, ...]], run_level: int = 0) Generator[WaitForStartNextGenerationCommand, None, None][source]

Coroutine that yields a command to wait for a node’s next generation.
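
The four helpers above share one pattern: a generator yields a command object and an event loop interprets it. The sketch below reproduces that pattern with the standard `types.coroutine` decorator. `ResumeSketchCommand`, `resume_node`, and `drive` are hypothetical names, and the driver is far simpler than FlowEventLoop.

```python
import types
from dataclasses import dataclass


@dataclass
class ResumeSketchCommand:
    """Hypothetical stand-in for a command such as ResumeNodeCommand."""
    node: str


@types.coroutine
def resume_node(node):
    # Yielding hands the command to whoever is driving the coroutine.
    yield ResumeSketchCommand(node)


async def task():
    await resume_node("a")
    await resume_node("b")
    return "done"


def drive(coro):
    """Tiny driver: collect each yielded command until the task finishes."""
    handled = []
    try:
        while True:
            handled.append(coro.send(None))
    except StopIteration as stop:
        return handled, stop.value


handled, result = drive(task())
```

Each `await` suspends the task at the point where the command is yielded, and the driver decides when, or whether, to send control back.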