Key Concepts

Flowno is a tool for making dataflow programs[1]. It shares many concepts with reactive programming[2]. A flowno flow follows some simple behavior:

  1. When a node evaluates, it triggers all of its dependent downstream nodes to evaluate next given the new data.

  2. Before a node evaluates, it recursively ensures all of its inputs are newer than its most recent evaluation, waiting until its inputs are fresh.

  3. Cycles or feedback loops in the flow are bootstrapped by default argument values.

The entire flow is kept in lockstep by the internal, incrementing, per-node, generation value. See Developer Guide for more details.

Stateless Nodes

The @node decorator transforms an async function into a DraftNode subclass factory. When you call the decorated function, it returns a DraftNode instance configured with the connections to other nodes or constant values.

@node
async def Sum(x, y):
    return x + y

# Sum is a class factory. Create two draft-node instances.
sum_node1 = Sum(1, 2)  # Will always compute 1 + 2
sum_node2 = Sum()      # Must be connected to other nodes before running

Stateful Nodes

Nodes can also be stateful—maintaining internal state across multiple calls. This is useful for remembering previous inputs or outputs. Class variables set inside @node decorated classes are copied to the instances, similar to dataclasses.

from flowno import node

@node
class RollingSum:
    total = 0

    async def call(self, x):
        self.total += x
        return self.total

# rolling_sum has a private `total` attribute.
rolling_sum = RollingSum(10)  # Returns 10 the first time, 20 the second time, etc.

FlowHDL Context Manager

The FlowHDL context is where you define how nodes connect to each other. It also “finalizes” the nodes, replacing DraftNode instances with fully specified Node instances.

with FlowHDL() as f:
    f.node_x = Sum(1, 2)
    f.node_y = Sum(f.node_x, 3)
    # here, f.node_x and f.node_y are DraftNode instances

# here, f.node_x and f.node_y are now Node instances

f.run_until_complete()
f.node_y.get_data()  # Returns (6,)

The FlowHDL context allows referencing nodes before they are defined, which is necessary for describing cyclic dependencies.

with FlowHDL() as f:
    f.node_y = Sum(f.node_x, 3)  # This is fine
    f.node_x = Sum(1, 2)

f.run_until_complete()
f.node_y.get_data()  # Still returns (6,)

Cycle Breaking

Consider a simple cycle sketched out below:

../_images/a03073c855a762f1477310430dcb4a5c63e5ce9f8b45bd9a76d63dd3881b2e55.svg

We can describe it in Flowno, no problem! That is the purpose of the somewhat awkward with FlowHDL() as f: block.

from flowno import node, FlowHDL, TerminateLimitReached

@node
async def Increment(value: int) -> int:
    return value + 1

@node
async def Double(value: int) -> int:
    return value * 2

with FlowHDL() as f:
    f.a = Increment(f.b)
    f.b = Double(f.a)

f.run_until_complete()

However, there is a problem. Flowno doesn’t know which node should be executed first, or what the initial ‘bootstrapped’ value should be. If we run this, we get the following exception:

Traceback (most recent call last):
...
flowno.core.node_base.MissingDefaultError: Detected a cycle without default values. You must add defaults to the indicated arguments for at least ONE of the following nodes:
  Double#0 must have defaults for EACH/ALL the underlined parameters:
  Defined at <doctest default[0]>:7
  Full Signature:
  async def Double(value: int)
                   ----------
OR
  Increment#0 must have defaults for EACH/ALL the underlined parameters:
  Defined at <doctest default[0]>:3
  Full Signature:
  async def Increment(value: int)
                      ----------

The exception is trying to tell you that you need to break the cycle somewhere by adding a default value to Double or Increment. I’ll add a default value of 0 to the increment node, then add some print statements so we can see what is happening. Finally, I’ll use the stop_at_node_generation for testing to stop the flow if any node’s generation exceeds some arbitrary value.

from flowno import node, FlowHDL, TerminateLimitReached

@node
async def Increment(value: int = 0) -> int:
    print(f"Increment({value}) => {value+1}")
    return value + 1

@node
async def Double(value: int) -> int:
    print(f"Double({value}) => {value*2}")
    return value * 2

with FlowHDL() as f:
    f.a = Increment(f.b)
    f.b = Double(f.a)

try:
    f.run_until_complete(stop_at_node_generation=(3,))
except TerminateLimitReached:
    print("Finished Normally")
Increment(0) => 1
Double(1) => 2
Increment(2) => 3
Double(3) => 6
Increment(6) => 7
Double(7) => 14
Increment(14) => 15
Finished Normally

As you can see, the cycle starts with Increment(0), runs through the loop a couple of times then finishes. You probably shouldn’t use stop_at_node_generation outside of testing, instead explicitly raising an exception when you want a cyclic flow to terminate.

TODO

Explain Streaming inputs vs outputs

Explain Streaming node vs mononode

Read More