flowno.core.flow_hdl

FlowHDL: Hardware Description Language-inspired context for defining dataflow graphs.

This module provides the FlowHDL context manager, which allows users to:

  • Define nodes and their connections in any order

  • Forward-reference nodes before they are created (for cyclic dependencies)

  • Automatically finalize node connections when exiting the context

Example

>>> from flowno import FlowHDL, node
>>>
>>> @node
... async def Add(x, y):
...     return x + y
...
>>> @node
... async def Source(value):
...     return value
...
>>> with FlowHDL() as f:
...     f.output = Add(f.input1, f.input2)  # Reference nodes before definition
...     f.input1 = Source(1)                # Define nodes in any order
...     f.input2 = Source(2)
...
>>> f.run_until_complete()
>>> f.output.get_data()
(3,)

class flowno.core.flow_hdl.FlowHDL

Context manager for building dataflow graphs.

The FlowHDL context allows:

  • Assigning nodes as attributes

  • Forward-referencing nodes that haven’t been defined yet

  • Automatic resolution of placeholder references when exiting the context

Attributes within the context become nodes in the final flow. The context automatically finalizes all node connections when exited.

Use the special syntax:

>>> with FlowHDL() as f:
...     f.node1 = Node1(f.node2)
...     f.node2 = Node2()
>>> f.run_until_complete()

User-defined attributes should not start with an underscore.

Canonical:

flowno.core.flow_hdl.FlowHDL

KEYWORDS: ClassVar[list[str]] = ['KEYWORDS', 'run_until_complete']

Keywords that should not be treated as nodes in the graph.

__exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) → bool

Finalize the graph when exiting the context by calling _finalize().
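
The finalize-on-exit pattern can be sketched with the standard context manager protocol. This is a toy model, not flowno's code: `ToyBuilder` and its `finalized` flag are hypothetical, and both the choice to finalize only on a clean exit and the choice to return `False` (so exceptions propagate) are assumptions made for illustration.

```python
class ToyBuilder:
    """Hypothetical builder that finalizes itself when the with-block ends."""

    def __init__(self) -> None:
        self.finalized = False

    def __enter__(self) -> "ToyBuilder":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        # Assumption: only finalize if the with-block body did not raise.
        if exc_type is None:
            self._finalize()
        # Returning False tells Python not to suppress any exception.
        return False

    def _finalize(self) -> None:
        self.finalized = True


with ToyBuilder() as ok:
    pass  # graph definition would go here

failed = ToyBuilder()
try:
    with failed:
        raise ValueError("error while defining the graph")
except ValueError:
    pass  # the exception propagated because __exit__ returned False
```

After this runs, `ok.finalized` is true and `failed.finalized` is still false.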

__getattribute__(key: str) → NodePlaceholder

Override the default attribute getter to return a placeholder for undefined attributes.

Treats attributes starting with an underscore or in the KEYWORDS list as normal attributes.

__setattr__(key: str, value: Any) → None

Override the default attribute setter to store nodes in a dictionary.

Ignores attributes starting with an underscore or in the KEYWORDS list.
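
The two attribute hooks described above can be modelled in plain Python. The following is a toy sketch under stated assumptions, not flowno's implementation: `ToyContext`, `Placeholder`, and the `_nodes` dictionary are hypothetical names, and "ignores" is interpreted here as "falls back to normal attribute handling".

```python
from typing import Any


class Placeholder:
    """Stands in for a node that has not been assigned yet (hypothetical)."""

    def __init__(self, name: str) -> None:
        self.name = name


class ToyContext:
    """Toy model of a context whose public attributes are graph nodes."""

    KEYWORDS = ["KEYWORDS", "run_until_complete"]

    def __init__(self) -> None:
        # Underscore-prefixed, so this takes the normal setter path below.
        self._nodes: dict[str, Any] = {}

    def __setattr__(self, key: str, value: Any) -> None:
        if key.startswith("_") or key in ToyContext.KEYWORDS:
            # Private names and keywords behave like ordinary attributes.
            super().__setattr__(key, value)
        else:
            # Everything else is recorded as a node.
            self._nodes[key] = value

    def __getattribute__(self, key: str) -> Any:
        if key.startswith("_") or key in ToyContext.KEYWORDS:
            return super().__getattribute__(key)
        nodes = super().__getattribute__("_nodes")
        if key in nodes:
            return nodes[key]
        # Undefined name: hand back a placeholder instead of raising.
        return Placeholder(key)


ctx = ToyContext()
early = ctx.b      # not defined yet, so a Placeholder named "b" is returned
ctx.b = "node-b"   # stored in ctx._nodes rather than as a normal attribute
late = ctx.b       # now resolves to the stored value
```

Using `super().__getattribute__` for the internal lookup avoids infinite recursion, which is the usual pitfall when overriding `__getattribute__`.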

_finalize() → None

Finalize the graph by replacing connections to placeholders with connections to the actual nodes.

Example: Out-of-order definition of nodes is allowed, as long as every referenced node is defined before the graph is finalized.

>>> with FlowHDL() as f:
...     f.a = Node1(f.b)
...     f.b = Node2()
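
Placeholder resolution of this kind can be illustrated with a small standalone function. This is a minimal sketch, assuming nodes keep their inputs in a list and are registered by name in a dictionary; `ToyNode`, `Placeholder`, and `finalize` are hypothetical names and do not reflect flowno's internal data structures. Raising `AttributeError` for a name that was never defined is also an assumption.

```python
from typing import Any, Optional


class Placeholder:
    """Refers to a node by name before that node exists (hypothetical)."""

    def __init__(self, name: str) -> None:
        self.name = name


class ToyNode:
    """Hypothetical node that keeps its upstream connections in a list."""

    def __init__(self, label: str, inputs: Optional[list[Any]] = None) -> None:
        self.label = label
        self.inputs = list(inputs or [])


def finalize(nodes: dict[str, ToyNode]) -> None:
    """Swap every Placeholder input for the node registered under its name."""
    for node in nodes.values():
        for i, inp in enumerate(node.inputs):
            if isinstance(inp, Placeholder):
                if inp.name not in nodes:
                    raise AttributeError(
                        f"node {inp.name!r} was referenced but never defined"
                    )
                node.inputs[i] = nodes[inp.name]


# 'a' is declared first and refers to 'b' by name only; 'b' then refers
# back to 'a', forming a cycle that plain constructor calls could not express.
graph: dict[str, ToyNode] = {}
graph["a"] = ToyNode("a", [Placeholder("b")])
graph["b"] = ToyNode("b", [graph["a"]])
finalize(graph)
```

After `finalize(graph)`, the first input of `a` is the actual `b` object, and the cycle `a → b → a` is closed.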

run_until_complete(stop_at_node_generation: dict[DraftNode[Unpack, tuple[Any, ...]] | FinalizedNode[Unpack, tuple[Any, ...]], tuple[int, ...] | None] | tuple[int, ...] | None = (), terminate_on_node_error: bool = True, _debug_max_wait_time: float | None = None) → None

Run the flow until all nodes have completed processing.

Parameters:
  • stop_at_node_generation – Optional generation (a tuple of ints), or a mapping from nodes to generations, at which to stop execution

  • terminate_on_node_error – Whether to terminate the entire flow if any node raises an exception

  • _debug_max_wait_time – Maximum time to wait for nodes to complete (for debugging only)