Flowno Internals: Flow Execution and Resolution Mechanics

This document provides a technical deep dive into the internal mechanics of Flowno, focusing on how flows are executed, how data dependencies are resolved, and how cycles are handled. It is intended for library contributors and developers debugging complex flows. If you’re new to Flowno, start with the README for an overview of the library’s features and the Walkthrough for a step-by-step guide to constructing a non-trivial flow.

1. Flow Overview

1.1 Dataflow Model

Flowno is built around a dataflow programming model, where computations are represented as nodes connected by data dependencies. A node executes when its inputs are ready, and its outputs trigger downstream nodes. This model supports concurrency, streaming, and cyclic dependencies. Even in a tangled web of nodes, Flowno runs nodes concurrently wherever the dependencies allow, while preserving the required ordering between each node and its neighbors.

1.2 Key Components

  • Nodes: The basic units of computation. Nodes can be stateless (pure functions) or stateful (classes with internal state). They are defined with the @node decorator and finalized by the FlowHDL context.

  • FlowHDL Context: A declarative way to construct and connect nodes. Nodes are instantiated and connected within a with FlowHDL() as f: block.

  • Event Loop: Concurrency is provided by a custom event loop, not by threads or asyncio. This is worth emphasizing: Flowno uses Python’s async/await syntax just as asyncio does, but it is a completely separate concurrency model and is incompatible with asyncio (see Common Pitfalls).
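Because coroutines are ordinary Python objects that can be resumed with send(), async/await code does not need asyncio to run. The following minimal sketch drives two coroutines with a hand-written round-robin loop. It only illustrates the idea and is not Flowno’s event loop; sleep0 and run_round_robin are hypothetical names.

```python
from collections import deque


class _Yield:
    """Awaitable that suspends the awaiting coroutine exactly once."""

    def __await__(self):
        yield "suspend"  # this value surfaces from coro.send() in the loop below


def sleep0():
    # Hypothetical helper: hand control back to the loop.
    return _Yield()


def run_round_robin(*coros):
    """Resume each coroutine in turn until all finish; return results in completion order."""
    ready = deque(coros)
    finished = []
    while ready:
        coro = ready.popleft()
        try:
            coro.send(None)  # run until the next suspension point
        except StopIteration as stop:
            finished.append(stop.value)  # coroutine returned
        else:
            ready.append(coro)  # still running: requeue it


    return finished


log = []


async def worker(name, steps):
    for i in range(steps):
        log.append(f"{name}{i}")
        await sleep0()  # let the other coroutine run
    return name.upper()


finished = run_round_robin(worker("a", 2), worker("b", 2))
print(log)       # ['a0', 'b0', 'a1', 'b1']
print(finished)  # ['A', 'B']
```

No asyncio import appears anywhere: the interleaving comes entirely from the loop calling send().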

1.3 Types and Constructing Nodes

Nodes are defined using the @node decorator on async functions. By convention, their names are capitalized like classes because they behave like classes: calling one constructs a node object (an instance of a NodeBase subclass) instead of running the function body.

from flowno import node

@node
async def SomeNode(string_input: str):
    # do something
    return 42

reveal_type(SomeNode)  # type[DraftNode1[str, tuple[int]]]
a = SomeNode("hello")  # create some nodes; does not execute
b = SomeNode("worlds")
reveal_type(a)  # DraftNode1[str, tuple[int]]

DraftNode1 is a generic type only visible during typechecking.

1.4 Connecting Nodes during Instantiation

These node objects are connected by passing upstream nodes as arguments to the node constructor. A type checker reports an error when incompatible nodes are connected. For example, SomeNode(SomeNode("hello")) tries to pass an integer output (42) to a node expecting a string.

@node
async def SomeIntNode(int_input: int):
    # do something with the input; returns nothing (None)
    pass

reveal_type(SomeIntNode)  # type[DraftNode1[int, tuple[None]]]
a = SomeNode("hello")
c = SomeIntNode(a)  # ok
d = SomeNode(a)     # type error: a outputs int, SomeNode expects str

There is a connect method for forming connections after node instantiation, but typically you don’t need it.

1.5 FlowHDL Context

The FlowHDL context is the mechanism for defining circular dependencies without resorting to the connect method. Example:

@node
async def IntToStr(value: int):
    return str(value)

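with FlowHDL() as f:
    f.a = SomeNode(f.b)  # f.b is still a placeholder here
    f.b = IntToStr(f.a)
# To run this cycle, at least one input in it also needs a default value (see 2.3).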

The FlowHDL context returns a NodePlaceholder object whenever you access an attribute that is not yet defined on it. When the context exits, it finalizes the nodes defined on it, replacing each placeholder with a connection to the actual node. This lets you reference a node before it is defined, which is necessary for describing cycles.

A side effect of this behavior is that statements can appear in any order, much like signal assignments with the <= operator in hardware description languages such as VHDL and Verilog. For instance:

with FlowHDL() as f:
    f.b = IntToStr(f.a)  # f.a is a placeholder here
    f.a = SomeNode(f.b)

produces a flow that behaves identically to the version with the two lines swapped.
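The placeholder mechanism itself can be modelled in a few lines of plain Python. This is a simplified sketch, not Flowno’s implementation: ToyHDL, Placeholder, and ToyNode are hypothetical names, and only positional inputs are resolved. Undefined attributes yield placeholders, and __exit__ swaps each placeholder for the node of the same name, or raises if that name was never assigned.

```python
class Placeholder:
    """Stands in for a node name that has not been assigned yet."""

    def __init__(self, name):
        self.name = name


class ToyNode:
    """A bare node record: a label plus a list of upstream inputs."""

    def __init__(self, label, *inputs):
        self.label = label
        self.inputs = list(inputs)


class ToyHDL:
    """Hands out placeholders for unknown names and resolves them on exit."""

    def __init__(self):
        object.__setattr__(self, "_nodes", {})

    def __getattr__(self, name):
        # Called only when normal lookup fails, i.e. for node names.
        if name in self._nodes:
            return self._nodes[name]
        return Placeholder(name)

    def __setattr__(self, name, value):
        self._nodes[name] = value  # every assignment registers a node

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is not None:
            return False  # don't mask errors raised inside the block
        for node in self._nodes.values():
            for i, dep in enumerate(node.inputs):
                if isinstance(dep, Placeholder):
                    if dep.name not in self._nodes:
                        raise NameError(f"{dep.name!r} was referenced but never defined")
                    node.inputs[i] = self._nodes[dep.name]
        return False


with ToyHDL() as f:
    f.b = ToyNode("IntToStr", f.a)  # f.a is a Placeholder at this point
    f.a = ToyNode("SomeNode", f.b)  # f.b is already defined

print(f.b.inputs[0] is f.a, f.a.inputs[0] is f.b)  # True True
```

Because resolution happens only in __exit__, the order of the two assignments does not matter, which is the same property described above.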

1.6 Executing the Flow

The flow must be “finalized” before it can run. The with block raises an exception if a nonexistent node output is connected or if a node was referenced but never defined. Otherwise, exiting the block finalizes the flow, replacing each DraftNode object with a finalized Node object:

with FlowHDL() as f:
    f.node_instance = MyNode()
    assert isinstance(f.node_instance, DraftNode)
# the context exits and finalizes f.node_instance
assert isinstance(f.node_instance, Node)

Starting the flow happens outside the with block:

f.run_until_complete()

If any node raises an uncaught exception, the whole flow terminates, and the exception propagates.

1.7 Some Definitions

DraftNode:

A node created by calling an @node-decorated async function or class, before a FlowHDL context has finalized it.

Functional Node:

A node defined by an @node-decorated async function.

Stateful Node:

A node defined by an @node-decorated class.

Node:

Generally refers to any node that has been finalized by a FlowHDL context.

Mono-Node:

A node that does not stream values out.

Streaming Node / Streaming-out Node:

A node that uses the yield keyword to produce partial chunks.

Streaming-input Node:

A node that marks one or more inputs as streaming with @node(stream_in=[...]). Such a node does not need to stream out; it can still return a single value.

2. Execution Order Intuition

2.1 Basic Rule: Dependencies Run Before Dependents

In Flowno, a node executes when its inputs (dependencies) are “fresher” than its own last run, that is, when they have produced data since the node last ran. If an input is stale, Flowno recursively attempts to execute that stale input node first. The f.run_until_complete() method picks an arbitrary node to start.

Example: Linear Chain

@node
async def MyNode0():
    ...

@node
async def MyNode(value):
    ...

with FlowHDL() as f:
    f.a = MyNode0()
    f.b = MyNode(f.a)  # f.b depends on f.a
    f.c = MyNode(f.b)  # f.c depends on f.b
f.run_until_complete()

The execution order is always a, then b, then c, even if Flowno picks a different node first.

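The recursive “resolve stale inputs first” rule can be sketched for acyclic graphs in plain Python. This is a toy model under simplifying assumptions, not Flowno’s scheduler: SimNode, resolve, and run_from are hypothetical, and freshness is approximated by comparing run counts. On a cycle this recursion would never terminate, which is why cycles need the default arguments described in 2.3.

```python
class SimNode:
    def __init__(self, name, *inputs):
        self.name = name
        self.inputs = inputs
        self.runs = 0  # how many times this node has produced data


def resolve(node, order):
    """Run node once, first re-running any input that is not fresher than it."""
    for dep in node.inputs:
        if dep.runs <= node.runs:  # stale: dep has not produced data since node last ran
            resolve(dep, order)
    node.runs += 1
    order.append(node.name)


def run_from(nodes, start):
    """Run every node once, beginning the search at an arbitrary start node."""
    order = []
    for node in [start] + [n for n in nodes if n is not start]:
        if node.runs == 0:
            resolve(node, order)
    return order


def linear_chain():
    a = SimNode("a")
    b = SimNode("b", a)
    c = SimNode("c", b)
    return [a, b, c]


orders = []
for i in range(3):
    nodes = linear_chain()
    orders.append(run_from(nodes, nodes[i]))  # start from a, then b, then c
print(orders)  # [['a', 'b', 'c'], ['a', 'b', 'c'], ['a', 'b', 'c']]
```

Whichever node the search starts from, the recursion reaches a before b and b before c.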

2.2 Basic Rule: Everything That Can Run Concurrently, Does

with FlowHDL() as f:
    f.a = MyNode0()

    f.b1 = MyNode(f.a)
    f.b2 = MyNode(f.b1)
    f.c = MyNode(f.a)

When f.a fans out, the branch f.b1 → f.b2 and the node f.c can run concurrently. Flowno derives an activity diagram from the data dependencies:

[Figure: activity diagram. After a finishes, the branch b1 → b2 and node c run concurrently.]

Internally, this behaves much like the following explicitly concurrent code, where my_node_work stands in for the work done by each node:

from flowno import spawn

async def main():
    a_result = await my_node_work()
    async def branch_b():
        b1_result = await my_node_work(a_result)
        b2_result = await my_node_work(b1_result)
        return b2_result

    branch_b_task = await spawn(branch_b())
    branch_c_task = await spawn(my_node_work(a_result))

    b2_result = await branch_b_task.join()
    c_result = await branch_c_task.join()
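Which nodes may overlap can be read straight off the dependency graph. A minimal sketch, assuming a plain dict of dependencies (ready_waves is a hypothetical helper, not part of Flowno), groups nodes into waves whose inputs are all satisfied by earlier waves, a layered variant of Kahn’s topological sort:

```python
def ready_waves(deps):
    """Group nodes into waves; every node's inputs lie in earlier waves."""
    remaining = {name: set(inputs) for name, inputs in deps.items()}
    done = set()
    waves = []
    while remaining:
        wave = sorted(name for name, inputs in remaining.items() if inputs <= done)
        if not wave:
            raise ValueError("cycle detected: no remaining node has all inputs ready")
        waves.append(wave)
        done.update(wave)
        for name in wave:
            del remaining[name]
    return waves


# Dependencies from the fan-out example: node -> the nodes it reads from.
deps = {"a": [], "b1": ["a"], "b2": ["b1"], "c": ["a"]}
print(ready_waves(deps))  # [['a'], ['b1', 'c'], ['b2']]
```

The waves are a coarser picture than what an event loop actually does: in the spawn version above, b2 can start as soon as b1 finishes, even if c is still running.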

2.3 Basic Rule: Cycles are Bootstrapped by Default Arguments

If you have a DAG (Directed Acyclic Graph), a topological sort is enough to order execution. But when cycles exist, you need a mechanism to “break” them. In Flowno, that mechanism is a default argument on at least one node input in the cycle.

Example: Simple Feedback Loop

@node
async def MyNodeWithDefault(value="default"):
    ...

with FlowHDL() as f:
    f.a = MyNode(f.c)
    f.b = MyNodeWithDefault(f.a)
    f.c = MyNode(f.b)

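Here f.b is the only node that can start, because its input has a default: it runs first with the value "default", then f.c and f.a run, and f.b runs again with f.a’s output. In a simple loop like this the order is easy to work out by hand; in multi-cycle networks or with streaming data, Flowno’s scheduling does more of the work.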

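The bootstrapping rule can be simulated with a small toy scheduler. This is a sketch under simplifying assumptions, not Flowno’s algorithm: GRAPH, is_ready, and simulate are hypothetical, “fresh” means the upstream node ran more recently than the consumer, and a default may be used only on a node’s first run, before its upstream has produced anything.

```python
NO_DEFAULT = object()  # sentinel: this input has no default value

# node -> list of (upstream node, default); mirrors the feedback-loop example above
GRAPH = {
    "a": [("c", NO_DEFAULT)],
    "b": [("a", "default")],
    "c": [("b", NO_DEFAULT)],
}


def is_ready(name, last_run):
    """A node may run if every input is fresh or, on its very first run, has a default."""
    for upstream, default in GRAPH[name]:
        fresh = last_run[upstream] > last_run[name]
        bootstrap = (
            last_run[name] == -1
            and last_run[upstream] == -1
            and default is not NO_DEFAULT
        )
        if not (fresh or bootstrap):
            return False
    return True


def simulate(steps):
    last_run = {name: -1 for name in GRAPH}  # -1 means "has never run"
    outputs = {}
    trace = []
    for clock in range(steps):
        name = next((n for n in GRAPH if is_ready(n, last_run)), None)
        if name is None:
            raise RuntimeError("deadlock: no node in the cycle can start")
        # Use the upstream output if there is one, otherwise the input's default.
        args = [outputs.get(up, default) for up, default in GRAPH[name]]
        outputs[name] = f"{name}({', '.join(args)})"
        last_run[name] = clock
        trace.append(outputs[name])
    return trace


for line in simulate(4):
    print(line)
# b(default)
# c(b(default))
# a(c(b(default)))
# b(a(c(b(default))))
```

Removing the default from b’s input leaves no node able to start, and the sketch reports a deadlock instead of running.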

2.4 Basic Rule: Each Node Executes the Same Number of Times

A key consequence of Flowno’s resolution algorithm is that all nodes execute the same number of times. Even nodes that generate streaming data (run level 1 data) ultimately produce final data (run level 0) in lockstep with the rest of the flow. Run levels are defined in section 3.

@node
async def MyNodeWithSelfLoop(value1, old_value=None):
    ...

with FlowHDL() as f:
    f.a = MyNodeWithDefault(f.c)
    f.b = MyNodeWithSelfLoop(f.a, f.b)  # old_value comes from f.b itself
    f.c = MyNode(f.b)

[Figure: Component Diagram (Data Flows). f.a (MyNodeWithDefault) → f.b (MyNodeWithSelfLoop); f.b → f.c (MyNode); f.b → f.b (self-loop); f.c → f.a.]

[Figure: Activity Diagram (Execution Ordering). start → a → b → c → a.]
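Unrolling the activity diagram by hand shows what “lockstep” means for this flow: each round runs a, b, and c once, and b’s second argument is its own output from the previous round. The sketch below is plain Python with hypothetical node bodies (node_a, node_b, and node_c are stand-ins, not Flowno nodes), and it hard-codes the order from the activity diagram rather than deriving it.

```python
from collections import Counter


def node_a(c_value=0):
    """Stand-in for MyNodeWithDefault: the default breaks the c -> a edge."""
    return c_value + 1


def node_b(a_value, old_value=None):
    """Stand-in for MyNodeWithSelfLoop: accumulates across runs via its self-loop."""
    return a_value if old_value is None else old_value + a_value


def node_c(b_value):
    """Stand-in for MyNode: passes b's result along."""
    return b_value


def run_rounds(rounds):
    counts = Counter()
    history = []
    b_out = c_out = None  # nothing produced before the first round
    for _ in range(rounds):
        # Order taken from the activity diagram: a -> b -> c, then back to a.
        a_out = node_a() if c_out is None else node_a(c_out)
        b_out = node_b(a_out, b_out)  # self-loop: b sees its own previous output
        c_out = node_c(b_out)
        counts.update(["a", "b", "c"])
        history.append((a_out, b_out, c_out))
    return counts, history


counts, history = run_rounds(3)
print(history)                 # [(1, 1, 1), (2, 3, 3), (4, 7, 7)]
print(sorted(counts.items()))  # [('a', 3), ('b', 3), ('c', 3)]
```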

2.5 Basic Rule: Streams are Pipelined

A node with a streaming output does not resume until every consumer has read its most recent chunk. If a consumer stalls, the producer is paused until that consumer catches up.

from flowno import FlowHDL, Stream, node

@node
async def MyStreamOutNode():
    yield "Hello"
    yield " Worlds"

@node(stream_in=["words"])
async def MyStreamInOutNode(words: Stream[str]):
    async for word in words:
        yield word.upper()

@node(stream_in=["words"])
async def MyStreamInNode(words: Stream[str]):
    async for word in words:
        print(word, end="")
    print()

with FlowHDL() as f:
    f.producer = MyStreamOutNode()
    f.transform = MyStreamInOutNode(f.producer)
    f.consumer = MyStreamInNode(f.transform)

f.run_until_complete()

The diagram below shows the actual sequence of events as Flowno passes control among these nodes:

[Figure: sequence of events as Flowno passes control among producer, transform, and consumer.]

From each node’s perspective, the exchange looks like this:

[Figure: the same exchange as seen from an individual node.]
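Plain Python generators give a rough analogy for this pipelining. In the sketch below, which is not Flowno code (Flowno drives nodes with its own event loop), each stage pulls one chunk at a time, so the producer does not create its second chunk until the consumer has handled the first. The function names and the events log are hypothetical.

```python
events = []  # records the interleaving of the three stages


def producer():
    for chunk in ["Hello", " Worlds"]:
        events.append(f"producer yields {chunk!r}")
        yield chunk  # suspended here until the next stage asks for more
    events.append("producer done")


def transform(chunks):
    for chunk in chunks:
        events.append(f"transform got {chunk!r}")
        yield chunk.upper()


def consumer(chunks):
    text = ""
    for chunk in chunks:
        events.append(f"consumer got {chunk!r}")
        text += chunk
    return text


result = consumer(transform(producer()))
print(result)  # HELLO WORLDS
for event in events:
    print(event)
```

The log shows "consumer got 'HELLO'" before "producer yields ' Worlds'": a slow consumer would hold the producer back at its yield.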

3. Generation Value

3.1 Definition & Structure

The generation value is a tuple of integers (main_gen, sub_gen_1, ...) that versions the data produced by a node. Flowno uses it to determine execution order and resolve dependencies. (Deeper sub-generations may later be used for streaming data in subflows.)

  • main_gen: Tracks the primary execution count, e.g. (0,) for the first final data produced by a node.

  • sub_gen: Tracks nested levels for streaming or partial results, e.g. (1, 0) for the first chunk of the second run.

  • Run level: The index of the last element of the generation tuple (len(gen) - 1). Regular data is run level 0; streaming data is run level 1.

  • node.generation: A getter property that returns the highest generation produced by the node. Each run increments the generation.
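The definitions above translate directly into a few helper functions. This is a minimal sketch, not Flowno’s API: the helper names are hypothetical, next_chunk assumes each further chunk increments the last sub-generation, and the sketch deliberately does not define how generations are ordered against each other (for example, (1, 0) versus (1,)), since that is not specified at this point.

```python
Generation = tuple[int, ...]  # e.g. (0,) or (1, 0)


def main_gen(gen: Generation) -> int:
    """Primary execution count: which run produced this data."""
    return gen[0]


def run_level(gen: Generation) -> int:
    """Index of the last sub-generation: 0 for final data, 1 for streaming chunks."""
    return len(gen) - 1


def final_data(run_index: int) -> Generation:
    """Generation of the final (run level 0) data of a given run."""
    return (run_index,)


def first_chunk(run_index: int) -> Generation:
    """Generation of the first streamed chunk of a given run."""
    return (run_index, 0)


def next_chunk(gen: Generation) -> Generation:
    """Assumed rule: each further chunk bumps the last sub-generation by one."""
    if run_level(gen) == 0:
        raise ValueError("final data has no next chunk")
    return gen[:-1] + (gen[-1] + 1,)


g = first_chunk(1)  # first chunk of the second run
print(g, main_gen(g), run_level(g))             # (1, 0) 1 1
print(next_chunk(g))                            # (1, 1)
print(final_data(0), run_level(final_data(0)))  # (0,) 0
```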

3.2 Intuition

As a node yields streaming output for the first time (e.g., “H