Flowno Documentation

Welcome to the Flowno documentation! Flowno is an embedded python DSL for making concurrent, cyclic, branching, and streaming dataflow programs. It combines a functional node definition syntax (@node) with a declarative flow descriptions (with FlowHDL() as f:). Though still a work in progress, my goal is to make LLM agent design easy and modular.

Hint

Almost all the classes, types and functions referenced in this documentation are internal or external hyperlinks. Try clicking on them to explore! Ex: Coroutine

Features

Dataflow Evaluation

Automatically schedules nodes based on data dependencies.

Streaming Data

Uses asynchronous generators for pipelining the processing of partial results.

Custom Event Loop

Uses async/await syntax for non-blocking execution and concurrency. All nodes run concurrently by default (subject to data dependency constraints).

Declarative Connections

Use the FlowHDL context manager to describe complex graphs, including cycles.

Warning

Flowno does not use and is not compatible with the built-in asyncio event loop. If you see import asyncio in your code, you are mixing concurrency models, which is likely incorrect. Ensure you understand Python’s async/await syntax at a high level—but do not expect Flowno nodes to run concurrently with asyncio tasks. (This may change in the future.)

Below is a quick demonstration of Flowno’s node syntax, streaming, concurrency, and the FlowHDL description context:

from flowno import node, FlowHDL, sleep, Stream, azip

@node
async def Range(start: int, end: int):
    print(f"Range started: {start}..{end}")
    for i in range(start, end):
        await sleep(1.0)
        yield i

@node(stream_in=["left", "right"])
async def AddPiecewise(left: Stream[int], right: Stream[int]):
    async for l_item, r_item in azip(left, right):
        print(f"{l_item} + {r_item}")
        yield l_item + r_item

@node(stream_in=["stream"])
async def Sum(stream: Stream[int]) -> int:
    total = 0
    async for item in stream:
        print(f"{item}")
        await sleep(1.0)
        total += item
    print(f"total: {total}")
    return total

def main():
    with FlowHDL() as f:
        f.range_a = Range(0, 3)
        f.range_b = Range(100, 103)
        f.total = Sum(AddPiecewise(f.range_a, f.range_b))
    f.run_until_complete()  # takes 4 seconds to complete

main()

Should produce the following output over 4 seconds:

Range started: 0..3
Range started: 100..103
0 + 100
100
1 + 101
102
2 + 102
104
total: 306

Naming Convention

The function names here are capitalized because @node decorated functions behave similarly to classes (class factories).

Next Steps

License

Flowno is licensed under the MIT License.