flowno

Flowno: A Python DSL for building dataflow programs.

This module provides tools for creating concurrent, cyclic, and streaming dataflow programs.

Key features:
  • Node-based design with the @node decorator

  • Support for cyclic dependencies and streaming data

  • Built-in concurrency with a custom event loop

  • Type-checked node connections

Configure logging with environment variables:
  • FLOWNO_LOG_LEVEL: Set logging level (default: ERROR)

  • FLOWNO_LOG_TAG_FILTER: Filter logs by tags (default: ALL)
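Both variables can also be set from Python. This is a minimal sketch. It assumes Flowno reads them once, when it is first imported, and that FLOWNO_LOG_LEVEL accepts standard logging level names such as DEBUG. Neither detail is confirmed above.

```python
import os

# Assumption: Flowno reads these variables when it is first imported,
# so they must be set before the first `import flowno`.
os.environ["FLOWNO_LOG_LEVEL"] = "DEBUG"     # default is ERROR
os.environ["FLOWNO_LOG_TAG_FILTER"] = "ALL"  # default is ALL

# import flowno  # import only after the environment is configured
```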

class flowno.AsyncQueue(maxsize: int | None = None)

An asynchronous queue for the Flowno event loop.

This queue lets tasks exchange data safely, with synchronization handled by the event loop. When used as an async iterator, it yields items until the queue has been closed and drained.

Parameters:

maxsize – The maximum number of items allowed in the queue. When the queue reaches this size, put() operations will block until items are removed. If None, the queue size is unbounded.

async close() → None

Close the queue, preventing further put operations.

After closing:
  • put() will raise QueueClosedError

  • get() will succeed until the queue is empty, then raise QueueClosedError

  • AsyncIterator interface will stop iteration when the queue is empty
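The close() rules above can be modeled in a few lines of asyncio. This sketch is illustrative only. ClosableQueue and the local QueueClosedError are hypothetical stand-ins. The model is unbounded (no maxsize). The real AsyncQueue runs on Flowno's own event loop rather than asyncio.

```python
import asyncio
from collections import deque
from typing import Any, Deque


class QueueClosedError(Exception):
    """Hypothetical local stand-in for Flowno's QueueClosedError."""


class ClosableQueue:
    """Toy, unbounded asyncio model of the documented close() semantics."""

    def __init__(self) -> None:
        self._items: Deque[Any] = deque()
        self._closed = False
        self._changed = asyncio.Condition()

    async def put(self, item: Any) -> None:
        async with self._changed:
            if self._closed:
                raise QueueClosedError("put() on a closed queue")
            self._items.append(item)
            self._changed.notify_all()

    async def get(self) -> Any:
        async with self._changed:
            # Wait until an item arrives or the queue is closed.
            await self._changed.wait_for(lambda: bool(self._items) or self._closed)
            if self._items:
                return self._items.popleft()  # remaining items are drained first
            raise QueueClosedError("queue is closed and empty")

    async def close(self) -> None:
        async with self._changed:
            self._closed = True
            self._changed.notify_all()  # wake blocked get() calls so they can raise

    def __aiter__(self) -> "ClosableQueue":
        return self

    async def __anext__(self) -> Any:
        try:
            return await self.get()
        except QueueClosedError:
            raise StopAsyncIteration from None


async def demo() -> list:
    q = ClosableQueue()
    await q.put(1)
    await q.put(2)
    await q.close()
    drained = [item async for item in q]  # yields 1 and 2, then stops
    try:
        await q.put(3)
    except QueueClosedError:
        drained.append("put rejected")
    return drained


print(asyncio.run(demo()))  # [1, 2, 'put rejected']
```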

async get() → _T

Get an item from the queue.

If the queue is empty, this will wait until an item is put into the queue.

Returns:

The next item from the queue

Raises:

QueueClosedError – If the queue is closed and empty

is_closed() → bool

Check if the queue is closed.

Returns:

True if the queue is closed, False otherwise

async peek() → _T

Peek at the next item without removing it from the queue.

If the queue is empty, this will wait until an item is put into the queue.

Returns:

The next item from the queue (without removing it)

Raises:

QueueClosedError – If the queue is closed and empty

async put(item: _T) → None

Put an item into the queue.

If the queue has a maxsize and is full, this waits until space is available.

Parameters:

item – The item to put into the queue

Raises:

QueueClosedError – If the queue is closed

until_empty() → AsyncIterator[_T]

Get an async iterator that consumes all items until the queue is empty.

This iterator will close the queue automatically when all items are consumed, unless specified otherwise.

Returns:

An async iterator that yields items until the queue is empty

class flowno.DraftNode(*args: Unpack)

Abstract base class for all connectable draft nodes in the flow graph.

A DraftNode subclass instance represents a node that can be connected to other nodes to form a computational graph. It only handles input/output connections; the node must be wrapped in a FinalizedNode before it can be used in a running flow.

Warning

Do not use this class directly; use the @node decorator instead.

Examples

>>> from flowno import node, FlowHDL
>>> @node
... async def add(a: int, b: int = 0) -> int:
...     return a + b
>>> with FlowHDL() as f:
...     f.result = add(1, f.result)  # cycle: b falls back to its default on the first run

get_input_nodes() → list[DraftNode[Unpack, tuple[object, ...]]]

Get all nodes connected to this node’s inputs.

get_output_nodes() → list[DraftNode[Unpack, tuple[object, ...]]]

Get all nodes connected to this node’s outputs.
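The two accessors describe the same edges from opposite ends. The sketch below is a hypothetical toy (ToyDraftNode is not a Flowno class) that records each connection in both directions. It treats constants as plain values rather than nodes, which may not match how Flowno represents them.

```python
from typing import Any, List


class ToyDraftNode:
    """Hypothetical draft node that records edges in both directions."""

    def __init__(self, name: str, *inputs: Any) -> None:
        self.name = name
        self.inputs: List[Any] = list(inputs)  # constants or other ToyDraftNode objects
        self._consumers: List["ToyDraftNode"] = []
        for upstream in self.inputs:
            if isinstance(upstream, ToyDraftNode):
                upstream._consumers.append(self)  # remember the reverse edge

    def get_input_nodes(self) -> List["ToyDraftNode"]:
        # Only upstream nodes count; plain constants are skipped in this toy.
        return [s for s in self.inputs if isinstance(s, ToyDraftNode)]

    def get_output_nodes(self) -> List["ToyDraftNode"]:
        return list(self._consumers)


source = ToyDraftNode("source")
doubled = ToyDraftNode("doubled", source, 2)
print([n.name for n in doubled.get_input_nodes()])  # ['source']
print([n.name for n in source.get_output_nodes()])  # ['doubled']
```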

class flowno.EventLoop

The core event loop implementation for Flowno’s asynchronous execution model.

Manages task scheduling, I/O operations, and synchronization primitives for the dataflow runtime.

cancel(raw_task: Coroutine[Command, Any, Any]) → bool

Cancel a task.

Parameters:

raw_task – The task to cancel.

Returns:

True if the task was successfully cancelled; False if it was already finished or errored.

handle_queue_close(queue: AsyncQueue[Any]) → None

Handle a queue being closed by resuming every task waiting on that queue with an appropriate exception.

has_living_tasks() → bool

Return True if there are any tasks still needing processing.

run_until_complete(root_task: Coroutine[Command, Any, _ReturnT], join: bool = False, wait_for_spawned_tasks: bool = True, _debug_max_wait_time: float | None = None) → _ReturnT | None

Run the event loop until the given root task is complete.

This method executes the main event loop, processing tasks, handling I/O operations, and managing task synchronization until the root task completes. It can optionally wait for all spawned tasks to finish as well.

Parameters:
  • root_task (RawTask[Command, Any, _ReturnT]) – The coroutine task to execute as the root of the execution graph.

  • join (bool) – When True, returns the result value of the root task. When False, returns None regardless of the task’s result. If the task raises an exception and join=True, the exception is re-raised.

  • wait_for_spawned_tasks (bool) – When True, continue running the event loop until all tasks spawned by the root task have completed. When False, stop as soon as the root task completes.

  • _debug_max_wait_time (float | None) – Optional timeout value in seconds used for debugging. Limits how long the event loop will wait for network or sleeping operations.

Returns:

If join=True, returns the result of the root task (of type _ReturnT).

If join=False, returns None.

Return type:

_ReturnT | None

Raises:
  • RuntimeError – If the event loop exits without completing the root task when join=True.

  • Exception – Any exception raised by the root task is propagated if join=True.
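Tasks in this event loop are coroutines that suspend by yielding command objects (SleepCommand, SocketRecvCommand, SpawnCommand and so on) for the loop to act on. The sketch below is a hypothetical, single-file model of that idea with only a sleep command. It shows how join decides between returning the root task's result and returning None. ToySleep, toy_sleep and toy_run_until_complete are illustrative names. Error propagation, I/O and wait_for_spawned_tasks are omitted.

```python
import heapq
import time
import types
from dataclasses import dataclass
from typing import Any, Coroutine, List, Tuple


@dataclass
class ToySleep:
    """Hypothetical command object standing in for Flowno's SleepCommand."""
    duration: float


@types.coroutine
def toy_sleep(duration: float):
    start = time.monotonic()
    yield ToySleep(duration)  # hand control to the loop until the timer fires
    return time.monotonic() - start  # actual time slept


def toy_run_until_complete(root: Coroutine[Any, Any, Any], join: bool = False) -> Any:
    ready: List[Coroutine[Any, Any, Any]] = [root]
    timers: List[Tuple[float, int, Coroutine[Any, Any, Any]]] = []
    tie = 0  # tie-breaker so heapq never has to compare coroutine objects
    result = None
    while ready or timers:
        if not ready:
            wake_at, _, sleeper = heapq.heappop(timers)
            time.sleep(max(0.0, wake_at - time.monotonic()))
            ready.append(sleeper)
        coro = ready.pop()
        try:
            command = coro.send(None)  # run the task until its next command
        except StopIteration as stop:
            if coro is root:
                result = stop.value
            continue
        if isinstance(command, ToySleep):
            tie += 1
            heapq.heappush(timers, (time.monotonic() + command.duration, tie, coro))
        else:
            raise TypeError(f"unsupported command: {command!r}")
    return result if join else None


async def main() -> int:
    await toy_sleep(0.01)
    return 42


print(toy_run_until_complete(main(), join=True))  # 42
print(toy_run_until_complete(main()))             # None
```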

class flowno.Flow(is_finalized: bool = True)

Dataflow graph execution engine.

The Flow class manages the execution of a dataflow graph, handling dependency resolution, node scheduling, and cycle breaking. It uses a custom event loop to execute nodes concurrently while respecting data dependencies.

Key features:
  • Automatic dependency-based scheduling

  • Cycle detection and resolution

  • Support for streaming data (run levels)

  • Concurrency management

unvisited

List of nodes that have not yet been visited during execution

Type:

list[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]]]

visited

Set of nodes that have been visited

Type:

set[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]]]

node_tasks

Dictionary mapping nodes to their tasks and status

Type:

dict[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]], flowno.core.flow.flow.NodeTaskAndStatus]

running_nodes

Set of nodes currently running

Type:

set[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]]]

resolution_queue

Queue of nodes waiting to be resolved

Type:

flowno.core.event_loop.queues.AsyncSetQueue[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]]]

add_node(node: FinalizedNode[Unpack, tuple[Any, ...]])

Add a node to the flow.

Parameters:

node – The node to add

add_nodes(nodes: list[FinalizedNode[Unpack, tuple[Any, ...]]])

Add multiple nodes to the flow.

Parameters:

nodes – The nodes to add

clear_defaulted_inputs(node: FinalizedNode[Unpack, tuple[object, ...]]) → None

Remove defaulted input information for a node.

Parameters:

node – The node to clear defaulted inputs for

evaluate_node(node: FinalizedNode[Unpack, tuple[object, ...]]) → Never

The persistent task that evaluates a node.

This is the main execution function for a node. It:
  1. Waits for the node to be ready to run

  2. Gathers inputs and handles defaulted values

  3. Calls the node with its inputs

  4. Processes the result (either coroutine or async generator)

  5. Propagates outputs to dependent nodes

  6. Repeats
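The steps above can be sketched sequentially for a single node that feeds itself, as in f.result = add(1, f.result). This is a hypothetical model, not Flowno's scheduler. It skips step 1 (waiting for readiness) and runs one node without concurrency. It also stops the otherwise endless loop with a simple last_generation bound (a hypothetical parameter). It shows how a default value lets a cyclic input produce its first result.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class ToyNode:
    """Hypothetical stand-in for a finalized node: a function plus its input sources."""
    func: Callable[..., Any]
    inputs: List[Any]  # constants, or other ToyNode objects
    defaults: Dict[int, Any] = field(default_factory=dict)  # port index -> default
    data: Optional[Any] = None  # last published output
    generation: int = -1  # -1 means "never run"


def evaluate(node: ToyNode, last_generation: int) -> List[Any]:
    """Sequential sketch of gather inputs / call / propagate / repeat for one node."""
    published = []
    while node.generation < last_generation:
        args = []
        for port, source in enumerate(node.inputs):
            if isinstance(source, ToyNode):
                # An upstream node that has never run means the cycle is not
                # primed yet, so this port falls back to its default value.
                args.append(source.data if source.generation >= 0 else node.defaults[port])
            else:
                args.append(source)
        node.data = node.func(*args)  # call the node with its inputs
        node.generation += 1
        published.append(node.data)  # dependents would read node.data
    return published


# f.result = add(1, f.result), with b defaulting to 0
counter = ToyNode(func=lambda a, b: a + b, inputs=[1, None], defaults={1: 0})
counter.inputs[1] = counter  # close the cycle: the node feeds itself
print(evaluate(counter, last_generation=2))  # [1, 2, 3]
```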

Parameters:

node – The node to evaluate

Returns:

Never returns; runs as a persistent coroutine

Raises:

NotImplementedError – If the node does not return a coroutine or async generator

is_input_defaulted(node: FinalizedNode[Unpack, tuple[object, ...]], input_port: InputPortIndex) → bool

Check if a specific input port is using a default value.

Parameters:
  • node – The node to check

  • input_port – The input port index to check

Returns:

True if the input port is using a default value, False otherwise

run_until_complete(stop_at_node_generation: dict[FinalizedNode[Unpack, tuple[Any, ...]] | DraftNode[Unpack, tuple[Any, ...]], tuple[int, ...] | None] | tuple[int, ...] | None = (), terminate_on_node_error: bool = False, _debug_max_wait_time: float | None = None)

Execute the flow until completion or until a termination condition is met.

This is the main entry point for running a flow. It starts the resolution process and runs until all nodes have completed or a termination condition (like reaching a generation limit or an error) is met.

Parameters:
  • stop_at_node_generation – Generation limit for nodes, either as a global limit or as a dict mapping nodes to their individual limits

  • terminate_on_node_error – Whether to terminate the flow if a node raises an exception

  • _debug_max_wait_time – Maximum time in seconds to wait for I/O operations (useful for debugging)

Raises:

set_defaulted_inputs(node: FinalizedNode[Unpack, tuple[object, ...]], defaulted_inputs: list[InputPortIndex]) → None

Mark specific inputs of a node as using default values.

When a node uses default values for inputs that are part of a cycle, this method records that information and increments the stitch level to prevent infinite recursion.

Parameters:
  • node – The node with defaulted inputs

  • defaulted_inputs – List of input port indices using default values
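The three defaulted-input methods (set_defaulted_inputs, is_input_defaulted and clear_defaulted_inputs) amount to per-node bookkeeping. Below is a minimal sketch with node names as plain strings. The stitch_level counter is a hypothetical placeholder, because the meaning and scope of Flowno's stitch level are not described here.

```python
from collections import defaultdict
from typing import DefaultDict, List, Set


class DefaultedInputs:
    """Hypothetical per-node record of which input ports fell back to defaults."""

    def __init__(self) -> None:
        self._ports: DefaultDict[str, Set[int]] = defaultdict(set)
        # Hypothetical counter; the real stitch level is not modeled faithfully.
        self.stitch_level: DefaultDict[str, int] = defaultdict(int)

    def set_defaulted_inputs(self, node: str, ports: List[int]) -> None:
        self._ports[node].update(ports)
        self.stitch_level[node] += 1  # recording defaults bumps the stitch level

    def is_input_defaulted(self, node: str, port: int) -> bool:
        return port in self._ports.get(node, set())

    def clear_defaulted_inputs(self, node: str) -> None:
        self._ports.pop(node, None)


record = DefaultedInputs()
record.set_defaulted_inputs("add", [1])
print(record.is_input_defaulted("add", 1), record.is_input_defaulted("add", 0))  # True False
record.clear_defaulted_inputs("add")
print(record.is_input_defaulted("add", 1))  # False
```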

set_node_status(node: FinalizedNode[Unpack, tuple[object, ...]], status: Ready | Running | Error | Stalled) → None

Update the status of a node and notify instrumentation.

Parameters:
  • node – The node whose status is being updated

  • status – The new status to set

class flowno.FlowHDL

Context manager for building dataflow graphs.

The FlowHDL context allows:

  • Assigning nodes as attributes

  • Forward-referencing nodes that haven’t been defined yet

  • Automatic resolution of placeholder references when exiting the context

Attributes within the context become nodes in the final flow. The context automatically finalizes all node connections when exited.

For example, a node can reference another node that is assigned later in the same block:

>>> with FlowHDL() as f:
...     f.node1 = Node1(f.node2)
...     f.node2 = Node2()
>>> f.run_until_complete()

User-defined attributes should not start with an underscore.
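Forward references can work because reading an attribute that has not been assigned yet can return a placeholder, which is swapped for the real node when the block exits. The sketch below models that with a hypothetical ToyHDL class and StubNode. It is not FlowHDL's implementation, but it follows the rules above, including refusing to create placeholders for names that start with an underscore.

```python
from typing import Any, Dict


class Placeholder:
    """Stands in for an attribute that has not been assigned yet."""

    def __init__(self, name: str) -> None:
        self.name = name


class StubNode:
    """Hypothetical draft node that just records its inputs."""

    def __init__(self, *inputs: Any) -> None:
        self.inputs = list(inputs)


class ToyHDL:
    """Hypothetical model of FlowHDL's forward-reference handling."""

    def __enter__(self) -> "ToyHDL":
        return self

    def __getattr__(self, name: str) -> Placeholder:
        # Python only calls this when normal lookup fails, i.e. for names that
        # have not been assigned yet; underscore names are never nodes.
        if name.startswith("_"):
            raise AttributeError(name)
        return Placeholder(name)

    def __exit__(self, *exc_info: Any) -> None:
        nodes: Dict[str, StubNode] = {
            k: v for k, v in vars(self).items() if isinstance(v, StubNode)
        }
        for node in nodes.values():
            for i, value in enumerate(node.inputs):
                if isinstance(value, Placeholder):
                    if value.name not in nodes:
                        raise NameError(f"f.{value.name} was referenced but never defined")
                    node.inputs[i] = nodes[value.name]  # resolve the placeholder


with ToyHDL() as f:
    f.node1 = StubNode(f.node2)  # f.node2 is still a Placeholder here
    f.node2 = StubNode()

print(f.node1.inputs[0] is f.node2)  # True
```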

Canonical:

flowno.core.flow_hdl.FlowHDL

KEYWORDS: ClassVar[list[str]] = ['KEYWORDS', 'run_until_complete']

Keywords that should not be treated as nodes in the graph.

run_until_complete(stop_at_node_generation: dict[DraftNode[Unpack, tuple[Any, ...]] | FinalizedNode[Unpack, tuple[Any, ...]], tuple[int, ...] | None] | tuple[int, ...] | None = (), terminate_on_node_error: bool = True, _debug_max_wait_time: float | None = None) → None

Run the flow until all nodes have completed processing.

Parameters:
  • stop_at_node_generation – Optional generation number or mapping of nodes to generation numbers to stop execution at

  • terminate_on_node_error – Whether to terminate the entire flow if any node raises an exception

  • _debug_max_wait_time – Maximum time in seconds to wait for I/O operations (for debugging only)

class flowno.SocketHandle(socket: socket)

Wrapper around the built-in socket object.

This class provides methods that integrate with Flowno’s event loop, allowing socket operations to be performed asynchronously.

accept() → Generator[SocketAcceptCommand, None, tuple[SocketHandle, tuple[Any, ...] | str]]

Accept a connection on a listening socket.

This coroutine yields a SocketAcceptCommand for the event loop to process. When the event loop detects an incoming connection, it resumes this coroutine.

Returns:

A tuple containing a new SocketHandle for the client connection and the client’s address.

bind(address: tuple[Any, ...] | str, /) → None

Bind the socket to the specified address.

Parameters:

address – The address (host, port) to bind to.

connect(address: tuple[Any, ...] | str, /) → None

Connect to a remote socket at the specified address.

This is a blocking operation. For non-blocking connections, use the socket primitives from the flowno module.

Parameters:

address – The address to connect to (host, port).

listen(backlog: int | None = None, /) → None

Enable a server socket to accept connections.

Parameters:

backlog – The number of unaccepted connections the system will allow before refusing new connections.

recv(bufsize: int) → Generator[SocketRecvCommand, None, bytes]

Receive data from the socket.

This coroutine yields a SocketRecvCommand for the event loop to process. When data is available to read, the event loop resumes this coroutine.

Parameters:

bufsize – The maximum number of bytes to receive.

Returns:

The bytes received from the socket.
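The signature Generator[SocketRecvCommand, None, bytes] suggests a two-step protocol: yield a command, get resumed with None once the socket is readable, then read and return the bytes. The sketch below assumes exactly that and drives one such task with the standard selectors module. ToyRecvCommand, toy_recv and drive are hypothetical names.

```python
import selectors
import socket
from dataclasses import dataclass
from typing import Generator


@dataclass
class ToyRecvCommand:
    """Hypothetical stand-in for SocketRecvCommand."""
    sock: socket.socket


def toy_recv(sock: socket.socket, bufsize: int) -> Generator[ToyRecvCommand, None, bytes]:
    yield ToyRecvCommand(sock)  # suspend until the loop reports the socket readable
    return sock.recv(bufsize)  # readable now, so this call does not block


def drive(task: Generator[ToyRecvCommand, None, bytes]) -> bytes:
    """Tiny single-task loop: wait for readability, then resume the generator."""
    command = next(task)
    with selectors.DefaultSelector() as selector:
        selector.register(command.sock, selectors.EVENT_READ)
        selector.select()  # blocks until the socket has data
    try:
        task.send(None)
    except StopIteration as stop:
        return stop.value
    raise RuntimeError("task yielded more than one command")


left, right = socket.socketpair()
left.sendall(b"hello")
print(drive(toy_recv(right, 1024)))  # b'hello'
left.close()
right.close()
```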

send(data: bytes) → Generator[SocketSendCommand, None, int]

Send data to the socket.

Unlike sendAll(), this performs a single send and returns the number of bytes actually sent, which may be fewer than len(data).

Parameters:

data – The bytes to send.

Returns:

The number of bytes sent.

sendAll(data: bytes) → Generator[SocketSendCommand, None, None]

Send all data to the socket.

This coroutine continues yielding SocketSendCommand until all data is sent.

Parameters:

data – The bytes to send.
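The difference between send() and sendAll() is the usual partial-write loop. Below is a minimal sketch using a blocking standard-library socket. Flowno's sendAll presumably yields a SocketSendCommand before each send (an assumption), and those yields are left out here. send_all is a hypothetical helper.

```python
import socket


def send_all(sock: socket.socket, data: bytes) -> int:
    """Repeat send() until every byte is written; return the number of send() calls."""
    remaining = memoryview(data)  # slicing a memoryview avoids copying the buffer
    calls = 0
    while remaining:
        sent = sock.send(remaining)  # may write fewer bytes than requested
        remaining = remaining[sent:]
        calls += 1
    return calls


left, right = socket.socketpair()
send_all(left, b"hello world")
print(right.recv(1024))  # b'hello world'
left.close()
right.close()
```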

class flowno.Stream(input: FinalizedInputPortRef[_InputType], output: FinalizedOutputPortRef[_InputType])

A stream of values from one node to another.

Streams connect nodes that produce multiple values over time (run_level > 0) to consuming nodes. They act as async iterators that yield values as they become available.

Type Parameters:

_InputType: The type of data being streamed

exception flowno.TerminateLimitReached

Exception raised when a node reaches its generation limit.

async flowno.azip(*args: Unpack) → AsyncGenerator[tuple[object, ...], None]

Combine multiple async iterators, similar to the built-in zip() function.

This function takes multiple async iterators and yields tuples containing items from each iterator, advancing all iterators in lockstep. It stops when the shortest iterator is exhausted.

Parameters:

*args – Two or more async iterators to combine

Yields:

Tuples containing one item from each iterator

Example

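>>> from flowno import EventLoop, azip
>>> async def gen1():
...     for i in range(3):
...         yield i
>>> async def gen2():
...     yield "a"
...     yield "b"
>>> async def main():
...     # Stops after two pairs because gen2 is exhausted first
...     async for pair in azip(gen1(), gen2()):
...         print(pair)
>>> EventLoop().run_until_complete(main())
(0, 'a')
(1, 'b')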
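For reference, the lockstep behavior described above fits in a few lines. This azip_sketch is a hypothetical reimplementation, not Flowno's code. It advances the iterators one after another rather than concurrently, and it runs under asyncio here purely for illustration.

```python
import asyncio
from typing import Any, AsyncGenerator, AsyncIterable, Tuple


async def azip_sketch(*iterables: AsyncIterable[Any]) -> AsyncGenerator[Tuple[Any, ...], None]:
    """Yield tuples from the inputs in lockstep; stop when any input is exhausted."""
    iterators = [it.__aiter__() for it in iterables]
    while True:
        try:
            # Advance each iterator in turn (sequentially, not concurrently).
            items = [await it.__anext__() for it in iterators]
        except StopAsyncIteration:
            return
        yield tuple(items)


async def gen1():
    for i in range(3):
        yield i


async def gen2():
    yield "a"
    yield "b"


async def main() -> list:
    return [pair async for pair in azip_sketch(gen1(), gen2())]


print(asyncio.run(main()))  # [(0, 'a'), (1, 'b')]
```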

flowno.node(func_or_cls: Callable[[...], Coroutine[Any, Any, _ReturnT_co]] | Callable[[...], AsyncGenerator[_ReturnT_co, None]] | type[ClassCall[Any, _ReturnT_co]] | None = None, /, *, multiple_outputs: Literal[False] | Literal[True] | None = None, stream_in: list[str] = []) → type[MonoNode[Unpack, tuple[_ReturnT_co]]] | type[StreamingNode[Unpack, tuple[_ReturnT_co]]] | node_meta_single_dec | node_meta_multiple_dec[Unpack, _ReturnTupleT_co]

Decorator that transforms async functions or classes into DraftNode subclasses.

Parameters:
  • func_or_cls – The async function or class to transform

  • multiple_outputs – Whether the node has multiple outputs

  • stream_in – Names of the inputs to receive as Stream objects (async iterators) rather than as single values

Returns:

A DraftNode subclass when applied directly as @node, or a node_meta decorator when called with keyword arguments such as @node(stream_in=[...])

Examples

Basic usage:

>>> from flowno import node
>>>
>>> @node
... async def Add(x: int, y: int) -> int:
...     return x + y
>>>
>>> add_node = Add(1, 2)
>>> print(add_node)  # DraftNode instance

With stream inputs:

>>> from flowno import node, Stream
>>>
>>> @node(stream_in=["a"])
... async def SumStream(x: int, a: Stream[int]) -> int:
...     total = x
...     async for value in a:
...         total += value
...     return total
>>>
>>> sum_stream_node = SumStream(1)
>>> print(sum_stream_node)  # DraftNode instance with stream input

With multiple outputs:

>>> from flowno import node
>>>
>>> @node(multiple_outputs=True)
... async def SumAndDiff(x: int, y: int) -> tuple[int, int]:
...     return x + y, x - y
>>>
>>> sum_and_diff_node = SumAndDiff(3, 1)
>>> print(sum_and_diff_node)  # DraftNode instance with multiple outputs
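The decorator forms above (bare @node and @node(...) with keyword arguments) follow a common Python pattern: when the first positional argument is omitted, the decorator returns another decorator. Below is a hypothetical sketch of that pattern. toy_node builds a bare class and does not reproduce DraftNode, MonoNode or StreamingNode.

```python
import inspect
from typing import Any, Callable, List, Optional


def toy_node(
    func: Optional[Callable[..., Any]] = None,
    /,
    *,
    multiple_outputs: Optional[bool] = None,
    stream_in: Optional[List[str]] = None,
):
    """Hypothetical decorator usable both bare and with keyword arguments."""
    stream_names = list(stream_in or [])

    def wrap(f: Callable[..., Any]) -> type:
        if not (inspect.iscoroutinefunction(f) or inspect.isasyncgenfunction(f)):
            raise TypeError("expected an async function or async generator function")

        def __init__(self: Any, *args: Any) -> None:
            self.args = args  # a draft node only records its (unresolved) inputs

        # Build a new class named after the function, as @node produces a class.
        return type(f.__name__, (), {
            "__init__": __init__,
            "func": staticmethod(f),
            "multiple_outputs": bool(multiple_outputs),
            "stream_in": stream_names,
        })

    if func is not None:  # used bare: @toy_node
        return wrap(func)
    return wrap  # used with arguments: @toy_node(stream_in=[...])


@toy_node
async def Add(x: int, y: int) -> int:
    return x + y


@toy_node(stream_in=["a"])
async def SumStream(x: int, a: Any) -> int:
    return x


print(Add(1, 2).args, SumStream.stream_in)  # (1, 2) ['a']
```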

flowno.sleep(duration: float) → Generator[SleepCommand, None, float]

Suspend the current task for the specified duration.

Parameters:

duration – The number of seconds to sleep.

Returns:

The actual time slept (which may be slightly longer than requested).

flowno.socket(family: AddressFamily | int = -1, type: SocketKind | int = -1, proto: int = -1, fileno: int | None = None, use_tls: bool = False, ssl_context: SSLContext | None = None, server_hostname: str | None = None) → SocketHandle

Create a new socket compatible with Flowno’s event loop.

Parameters:
  • family – The address family (default: AF_INET)

  • type – The socket type (default: SOCK_STREAM)

  • proto – The protocol number (default: 0)

  • fileno – If specified, the socket is created from an existing file descriptor

  • use_tls – When True, creates a TLS-wrapped socket

  • ssl_context – The SSL context to use (if use_tls=True)

  • server_hostname – The server hostname for TLS certificate validation

Returns:

A SocketHandle that can be used with the Flowno event loop.
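The -1 defaults in the signature mirror the standard library's socket.socket, where -1 selects AF_INET, SOCK_STREAM and protocol 0. The sketch below shows those defaults and a TLS wrap of the kind use_tls, ssl_context and server_hostname describe. It assumes, but does not confirm, that flowno.socket relies on ssl.SSLContext.wrap_socket internally. No connection is made.

```python
import socket
import ssl

# The standard library also treats -1 as "use the default":
# AF_INET, SOCK_STREAM and protocol 0.
raw = socket.socket(-1, -1, -1)
print(raw.family == socket.AF_INET, raw.type == socket.SOCK_STREAM)  # True True

# A use_tls-style wrap: server_hostname drives SNI and certificate checks.
# The TLS handshake would only happen once the socket connects.
context = ssl.create_default_context()
tls = context.wrap_socket(raw, server_hostname="example.com")
print(tls.server_hostname)  # example.com
tls.close()
```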

flowno.spawn(raw_task: Coroutine[Any, Any, _T_co]) → Generator[SpawnCommand[_T_co], Any, TaskHandle[_T_co]]

Spawn a new task to run concurrently with the current task.

Parameters:

raw_task – The coroutine to run as a new task.

Returns:

A TaskHandle that can be used to wait for the task to complete.
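Spawning fits the same command pattern: the spawning task yields a spawn request and is resumed with a handle it can later wait on. Below is a hypothetical round-robin model. ToyTaskHandle, toy_spawn, toy_join and run are illustrative names, and Flowno's actual TaskHandle API for waiting is not reproduced here.

```python
import types
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Coroutine, Deque, Dict, List, Tuple


@dataclass
class ToyTaskHandle:
    """Hypothetical stand-in for TaskHandle: completion flag, result, waiters."""
    done: bool = False
    result: Any = None
    waiters: List[Coroutine[Any, Any, Any]] = field(default_factory=list)


@dataclass
class ToySpawn:
    coro: Coroutine[Any, Any, Any]


@dataclass
class ToyJoin:
    handle: ToyTaskHandle


@types.coroutine
def toy_spawn(coro: Coroutine[Any, Any, Any]):
    handle = yield ToySpawn(coro)  # the scheduler answers with a handle
    return handle


@types.coroutine
def toy_join(handle: ToyTaskHandle):
    if not handle.done:
        yield ToyJoin(handle)  # park until the task finishes
    return handle.result


def run(root: Coroutine[Any, Any, Any]) -> Any:
    ready: Deque[Tuple[Coroutine[Any, Any, Any], Any]] = deque([(root, None)])
    handles: Dict[Coroutine[Any, Any, Any], ToyTaskHandle] = {root: ToyTaskHandle()}
    while ready:
        coro, value = ready.popleft()
        try:
            command = coro.send(value)
        except StopIteration as stop:
            handle = handles[coro]
            handle.done, handle.result = True, stop.value
            ready.extend((waiter, None) for waiter in handle.waiters)  # wake joiners
            continue
        if isinstance(command, ToySpawn):
            child = ToyTaskHandle()
            handles[command.coro] = child
            ready.append((command.coro, None))  # schedule the new task
            ready.append((coro, child))  # resume the spawner with the handle
        elif isinstance(command, ToyJoin):
            command.handle.waiters.append(coro)
    return handles[root].result


async def double(x: int) -> int:
    return x * 2


async def parent() -> int:
    handle = await toy_spawn(double(21))
    return await toy_join(handle)


print(run(parent()))  # 42
```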