Key Concepts
Flowno is a tool for making dataflow programs[1]. It shares many concepts with reactive programming[2]. A Flowno flow follows a few simple rules:
- When a node evaluates, it triggers all of its dependent (downstream) nodes to evaluate next with the new data.
- Before a node evaluates, it recursively ensures that all of its inputs are newer than its own most recent evaluation, waiting until those inputs are fresh.
- Cycles (feedback loops) in the flow are bootstrapped by default argument values.
- The entire flow is kept in lockstep by an internal, incrementing, per-node generation value. See the Developer Guide for more details.
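The freshness and generation rules can be illustrated with a small simulation. This is not Flowno's implementation: `ToyNode`, `evaluate_to` and the integer `generation` counter are hypothetical names for this sketch, and it models only the "pull" side (inputs made fresh before a node runs), not Flowno's push-triggering or its scheduler. Note how `source`, which feeds both `double` and `total`, runs exactly once per generation.

```python
import itertools
from typing import Any, Callable, List


class ToyNode:
    """Hypothetical node used only to illustrate the freshness/generation rules."""

    def __init__(self, name: str, func: Callable[..., Any], inputs: List["ToyNode"]) -> None:
        self.name = name
        self.func = func
        self.inputs = inputs
        self.generation = -1  # -1 means "never evaluated"
        self.data: Any = None

    def evaluate_to(self, target: int, log: List[str]) -> None:
        """Evaluate repeatedly until this node's generation reaches `target`."""
        while self.generation < target:
            next_gen = self.generation + 1
            # Freshness rule: every input must already be at next_gen before
            # this node may produce next_gen. Inputs that are fresh do nothing.
            for upstream in self.inputs:
                upstream.evaluate_to(next_gen, log)
            self.data = self.func(*(u.data for u in self.inputs))
            self.generation = next_gen
            log.append(f"{self.name}@{next_gen}={self.data}")


counter = itertools.count(1)
source = ToyNode("source", lambda: next(counter), [])
double = ToyNode("double", lambda x: x * 2, [source])
total = ToyNode("total", lambda a, b: a + b, [source, double])

log: List[str] = []
total.evaluate_to(1, log)
print(log)
# ['source@0=1', 'double@0=2', 'total@0=3', 'source@1=2', 'double@1=4', 'total@1=6']
```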
Stateless Nodes
The @node decorator transforms an async function into a DraftNode subclass factory. When you call the decorated function, it returns a DraftNode instance configured with the connections to other nodes or constant values.
```python
from flowno import node

@node
async def Sum(x, y):
    return x + y

# Sum is a class factory. Create two DraftNode instances.
sum_node1 = Sum(1, 2)  # Will always compute 1 + 2
sum_node2 = Sum()      # Must be connected to other nodes before running
```
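To make the "class factory" idea concrete, here is a pure-Python sketch of what a decorator like @node could do. It is not Flowno's source: `toy_node`, `ToyDraftNode` and `evaluate` are hypothetical names, unconnected inputs are simply recorded as `None`, and results are returned directly rather than as tuples.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable


class ToyDraftNode:
    """Hypothetical stand-in for a draft node: a wrapped async function plus its inputs."""

    func: Callable[..., Awaitable[Any]]  # set on each generated subclass

    def __init__(self, *args: Any) -> None:
        # Each argument is a constant or another ToyDraftNode; parameters not
        # supplied yet are recorded as None ("unconnected").
        n_params = len(inspect.signature(self.func).parameters)
        self.inputs = list(args) + [None] * (n_params - len(args))

    async def evaluate(self) -> Any:
        values = []
        for item in self.inputs:
            if item is None:
                raise RuntimeError(f"{type(self).__name__} has an unconnected input")
            if isinstance(item, ToyDraftNode):
                values.append(await item.evaluate())  # pull the upstream value
            else:
                values.append(item)  # constant
        return await self.func(*values)


def toy_node(func: Callable[..., Awaitable[Any]]) -> type:
    """Build a ToyDraftNode subclass named after `func` (the 'class factory')."""
    return type(func.__name__, (ToyDraftNode,), {"func": staticmethod(func)})


@toy_node
async def Sum(x, y):
    return x + y


sum_node1 = Sum(1, 2)         # always computes 1 + 2
sum_node2 = Sum()             # unconnected: evaluating it raises RuntimeError
chained = Sum(sum_node1, 10)  # consumes another node's output
print(asyncio.run(chained.evaluate()))  # 13
```

The design choice worth noting is that calling `Sum(...)` only records connections; nothing runs until something evaluates the node.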
Stateful Nodes
Nodes can also be stateful, maintaining internal state across multiple calls. This is useful for remembering previous inputs or outputs. Class variables set inside @node-decorated classes are copied to the instances, similar to dataclasses.
```python
from flowno import node

@node
class RollingSum:
    total = 0

    async def call(self, x):
        self.total += x
        return self.total

# rolling_sum has its own copy of the `total` attribute.
rolling_sum = RollingSum(10)  # Returns 10 the first time, 20 the second time, etc.
```
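Copying class variables matters most for mutable ones such as lists. The sketch below shows one way a decorator could give each instance its own copy using `copy.deepcopy`. It is an assumption-laden illustration, not Flowno's implementation: `toy_stateful` is a hypothetical name, and the extra `history` list exists only to show that instances do not share state.

```python
import asyncio
import copy


def toy_stateful(cls):
    """Hypothetical decorator: copy class variables onto each new instance."""
    # Collect plain class variables, skipping dunders and methods.
    defaults = {
        name: value
        for name, value in vars(cls).items()
        if not name.startswith("__") and not callable(value)
    }

    def __init__(self, *inputs):
        for name, value in defaults.items():
            setattr(self, name, copy.deepcopy(value))  # per-instance copy
        self.inputs = inputs  # connections would be stored here

    cls.__init__ = __init__
    return cls


@toy_stateful
class RollingSum:
    total = 0
    history = []  # mutable: without copying, all instances would share it

    async def call(self, x):
        self.total += x
        self.history.append(x)
        return self.total


async def demo():
    a, b = RollingSum(), RollingSum()
    results = []
    for _ in range(3):
        results.append(await a.call(10))
    await b.call(5)
    return results, a.history, b.history


results, a_history, b_history = asyncio.run(demo())
print(results)                                   # [10, 20, 30]
print(a_history, b_history, RollingSum.history)  # [10, 10, 10] [5] []
```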
FlowHDL Context Manager
The FlowHDL context is where you define how nodes connect to each other. It also “finalizes” the nodes, replacing DraftNode instances with fully specified Node instances.
```python
from flowno import FlowHDL

# Sum is the @node-decorated function defined above.
with FlowHDL() as f:
    f.node_x = Sum(1, 2)
    f.node_y = Sum(f.node_x, 3)
    # here, f.node_x and f.node_y are DraftNode instances

# here, f.node_x and f.node_y are now Node instances
f.run_until_complete()
f.node_y.get_data()  # Returns (6,)
```
The FlowHDL context allows referencing nodes before they are defined, which is necessary for describing cyclic dependencies.
```python
with FlowHDL() as f:
    f.node_y = Sum(f.node_x, 3)  # This is fine
    f.node_x = Sum(1, 2)

f.run_until_complete()
f.node_y.get_data()  # Still returns (6,)
```
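How can a context manager accept `f.node_x` before `node_x` exists, and then "finalize" everything? One plausible mechanism, sketched here under loud assumptions (this is not Flowno's code; `ToyHDL`, `ToyDraft`, `ToyFinalNode` and `Placeholder` are hypothetical): reading an unknown attribute returns a named placeholder, every assignment is recorded, and `__exit__` swaps drafts for final nodes and resolves placeholders by name. The toy only computes acyclic graphs and returns plain values instead of tuples.

```python
from typing import Any, Callable, List


class Placeholder:
    """Stands in for a node name that is read before it is assigned."""

    def __init__(self, name: str) -> None:
        self.name = name


class ToyDraft:
    """Hypothetical draft: a plain function plus unresolved inputs."""

    def __init__(self, func: Callable[..., Any], *inputs: Any) -> None:
        self.func = func
        self.inputs = list(inputs)


class ToyFinalNode:
    """Hypothetical finalized node whose inputs point at other final nodes."""

    def __init__(self, name: str, func: Callable[..., Any]) -> None:
        self.name = name
        self.func = func
        self.inputs: List[Any] = []

    def compute(self) -> Any:
        # Acyclic graphs only: a cycle would recurse forever in this toy.
        args = [i.compute() if isinstance(i, ToyFinalNode) else i for i in self.inputs]
        return self.func(*args)


class ToyHDL:
    def __init__(self) -> None:
        object.__setattr__(self, "_drafts", {})
        object.__setattr__(self, "_final", {})

    def __setattr__(self, name: str, value: Any) -> None:
        self._drafts[name] = value  # record every `f.name = ...` assignment

    def __getattr__(self, name: str) -> Any:
        # Only reached when normal attribute lookup fails.
        if name in self._final:
            return self._final[name]
        return self._drafts.get(name, Placeholder(name))

    def __enter__(self) -> "ToyHDL":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc_type is not None:
            return False
        # Pass 1: one final node per named draft.
        for name, draft in self._drafts.items():
            self._final[name] = ToyFinalNode(name, draft.func)
        by_id = {id(d): self._final[n] for n, d in self._drafts.items()}
        # Pass 2: resolve placeholders and draft references to final nodes.
        for name, draft in self._drafts.items():
            for item in draft.inputs:
                if isinstance(item, Placeholder):
                    if item.name not in self._final:
                        raise NameError(f"{item.name!r} was referenced but never defined")
                    item = self._final[item.name]
                elif isinstance(item, ToyDraft):
                    item = by_id[id(item)]  # assumes every draft was assigned a name
                self._final[name].inputs.append(item)
        return False


def add(x, y):
    return x + y


with ToyHDL() as f:
    f.node_y = ToyDraft(add, f.node_x, 3)  # node_x is read before it exists
    f.node_x = ToyDraft(add, 1, 2)

print(type(f.node_y).__name__)  # ToyFinalNode
print(f.node_y.compute())       # 6
```

Resolving names only at the end of the block is what makes out-of-order (and therefore cyclic) definitions expressible.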
Cycle Breaking
Consider a simple cycle: node a feeds its output into node b, and node b feeds its output back into node a.
We can describe it in Flowno, no problem! That is the purpose of the somewhat awkward with FlowHDL() as f: block.
```python
from flowno import node, FlowHDL, TerminateLimitReached

@node
async def Increment(value: int) -> int:
    return value + 1

@node
async def Double(value: int) -> int:
    return value * 2

with FlowHDL() as f:
    f.a = Increment(f.b)
    f.b = Double(f.a)

f.run_until_complete()
```
However, there is a problem. Flowno doesn’t know which node should be executed first, or what the initial ‘bootstrapped’ value should be. If we run this, we get the following exception:
```
Traceback (most recent call last):
...
flowno.core.node_base.MissingDefaultError: Detected a cycle without default values. You must add defaults to the indicated arguments for at least ONE of the following nodes:
Double#0 must have defaults for EACH/ALL the underlined parameters:
Defined at <doctest default[0]>:7
Full Signature:
async def Double(value: int)
----------
OR
Increment#0 must have defaults for EACH/ALL the underlined parameters:
Defined at <doctest default[0]>:3
Full Signature:
async def Increment(value: int)
----------
```
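The error names every node on the cycle because a default on any one of them is enough to give the loop a starting value. A rough sketch of how such a check could work, using a depth-first search over a dependency dict and `inspect.signature` to look for defaults. The graph format, `find_cycle` and `bootstrap_candidates` are hypothetical and not Flowno's internals.

```python
import inspect
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical graph format: node name -> (function, upstream node names,
# one per parameter, in parameter order).
Graph = Dict[str, Tuple[Callable, List[str]]]


def find_cycle(graph: Graph) -> Optional[List[str]]:
    """Return the names on one cycle via depth-first search, or None."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / on current path / finished
    color = {name: WHITE for name in graph}
    path: List[str] = []

    def visit(name: str) -> Optional[List[str]]:
        color[name] = GRAY
        path.append(name)
        for upstream in graph[name][1]:
            if color[upstream] == GRAY:  # back edge: upstream is on the path
                return path[path.index(upstream):]
            if color[upstream] == WHITE:
                found = visit(upstream)
                if found:
                    return found
        path.pop()
        color[name] = BLACK
        return None

    for name in graph:
        if color[name] == WHITE:
            found = visit(name)
            if found:
                return found
    return None


def bootstrap_candidates(graph: Graph, cycle: List[str]) -> List[str]:
    """Cycle nodes whose cycle-facing parameters ALL have default values."""
    on_cycle = set(cycle)
    candidates = []
    for name in cycle:
        func, upstreams = graph[name]
        params = inspect.signature(func).parameters.values()
        cyclic = [p for p, src in zip(params, upstreams) if src in on_cycle]
        if all(p.default is not inspect.Parameter.empty for p in cyclic):
            candidates.append(name)
    return candidates


async def increment(value: int) -> int:
    return value + 1


async def increment_with_default(value: int = 0) -> int:
    return value + 1


async def double(value: int) -> int:
    return value * 2


graph = {"a": (increment, ["b"]), "b": (double, ["a"])}
graph2 = {"a": (increment_with_default, ["b"]), "b": (double, ["a"])}

print(find_cycle(graph))                                 # ['a', 'b']
print(bootstrap_candidates(graph, find_cycle(graph)))    # [] (nothing can start the loop)
print(bootstrap_candidates(graph2, find_cycle(graph2)))  # ['a']
```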
The exception is trying to tell you that you need to break the cycle somewhere by adding a default value to Double or Increment. I’ll add a default value of 0 to the Increment node, then add some print statements so we can see what is happening. Finally, I’ll use the stop_at_node_generation argument, for testing, to stop the flow once any node’s generation exceeds some arbitrary value.
```python
from flowno import node, FlowHDL, TerminateLimitReached

@node
async def Increment(value: int = 0) -> int:
    print(f"Increment({value}) => {value+1}")
    return value + 1

@node
async def Double(value: int) -> int:
    print(f"Double({value}) => {value*2}")
    return value * 2

with FlowHDL() as f:
    f.a = Increment(f.b)
    f.b = Double(f.a)

try:
    f.run_until_complete(stop_at_node_generation=(3,))
except TerminateLimitReached:
    print("Finished Normally")
```
```
Increment(0) => 1
Double(1) => 2
Increment(2) => 3
Double(3) => 6
Increment(6) => 7
Double(7) => 14
Increment(14) => 15
Finished Normally
```
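The printed numbers can be checked by hand: Increment starts from its default of 0, and the value then alternates between "+1" and "*2". This plain loop (no Flowno involved; `replay_cycle` is a hypothetical helper) replays the same alternation and produces the same seven lines as the trace above.

```python
def replay_cycle(start: int, increments: int) -> list:
    """Replay the Increment -> Double loop without Flowno.

    Increment runs `increments` times; Double runs after every Increment
    except the last, matching where the flow above was stopped.
    """
    lines = []
    value = start
    for i in range(increments):
        lines.append(f"Increment({value}) => {value + 1}")
        value += 1
        if i < increments - 1:
            lines.append(f"Double({value}) => {value * 2}")
            value *= 2
    return lines


for line in replay_cycle(0, 4):
    print(line)
```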
As you can see, the cycle starts with Increment(0), runs through the loop a few times, then finishes. You probably shouldn’t use stop_at_node_generation outside of testing; instead, explicitly raise an exception when you want a cyclic flow to terminate.
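As a library-free sketch of that pattern, assume a hypothetical `StopCycle` exception that a node raises once its own stopping condition is met, and a hand-written driver loop standing in for the flow runner. In a real flow the exception would surface from `f.run_until_complete()` for you to catch; the exact propagation behavior is Flowno's and is not modeled here.

```python
import asyncio


class StopCycle(Exception):
    """Hypothetical exception a node raises to end a cyclic flow."""


async def increment(value: int = 0) -> int:
    return value + 1


async def double(value: int) -> int:
    result = value * 2
    if result > 10:
        # The node itself decides the loop is done, instead of relying on a
        # generation limit such as stop_at_node_generation.
        raise StopCycle(result)
    return result


async def run_cycle() -> int:
    value = await increment()  # bootstrapped by the default argument
    while True:
        try:
            value = await increment(await double(value))
        except StopCycle as stop:
            return stop.args[0]  # last value produced before stopping


print(asyncio.run(run_cycle()))  # 14
```

The stopping condition lives in the node's own logic, so it expresses a real domain rule rather than an arbitrary iteration count.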
TODO
Explain Streaming inputs vs outputs
Explain Streaming node vs mononode
Read More