flowno
Flowno: A Python DSL for building dataflow programs.
This module provides tools for creating concurrent, cyclic, and streaming dataflow programs.
- Key features:
Node-based design with the @node decorator
Support for cyclic dependencies and streaming data
Built-in concurrency with a custom event loop
Type-checked node connections
- Configure logging with environment variables:
FLOWNO_LOG_LEVEL: Set logging level (default: ERROR)
FLOWNO_LOG_TAG_FILTER: Filter logs by tags (default: ALL)
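For example, the log level can be raised for a debugging session. Whether assigning the variables in-process takes effect depends on when flowno reads them, so exporting them in the shell before launching Python is the safer route; the values below are only illustrative.
>>> import os
>>> os.environ["FLOWNO_LOG_LEVEL"] = "DEBUG"            # set before flowno configures logging
>>> os.environ["FLOWNO_LOG_TAG_FILTER"] = "ALL"
>>> import flowno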
- class flowno.AsyncQueue(maxsize: int | None = None)[source]
An asynchronous queue for the Flowno event loop.
This queue allows tasks to exchange data safely, with proper synchronization handled by the event loop. When used as an async iterator, it yields items until the queue is closed.
- Parameters:
maxsize – The maximum number of items allowed in the queue. When the queue reaches this size, put() operations will block until items are removed. If None, the queue size is unbounded.
- async close() None [source]
Close the queue, preventing further put operations.
- After closing:
put() will raise QueueClosedError
get() will succeed until the queue is empty, then raise QueueClosedError
AsyncIterator interface will stop iteration when the queue is empty
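A minimal sketch of these close semantics, run on Flowno's EventLoop. The import path for QueueClosedError is an assumption; it may live elsewhere in the package.
>>> from flowno import AsyncQueue, EventLoop
>>> from flowno.core.event_loop.queues import QueueClosedError  # import path is an assumption
>>>
>>> async def demo():
...     q = AsyncQueue()
...     await q.put(1)
...     await q.put(2)
...     await q.close()              # no further put() allowed
...     print(await q.get())         # 1 -- buffered items can still be drained
...     print(await q.get())         # 2
...     try:
...         await q.get()            # closed and empty
...     except QueueClosedError:
...         print("queue closed")
>>>
>>> EventLoop().run_until_complete(demo(), join=True)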
- async get() _T [source]
Get an item from the queue.
If the queue is empty, this will wait until an item is put into the queue.
- Returns:
The next item from the queue
- Raises:
QueueClosedError – If the queue is closed and empty
- is_closed() bool [source]
Check if the queue is closed.
- Returns:
True if the queue is closed, False otherwise
- async peek() _T [source]
Peek at the next item without removing it from the queue.
If the queue is empty, this will wait until an item is put into the queue.
- Returns:
The next item from the queue (without removing it)
- Raises:
QueueClosedError – If the queue is closed and empty
- async put(item: _T) None [source]
Put an item into the queue.
If the queue is full and has a maxsize, this will wait until space is available.
- Parameters:
item – The item to put into the queue
- Raises:
QueueClosedError – If the queue is closed
- until_empty() AsyncIterator[_T] [source]
Get an async iterator that consumes all items until the queue is empty.
This iterator will close the queue automatically when all items are consumed, unless specified otherwise.
- Returns:
An async iterator that yields items until the queue is empty
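A small, illustrative sketch of draining a queue with until_empty(); the queue is populated and consumed within a single task for brevity.
>>> from flowno import AsyncQueue, EventLoop
>>>
>>> async def drain():
...     q = AsyncQueue()
...     for i in range(3):
...         await q.put(i)
...     async for item in q.until_empty():   # closes the queue once the buffer is drained
...         print(item)                      # 0, 1, 2
>>>
>>> EventLoop().run_until_complete(drain(), join=True)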
- class flowno.DraftNode(*args: Unpack)[source]
Abstract Base class for all connectable draft nodes in the flow graph.
A DraftNode subclass instance represents a node that can be connected to other nodes to form a computational graph. It handles only input/output connections; the node must be wrapped in a FinalizedNode before it can be used in a running flow.
Warning
Do not use this class directly; use the @node decorator instead.
Examples
>>> from flowno import node, FlowHDL
>>> @node
... async def add(a: int, b: int) -> int:
...     return a + b
>>> with FlowHDL() as f:
...     f.result = add(1, f.result)
- class flowno.EventLoop[source]
The core event loop implementation for Flowno’s asynchronous execution model.
Manages task scheduling, I/O operations, and synchronization primitives for the dataflow runtime.
- cancel(raw_task: Coroutine[Command, Any, Any]) bool [source]
Cancel a task.
- Parameters:
raw_task – The task to cancel.
- Returns:
True if the task was successfully cancelled; False if it was already finished or errored.
- handle_queue_close(queue: AsyncQueue[Any]) None [source]
Handle a queue being closed, resuming any tasks waiting on the queue with an appropriate exception.
- run_until_complete(root_task: Coroutine[Command, Any, _ReturnT], join: bool = False, wait_for_spawned_tasks: bool = True, _debug_max_wait_time: float | None = None) _ReturnT | None [source]
Run the event loop until the given root task is complete.
This method executes the main event loop, processing tasks, handling I/O operations, and managing task synchronization until the root task completes. It can optionally wait for all spawned tasks to finish as well.
- Parameters:
root_task (RawTask[Command, Any, _ReturnT]) – The coroutine task to execute as the root of the execution graph.
join (bool) – When True, returns the result value of the root task. When False, returns None regardless of the task’s result. If the task raises an exception and join=True, the exception is re-raised.
wait_for_spawned_tasks (bool) – When True, continue running the event loop until all tasks spawned by the root task have completed. When False, stop as soon as the root task completes.
_debug_max_wait_time (float | None) – Optional timeout value in seconds used for debugging. Limits how long the event loop will wait for network or sleeping operations.
- Returns:
- If join=True, returns the result of the root task (of type _ReturnT).
If join=False, returns None.
- Return type:
_ReturnT | None
- Raises:
RuntimeError – If the event loop exits without completing the root task when join=True.
Exception – Any exception raised by the root task is propagated if join=True.
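A minimal sketch of running a root coroutine directly on the event loop, assuming command helpers such as sleep() can be awaited from inside it.
>>> from flowno import EventLoop, sleep
>>>
>>> async def main() -> int:
...     await sleep(0.01)
...     return 42
>>>
>>> loop = EventLoop()
>>> loop.run_until_complete(main(), join=True)
42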
- class flowno.Flow(is_finalized: bool = True)[source]
Dataflow graph execution engine.
The Flow class manages the execution of a dataflow graph, handling dependency resolution, node scheduling, and cycle breaking. It uses a custom event loop to execute nodes concurrently while respecting data dependencies.
- Key features:
Automatic dependency-based scheduling
Cycle detection and resolution
Support for streaming data (run levels)
Concurrency management
- unvisited
List of nodes that have not yet been visited during execution
- Type:
list[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]]]
- visited
Set of nodes that have been visited
- Type:
set[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]]]
- node_tasks
Dictionary mapping nodes to their tasks and status
- Type:
dict[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]], flowno.core.flow.flow.NodeTaskAndStatus]
- running_nodes
Set of nodes currently running
- Type:
set[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]]]
- resolution_queue
Queue of nodes waiting to be resolved
- Type:
flowno.core.event_loop.queues.AsyncSetQueue[flowno.core.node_base.FinalizedNode[typing_extensions.Unpack, tuple[object, …]]]
- add_node(node: FinalizedNode[Unpack, tuple[Any, ...]])[source]
Add a node to the flow.
- Parameters:
node – The node to add
- add_nodes(nodes: list[FinalizedNode[Unpack, tuple[Any, ...]]])[source]
Add multiple nodes to the flow.
- Parameters:
nodes – The nodes to add
- clear_defaulted_inputs(node: FinalizedNode[Unpack, tuple[object, ...]]) None [source]
Remove defaulted input information for a node.
- Parameters:
node – The node to clear defaulted inputs for
- evaluate_node(node: FinalizedNode[Unpack, tuple[object, ...]]) Never [source]
The persistent task that evaluates a node.
- This is the main execution function for a node. It:
Waits for the node to be ready to run
Gathers inputs and handles defaulted values
Calls the node with its inputs
Processes the result (either coroutine or async generator)
Propagates outputs to dependent nodes
Repeats
- Parameters:
node – The node to evaluate
- Returns:
Never returns; runs as a persistent coroutine
- Raises:
NotImplementedError – If the node does not return a coroutine or async generator
- is_input_defaulted(node: FinalizedNode[Unpack, tuple[object, ...]], input_port: InputPortIndex) bool [source]
Check if a specific input port is using a default value.
- Parameters:
node – The node to check
input_port – The input port index to check
- Returns:
True if the input port is using a default value, False otherwise
- run_until_complete(stop_at_node_generation: dict[FinalizedNode[Unpack, tuple[Any, ...]] | DraftNode[Unpack, tuple[Any, ...]], tuple[int, ...] | None] | tuple[int, ...] | None = (), terminate_on_node_error: bool = False, _debug_max_wait_time: float | None = None)[source]
Execute the flow until completion or until a termination condition is met.
This is the main entry point for running a flow. It starts the resolution process and runs until all nodes have completed or a termination condition (like reaching a generation limit or an error) is met.
- Parameters:
stop_at_node_generation – Generation limit for nodes, either as a global limit or as a dict mapping nodes to their individual limits
terminate_on_node_error – Whether to terminate the flow if a node raises an exception
_debug_max_wait_time – Maximum time in seconds to wait for I/O operations (useful for debugging)
- Raises:
Exception – Any exception raised by nodes and not caught
TerminateLimitReached – When a node reaches its generation limit
- set_defaulted_inputs(node: FinalizedNode[Unpack, tuple[object, ...]], defaulted_inputs: list[InputPortIndex]) None [source]
Mark specific inputs of a node as using default values.
When a node uses default values for inputs that are part of a cycle, this method records that information and increments the stitch level to prevent infinite recursion.
- Parameters:
node – The node with defaulted inputs
defaulted_inputs – List of input port indices using default values
- class flowno.FlowHDL[source]
Context manager for building dataflow graphs.
The FlowHDL context allows:
Assigning nodes as attributes
Forward-referencing nodes that haven’t been defined yet
Automatic resolution of placeholder references when exiting the context
Attributes within the context become nodes in the final flow. The context automatically finalizes all node connections when exited.
Use the special syntax:
>>> with FlowHDL() as f:
...     f.node1 = Node1(f.node2)
...     f.node2 = Node2()
>>> f.run_until_complete()
User-defined attributes should not start with an underscore.
- KEYWORDS: ClassVar[list[str]] = ['KEYWORDS', 'run_until_complete']
Keywords that should not be treated as nodes in the graph.
- run_until_complete(stop_at_node_generation: dict[DraftNode[Unpack, tuple[Any, ...]] | FinalizedNode[Unpack, tuple[Any, ...]], tuple[int, ...] | None] | tuple[int, ...] | None = (), terminate_on_node_error: bool = True, _debug_max_wait_time: float | None = None) None [source]
Run the flow until all nodes have completed processing.
- Parameters:
stop_at_node_generation – Optional generation number or mapping of nodes to generation numbers to stop execution at
terminate_on_node_error – Whether to terminate the entire flow if any node raises an exception
_debug_max_wait_time – Maximum time to wait for nodes to complete (for debugging only)
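A hedged sketch of limiting a cyclic flow by generation. The shape of the generation tuple is an assumption based on the signature above, and the default value on `total` is what lets the cycle start (see set_defaulted_inputs above).
>>> from flowno import node, FlowHDL
>>>
>>> @node
... async def Accumulate(total: int = 0) -> int:
...     return total + 1
>>>
>>> with FlowHDL() as f:
...     f.counter = Accumulate(f.counter)    # cycle: the node feeds its own output back in
>>> f.run_until_complete(stop_at_node_generation={f.counter: (5,)})
Note that Flow.run_until_complete documents TerminateLimitReached when a generation limit is hit, so the call above may surface that exception.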
- class flowno.SocketHandle(socket: socket)[source]
Wrapper around the built-in socket object.
This class provides methods that integrate with Flowno’s event loop, allowing socket operations to be performed asynchronously.
- accept() Generator[SocketAcceptCommand, None, tuple[SocketHandle, tuple[Any, ...] | str]] [source]
Accept a connection on a listening socket.
This coroutine yields a SocketAcceptCommand for the event loop to process. When the event loop detects an incoming connection, it resumes this coroutine.
- Returns:
A tuple containing a new SocketHandle for the client connection and the client’s address.
- bind(address: tuple[Any, ...] | str, /) None [source]
Bind the socket to the specified address.
- Parameters:
address – The address (host, port) to bind to.
- connect(address: tuple[Any, ...] | str, /) None [source]
Connect to a remote socket at the specified address.
This is a blocking operation. For non-blocking connections, use the socket primitives from the flowno module.
- Parameters:
address – The address to connect to (host, port).
- listen(backlog: int | None = None, /) None [source]
Enable a server socket to accept connections.
- Parameters:
backlog – The number of unaccepted connections the system will allow before refusing new connections.
- recv(bufsize: int) Generator[SocketRecvCommand, None, bytes] [source]
Receive data from the socket.
This coroutine yields a SocketRecvCommand for the event loop to process. When data is available to read, the event loop resumes this coroutine.
- Parameters:
bufsize – The maximum number of bytes to receive.
- Returns:
The bytes received from the socket.
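A hedged sketch of a one-shot server loop built from these operations. It assumes accept() and recv() can be awaited from a coroutine running on the event loop; the address and buffer size are illustrative.
>>> import socket as stdlib_socket
>>> from flowno import EventLoop, socket
>>>
>>> async def serve_once():
...     server = socket(stdlib_socket.AF_INET, stdlib_socket.SOCK_STREAM)
...     server.bind(("127.0.0.1", 8080))
...     server.listen(1)
...     client, addr = await server.accept()   # resumed when a connection arrives
...     data = await client.recv(1024)         # resumed when data is readable
...     print(addr, data)
>>>
>>> EventLoop().run_until_complete(serve_once(), join=True)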
- class flowno.Stream(input: FinalizedInputPortRef[_InputType], output: FinalizedOutputPortRef[_InputType])[source]
A stream of values from one node to another.
Streams connect nodes that produce multiple values over time (run_level > 0) to consuming nodes. They act as async iterators that yield values as they become available.
- Type Parameters:
_InputType: The type of data being streamed
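A sketch of a streaming producer/consumer pair. Per the node() signature below, the decorator accepts an async generator function, and stream_in marks the consuming parameter as a Stream (see the node() examples further down); the node names here are illustrative.
>>> from flowno import node, FlowHDL, Stream
>>>
>>> @node
... async def Numbers():            # async generator => streams values to consumers
...     for i in range(3):
...         yield i
>>>
>>> @node(stream_in=["values"])
... async def Total(values: Stream[int]) -> int:
...     total = 0
...     async for v in values:
...         total += v
...     return total
>>>
>>> with FlowHDL() as f:
...     f.numbers = Numbers()
...     f.total = Total(f.numbers)
>>> f.run_until_complete()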
- exception flowno.TerminateLimitReached[source]
Exception raised when a node reaches its generation limit.
- async flowno.azip(*args: Unpack) AsyncGenerator[tuple[object, ...], None] [source]
Combine multiple async iterators, similar to the built-in zip() function.
This function takes multiple async iterators and yields tuples containing items from each iterator, advancing all iterators in lockstep. It stops when the shortest iterator is exhausted.
- Parameters:
*args – Two or more async iterators to combine
- Yields:
Tuples containing one item from each iterator
Example
>>> async def gen1():
...     for i in range(3):
...         yield i
>>>
>>> async def gen2():
...     yield "a"
...     yield "b"
>>>
>>> # Will yield (0, "a") and (1, "b")
>>> async for pair in azip(gen1(), gen2()):
...     print(pair)
- flowno.node(func_or_cls: Callable[[...], Coroutine[Any, Any, _ReturnT_co]] | Callable[[...], AsyncGenerator[_ReturnT_co, None]] | type[ClassCall[Any, _ReturnT_co]] | None = None, /, *, multiple_outputs: Literal[False] | Literal[True] | None = None, stream_in: list[str] = []) type[MonoNode[Unpack, tuple[_ReturnT_co]]] | type[StreamingNode[Unpack, tuple[_ReturnT_co]]] | node_meta_single_dec | node_meta_multiple_dec[Unpack, _ReturnTupleT_co] [source]
Decorator that transforms async functions or classes into DraftNode subclasses.
- Parameters:
func_or_cls – The async function or class to transform
multiple_outputs – Whether the node has multiple outputs
stream_in – Names of the input parameters that should be received as streams
- Returns:
A DraftNode subclass or a node_meta decorator
Examples
Basic usage:
>>> from flowno import node
>>>
>>> @node
... async def Add(x: int, y: int) -> int:
...     return x + y
>>>
>>> add_node = Add(1, 2)
>>> print(add_node)  # DraftNode instance
With stream inputs:
>>> from flowno import node, Stream
>>>
>>> @node(stream_in=["a"])
... async def SumStream(x: int, a: Stream[int]) -> int:
...     total = x
...     async for value in a:
...         total += value
...     return total
>>>
>>> sum_stream_node = SumStream(1)
>>> print(sum_stream_node)  # DraftNode instance with stream input
With multiple outputs:
>>> from flowno import node
>>>
>>> @node(multiple_outputs=True)
... async def SumAndDiff(x: int, y: int) -> tuple[int, int]:
...     return x + y, x - y
>>>
>>> sum_and_diff_node = SumAndDiff(3, 1)
>>> print(sum_and_diff_node)  # DraftNode instance with multiple outputs
- flowno.sleep(duration: float) Generator[SleepCommand, None, float] [source]
Suspend the current task for the specified duration.
- Parameters:
duration – The number of seconds to sleep.
- Returns:
The actual time slept (which may be slightly longer than requested).
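A small sketch of sleeping inside a node. It assumes sleep() is awaitable from node code; only the sleeping node is suspended while the rest of the flow keeps running.
>>> from flowno import node, FlowHDL, sleep
>>>
>>> @node
... async def Delayed(x: int) -> int:
...     await sleep(0.1)     # suspend this node for ~100 ms
...     return x
>>>
>>> with FlowHDL() as f:
...     f.out = Delayed(5)
>>> f.run_until_complete()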
- flowno.socket(family: AddressFamily | int = -1, type: SocketKind | int = -1, proto: int = -1, fileno: int | None = None, use_tls: bool = False, ssl_context: SSLContext | None = None, server_hostname: str | None = None) SocketHandle [source]
Create a new socket compatible with Flowno’s event loop.
- Parameters:
family – The address family (default: AF_INET)
type – The socket type (default: SOCK_STREAM)
proto – The protocol number (default: 0)
fileno – If specified, the socket is created from an existing file descriptor
use_tls – When True, creates a TLS-wrapped socket
ssl_context – The SSL context to use (if use_tls=True)
server_hostname – The server hostname for TLS certificate validation
- Returns:
A SocketHandle that can be used with the Flowno event loop.
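A hedged sketch of creating a TLS client socket; the host, port, and context setup are illustrative.
>>> import ssl
>>> import socket as stdlib_socket
>>> from flowno import socket
>>>
>>> ctx = ssl.create_default_context()
>>> s = socket(
...     stdlib_socket.AF_INET,
...     stdlib_socket.SOCK_STREAM,
...     use_tls=True,
...     ssl_context=ctx,
...     server_hostname="example.com",   # used for certificate validation
... )
>>> s.connect(("example.com", 443))      # blocking connect, per SocketHandle.connect above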
- flowno.spawn(raw_task: Coroutine[Any, Any, _T_co]) Generator[SpawnCommand[_T_co], Any, TaskHandle[_T_co]] [source]
Spawn a new task to run concurrently with the current task.
- Parameters:
raw_task – The coroutine to run as a new task.
- Returns:
A TaskHandle that can be used to wait for the task to complete.
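A minimal sketch of spawning a background task. It assumes spawn() and sleep() are awaitable from coroutines on the event loop and uses wait_for_spawned_tasks=True so the worker finishes before the loop exits; the TaskHandle can be used to wait for the task, though that API is not shown here.
>>> from flowno import EventLoop, sleep, spawn
>>>
>>> async def worker():
...     await sleep(0.05)
...     print("worker done")
>>>
>>> async def main():
...     handle = await spawn(worker())    # TaskHandle for the background task
...     print("main keeps running")
>>>
>>> EventLoop().run_until_complete(main(), join=True, wait_for_spawned_tasks=True)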