flowno.core.node_base

Core node types and base classes for the Flowno dataflow system.

This module defines the fundamental building blocks of the dataflow graph:
  • DraftNode: Base class for all nodes before finalization

  • FinalizedNode: Runtime node with active connections

  • Stream: For handling streaming data between nodes

  • Various port and connection types for graph construction

These classes are primarily internal implementation details.

class flowno.core.node_base.Constant(value: _T)[source]

Bases: DraftNode[(), tuple[_T]]

A node that produces a constant value of type _T.

Parameters:

value (_T) – The constant value produced by the node.

_default_values: ClassVar[dict[int, object]] = {}

Used by dynamically generated subclasses in the @node factory

_minimum_run_level: ClassVar[list[Literal[0, 1, 2, 3]]] = []

Used by subclasses to set the minimum run level required for each input port

async call() tuple[_T][source]

Produce the single-value tuple containing our constant.
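As a rough illustration of the "single-value tuple" convention, the sketch below defines a hypothetical ConstantSketch class. It is a stand-in written for this example, not the Flowno Constant class, and it only assumes that a one-output node returns a one-element tuple from an async call().

```python
import asyncio
from typing import Generic, TypeVar

T = TypeVar("T")


class ConstantSketch(Generic[T]):
    """Hypothetical stand-in for a constant node; not the Flowno class."""

    def __init__(self, value: T) -> None:
        self.value = value

    async def call(self) -> tuple[T]:
        # A node with a single output returns a one-element tuple.
        return (self.value,)


# Run the coroutine once and keep the produced tuple.
result = asyncio.run(ConstantSketch(42).call())
print(result)
```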

class flowno.core.node_base.DraftInputPort(*, connected_output: flowno.core.node_base.DraftOutputPortRef[_T] | flowno.core.node_base.OutputPortRefPlaceholder[_T] | NoneType = None, minimum_run_level: Literal[0, 1, 2, 3] = 0, default_value: object | type[inspect._empty])[source]

Bases: Generic[_T]

default_value

alias of _empty

class flowno.core.node_base.DraftInputPortRef(node: DraftNode[Unpack, tuple[object, ...]], port_index: InputPortIndex)[source]

Bases: Generic[_T]

Represents an input port of a node (the data consumer).

Type Parameters:

_T : Phantom type. The port_input arg to node.call(…) should be _T.

class flowno.core.node_base.DraftNode(*args: Unpack)[source]

Bases: ABC, Generic[Unpack[_Ts], ReturnTupleT_co]

Abstract Base class for all connectable draft nodes in the flow graph.

A DraftNode subclass instance represents a node that can be connected to other nodes to form a computational graph. It only handles input/output connections. The node must be wrapped in a FinalizedNode to be used in a running flow.

Warning

Do not use this class directly; use the @node decorator instead.

Examples

>>> from flowno import node, FlowHDL
>>> @node
... async def add(a: int, b: int) -> int:
...     return a + b
>>> with FlowHDL() as f:
...     f.result = add(1, f.result)

_blank_finalized() FinalizedNode[Unpack, ReturnTupleT_co][source]

Convert this draft node into a finalized node without connections.

_default_values: ClassVar[dict[int, object]]

Used by dynamically generated subclasses in the @node factory

_instance_counter: ClassVar[int] = 0

Used for debugging and logging

_minimum_run_level: ClassVar[list[Literal[0, 1, 2, 3]]]

Used by subclasses to set the minimum run level required for each input port

get_input_nodes() list[DraftNode[Unpack, tuple[object, ...]]][source]

Get all nodes connected to this node’s inputs.

get_output_nodes() list[DraftNode[Unpack, tuple[object, ...]]][source]

Get all nodes connected to this node’s outputs.

class flowno.core.node_base.DraftOutputPortRef(node: DraftNode[Unpack, tuple[object, ...]], port_index: OutputPortIndex)[source]

Bases: Generic[_Tout]

Represents an output port on a node (the data producer).

Type Parameters:

_Tout : A phantom type. The output of node.call(…) at port_index should be _Tout.

connect(target: DraftInputPortRef[_Tout])[source]

Wires this output to the given target’s input. Internally, this records that data flows source_node.output(port_index) -> target_node.input(port_index).
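The bookkeeping described above can be pictured with a minimal sketch. All class names here (NodeSketch, InputRefSketch, OutputRefSketch) are hypothetical, and storing the connection on the consumer's side is an assumption made for illustration, not a description of how Flowno stores it.

```python
from dataclasses import dataclass, field


@dataclass
class NodeSketch:
    """Hypothetical node that remembers which producer feeds each input."""

    name: str
    # input port index -> (producer node, producer output port index)
    inputs: dict[int, tuple["NodeSketch", int]] = field(default_factory=dict)


@dataclass
class InputRefSketch:
    node: NodeSketch
    port_index: int


@dataclass
class OutputRefSketch:
    node: NodeSketch
    port_index: int

    def connect(self, target: InputRefSketch) -> None:
        # Record: self.node.output(port_index) -> target.node.input(port_index)
        target.node.inputs[target.port_index] = (self.node, self.port_index)


producer = NodeSketch("producer")
consumer = NodeSketch("consumer")
OutputRefSketch(producer, 0).connect(InputRefSketch(consumer, 1))
```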

class flowno.core.node_base.FinalizedInputPort(*, port_index: flowno.core.types.InputPortIndex, connected_output: flowno.core.node_base.FinalizedOutputPortRef[_T] | None = None, minimum_run_level: Literal[0, 1, 2, 3] = 0, default_value: object | type[inspect._empty], stitch_level_0: int = 0)[source]

Bases: Generic[_T]

default_value

alias of _empty

class flowno.core.node_base.FinalizedInputPortRef(node: flowno.core.node_base.FinalizedNode[typing_extensions.Unpack[tuple[object, ...]], tuple[object, ...]], port_index: flowno.core.types.InputPortIndex)[source]

Bases: Generic[_T]

class flowno.core.node_base.FinalizedNode(original_call: OriginalCall, instance_id: int, input_ports: dict[InputPortIndex, FinalizedInputPort[object]], connected_output_nodes: dict[OutputPortIndex, list[FinalizedNode[Unpack, tuple[object, ...]]]], draft: DraftNode[Unpack, ReturnTupleT_co])[source]

Bases: Generic[Unpack[_Ts], ReturnTupleT_co]

The finalized node with valid connections to other nodes. Do not explicitly create or subclass FinalizedNode.

class GatheredInputs(positional_args, defaulted_ports)[source]

Bases: NamedTuple

_asdict()

Return a new dict which maps field names to their values.

classmethod _make(iterable)

Make a new GatheredInputs object from a sequence or iterable

_replace(**kwds)

Return a new GatheredInputs object replacing specified fields with new values

defaulted_ports: list[InputPortIndex]

Alias for field number 1

positional_args: tuple[object | Stream[object], ...]

Alias for field number 0
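The NamedTuple helpers listed above behave like those of any typing.NamedTuple. The sketch below uses a hypothetical GatheredInputsSketch with the same two field names (and plain int in place of InputPortIndex) to show unpacking, _asdict() and _replace().

```python
from typing import NamedTuple


class GatheredInputsSketch(NamedTuple):
    """Hypothetical two-field tuple mirroring the field names above."""

    positional_args: tuple[object, ...]
    defaulted_ports: list[int]


gathered = GatheredInputsSketch(positional_args=(1, "a"), defaulted_ports=[1])

# Field 0 and field 1 can be read by name, by index, or by unpacking.
args, defaulted = gathered

# _asdict() maps field names to values; _replace() returns a modified copy.
as_dict = gathered._asdict()
updated = gathered._replace(defaulted_ports=[])
```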

_get_maximum_input_generation() tuple[int, ...] | None[source]

Return the highest generation of the connected inputs using predicate cmp_generation.

call(*args: Unpack) Coroutine[Any, Any, ReturnTupleT_co] | AsyncGenerator[ReturnTupleT_co, None][source]

Delegate call to the draft node implementation

async count_down_upstream_latches(defaulted_inputs: list[InputPortIndex]) None[source]

Count down upstream node barriers for non-defaulted connected inputs.

gather_inputs() GatheredInputs[source]

Gather the inputs for the node.

For input ports that request non-streaming data, the last data produced by the input node is used. For input ports that request streaming data, a Stream object is used with reference to self.

Returns:

A GatheredInputs tuple: positional_args holds the inputs to be passed as args to the call method, and defaulted_ports lists the input ports recorded as defaulted.

Return type:

GatheredInputs

get_data(run_level: Literal[0, 1, 2, 3] = 0) ReturnTupleT_co | None[source]

Get the most recent data produced by the node at the specified run level.

Parameters:

run_level – The run level to retrieve data for (default: 0)

Returns:

The tuple of data produced by this node, or None if no data is available

get_inputs_with_le_generation_clipped_to_minimum_run_level() list[FinalizedInputPort[object]][source]

Get the input nodes that should be resolved before this node should run, considering the minimum run level required for each input.

For each input connection, the following steps are performed:

  • Clip the input node’s generation based on the input port’s minimum run level.

  • Add the stitch value to the clipped generation.

  • Compare the clipped and stitched generation with this node’s current generation.

    • The input is considered stale and needs to be resolved if the clipped generation is less than or equal to this node’s generation.

Returns:

A list of input connections that are stale and need to be resolved before this node can run.

Return type:

list[FinalizedInputPort[object]]
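The clip, stitch and compare steps can be sketched with a deliberately simplified model. Everything below is an assumption made for illustration: generations are modelled as integer tuples, clipping truncates to run_level + 1 entries, the stitch is added to the first entry, and Python's built-in tuple ordering stands in for Flowno's own generation comparison, which may differ. InputPortSketch and the helper names are hypothetical.

```python
from dataclasses import dataclass

Generation = tuple[int, ...]


@dataclass
class InputPortSketch:
    """Hypothetical input port carrying its producer's current generation."""

    name: str
    producer_generation: Generation
    minimum_run_level: int = 0
    stitch: int = 0


def clip(gen: Generation, run_level: int) -> Generation:
    # Assumption: clipping keeps at most run_level + 1 entries.
    return gen[: run_level + 1]


def add_stitch(gen: Generation, stitch: int) -> Generation:
    # Assumption: the stitch value offsets the first entry only.
    return (gen[0] + stitch,) + gen[1:]


def stale_inputs(ports: list[InputPortSketch], own: Generation) -> list[InputPortSketch]:
    # An input is stale if its clipped, stitched generation is <= our own.
    return [
        p
        for p in ports
        if add_stitch(clip(p.producer_generation, p.minimum_run_level), p.stitch) <= own
    ]


ports = [
    InputPortSketch("a", (1,)),            # (1,) <= (1,): stale
    InputPortSketch("b", (2,)),            # (2,) >  (1,): fresh
    InputPortSketch("c", (1,), stitch=1),  # stitched to (2,): fresh
    InputPortSketch("d", (0, 5)),          # clipped to (0,): stale
]
stale_names = [p.name for p in stale_inputs(ports, own=(1,))]
```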

get_output_nodes() list[FinalizedNode[Unpack, tuple[object, ...]]][source]

Get all nodes connected to this node’s outputs.

get_output_nodes_by_run_level(run_level: Literal[0, 1, 2, 3]) list[FinalizedInputPort[object]][source]

Returns a list of input ports (FinalizedInputPort objects) from consumer nodes that are connected to this node, filtered by the specified run level.

Each input port is included once per connection. That is, if the same consumer is connected multiple times (or via different output ports) with a minimum_run_level equal to the specified run_level, it will appear once for each such connection.

Parameters:

run_level (RunLevel) – The desired run level (for example, 0 for final data, 1 for partial/streaming updates).

Returns:

A list of input ports on consumer nodes that are connected to self and have the specified minimum_run_level.

Return type:

list[FinalizedInputPort[object]]
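The "once per connection" behaviour can be illustrated with a small sketch. ConsumerPortSketch and ports_at_run_level are hypothetical names, and the flat list of connections is an assumed representation chosen for brevity.

```python
from dataclasses import dataclass


@dataclass
class ConsumerPortSketch:
    """Hypothetical input port on a consumer node."""

    consumer: str
    port_index: int
    minimum_run_level: int


def ports_at_run_level(
    connections: list[ConsumerPortSketch], run_level: int
) -> list[ConsumerPortSketch]:
    # Keep one entry per connection whose minimum_run_level matches exactly.
    return [p for p in connections if p.minimum_run_level == run_level]


connections = [
    ConsumerPortSketch("printer", 0, minimum_run_level=1),  # streaming input
    ConsumerPortSketch("printer", 1, minimum_run_level=1),  # same consumer again
    ConsumerPortSketch("logger", 0, minimum_run_level=0),   # final-data input
]

streaming = ports_at_run_level(connections, 1)
final = ports_at_run_level(connections, 0)
```

Here the consumer named "printer" appears twice in the streaming result because it is connected through two input ports.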

class flowno.core.node_base.FinalizedOutputPortRef(node: flowno.core.node_base.FinalizedNode[typing_extensions.Unpack[tuple[object, ...]], tuple[object, ...]], port_index: flowno.core.types.OutputPortIndex)[source]

Bases: Generic[_T]

exception flowno.core.node_base.MissingDefaultError(node: FinalizedNode[Unpack, tuple[object, ...]], input_index: InputPortIndex)[source]
exception flowno.core.node_base.MissingDefaultError(node: SuperNode)

Bases: Exception

class flowno.core.node_base.NodePlaceholder(name: str)[source]

Bases: object

Placeholder for a node defined on a FlowHDL context instance.

Examples

>>> with FlowHDL() as flow:
...     flow.node1 = DummyNode()
...     assert isinstance(flow.node1, Node)
...     assert isinstance(flow.node2, NodePlaceholder)
...     print(flow.node2.name) # prints "node2"
...     assert isinstance(flow.node3[OutputPortIndex(0)], NodeOutputPlaceholder)

class flowno.core.node_base.OriginalCall(call_signature, call_code, func_name, class_name)[source]

Bases: NamedTuple

_asdict()

Return a new dict which maps field names to their values.

classmethod _make(iterable)

Make a new OriginalCall object from a sequence or iterable

_replace(**kwds)

Return a new OriginalCall object replacing specified fields with new values

call_code: CodeType

Alias for field number 1

call_signature: Signature

Alias for field number 0

class_name: str | None

Alias for field number 3

func_name: str

Alias for field number 2
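A record with these four fields can be assembled from the standard library alone. The sketch below is a hypothetical OriginalCallSketch with a helper named describe; how Flowno itself populates OriginalCall is not shown in this reference, so treat this only as one plausible way to gather the same information.

```python
import inspect
from types import CodeType
from typing import Callable, NamedTuple, Optional


class OriginalCallSketch(NamedTuple):
    """Hypothetical record with the same field order as listed above."""

    call_signature: inspect.Signature
    call_code: CodeType
    func_name: str
    class_name: Optional[str]


def describe(func: Callable[..., object], class_name: Optional[str] = None) -> OriginalCallSketch:
    # inspect.signature and __code__ are standard attributes of Python functions.
    return OriginalCallSketch(
        call_signature=inspect.signature(func),
        call_code=func.__code__,
        func_name=func.__name__,
        class_name=class_name,
    )


async def add(a: int, b: int = 1) -> int:
    return a + b


oc = describe(add)
param_names = list(oc.call_signature.parameters)
```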

class flowno.core.node_base.OutputPortRefPlaceholder(node: NodePlaceholder, port_index: OutputPortIndex)[source]

Bases: Generic[_ReturnT]

Placeholder for a specific output port of a NodePlaceholder.

class flowno.core.node_base.ReturnTupleT_co

The return type of a single output or multiple output node.

alias of TypeVar(‘ReturnTupleT_co’, bound=tuple[object, …], covariant=True)

class flowno.core.node_base.StalledNodeRequestCommand(stalled_input: 'FinalizedInputPortRef[object]', stalling_node: 'FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]]')[source]

Bases: Command

class flowno.core.node_base.Stream(input: FinalizedInputPortRef[_InputType], output: FinalizedOutputPortRef[_InputType])[source]

Bases: Generic[_InputType], AsyncIterator[_InputType]

A stream of values from one node to another.

Streams connect nodes that produce multiple values over time (run_level > 0) to consuming nodes. They act as async iterators that yield values as they become available.

Type Parameters:

_InputType: The type of data being streamed
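The async-iterator behaviour can be sketched without Flowno. StreamSketch below is a hypothetical queue-backed iterator; the put() and close() methods and the sentinel object are assumptions made for this example and are not part of the Stream class documented here.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel marking the end of the stream (assumption)


class StreamSketch(AsyncIterator[T]):
    """Hypothetical stream: yields values as the producer makes them available."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def put(self, item: T) -> None:
        await self._queue.put(item)

    async def close(self) -> None:
        await self._queue.put(_DONE)

    async def __anext__(self) -> T:
        item = await self._queue.get()
        if item is _DONE:
            raise StopAsyncIteration
        return item


async def main() -> list[str]:
    stream: StreamSketch[str] = StreamSketch()

    async def produce() -> None:
        for word in ("Hello", "world"):
            await stream.put(word)
        await stream.close()

    producer = asyncio.create_task(produce())
    # The consumer simply iterates; each value arrives when it is ready.
    received = [chunk async for chunk in stream]
    await producer
    return received


received = asyncio.run(main())
```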

class flowno.core.node_base.SuperNode(head: ~flowno.core.node_base.FinalizedNode[~typing_extensions.Unpack, tuple[object, ...]], members: dict[~flowno.core.node_base.FinalizedNode[~typing_extensions.Unpack, tuple[object, ...]], list[~flowno.core.types.InputPortIndex]], dependent: ~flowno.core.node_base.SuperNode | None = None, dependencies: list[~flowno.core.node_base.SuperNode] = <factory>)[source]

Bases: object

The SuperNode is a set of nodes that are strongly connected to each other. It is used by the cycle detection algorithm.

static build_condensed_mermaid(all_snodes: list[SuperNode], supernode_ids: dict[SuperNode, str]) str[source]

Produce a single Mermaid diagram that shows all SuperNodes as nodes and draws edges for each supernode.dependencies entry.

gather_supernodes() list[SuperNode][source]

Simple DFS to collect every SuperNode reachable from root.
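A depth-first collection of this kind might look like the following sketch. SuperNodeSketch and gather are hypothetical names, and following the dependencies list is an assumption about which edges the traversal walks.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)  # eq=False keeps identity-based hashing, so nodes fit in a set
class SuperNodeSketch:
    name: str
    dependencies: list["SuperNodeSketch"] = field(default_factory=list)


def gather(root: SuperNodeSketch) -> list[SuperNodeSketch]:
    """Iterative DFS collecting every node reachable from root exactly once."""
    seen: set[SuperNodeSketch] = set()
    order: list[SuperNodeSketch] = []
    stack = [root]
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        order.append(current)
        # Reverse so the first dependency is visited first.
        stack.extend(reversed(current.dependencies))
    return order


d = SuperNodeSketch("d")
b = SuperNodeSketch("b", [d])
c = SuperNodeSketch("c", [d])
a = SuperNodeSketch("a", [b, c])

visited = [s.name for s in gather(a)]
```

The shared dependency d is reported once even though two nodes depend on it.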

generate_mermaid_charts_for_condensed_graph() str[source]
Generates two outputs:
  1. A mermaid diagram of the condensed graph.

  2. A dictionary mapping each supernode to a mermaid diagram of its internal members.

flowno.core.node_base._node_id(node: FinalizedNode[Unpack, tuple[object, ...]]) str[source]

Return a stable identifier for a node to use in Mermaid.

flowno.core.node_base._node_label(node: FinalizedNode[Unpack, tuple[object, ...]]) str[source]

Return a short label to display for each node.

flowno.core.node_base.build_abbreviated_signature(parameters: list[tuple[str, Parameter]], required_input_ports: list[InputPortIndex], skip_self: bool) tuple[str, str][source]

Construct just the parenthesized parameter list, skipping self if needed. Returns the line plus a line of dashes/underlines for missing-default params.

flowno.core.node_base.build_inline_signature(func_name: str, parameters: list[tuple[str, Parameter]], required_input_ports: list[InputPortIndex], skip_self: bool) tuple[str, list[tuple[int, int, bool]]][source]

Build something like func_name(x: int=1, y: str). Returns the signature string and param_positions for underlining.

flowno.core.node_base.build_param_text(param_name: str, param: Parameter) str[source]

Return a string representation of one parameter, e.g. x: int=5.
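The x: int=5 format can be reproduced with inspect.Parameter, as in the sketch below. The function param_text is a hypothetical re-creation for illustration; the exact formatting rules of build_param_text (for example how it renders complex annotations) are not given here and may differ.

```python
import inspect


def param_text(param_name: str, param: inspect.Parameter) -> str:
    """Render one parameter as name, optional ': annotation', optional '=default'."""
    text = param_name
    if param.annotation is not inspect.Parameter.empty:
        # Use the class name when available, otherwise fall back to str().
        annotation = getattr(param.annotation, "__name__", str(param.annotation))
        text += f": {annotation}"
    if param.default is not inspect.Parameter.empty:
        text += f"={param.default!r}"
    return text


def example(w, x: int = 5, y: str = "hi"):
    return w, x, y


texts = [
    param_text(name, param)
    for name, param in inspect.signature(example).parameters.items()
]
print(texts)
```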

flowno.core.node_base.build_signature_display(oc: OriginalCall, required_input_ports: list[InputPortIndex]) str[source]
Return a multi-line string that shows either:
  1. ClassName.call(…) in an abbreviated form, or

  2. func_name(…) in a traditional inline form

With underlines marking which parameters are missing defaults.

flowno.core.node_base.build_underline_line(signature_line: str, param_positions: list[tuple[int, int, bool]]) str[source]

Create a line of spaces, with dashes marking the parameters that need defaults.
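One way to picture the underline line is the sketch below. It assumes each param_positions entry is (start, end, needs_default) with an exclusive end offset into the signature line; that reading of the tuple is an assumption, and underline is a hypothetical helper rather than the library function.

```python
def underline(signature_line: str, param_positions: list[tuple[int, int, bool]]) -> str:
    """Return spaces the width of signature_line, with dashes under flagged params."""
    chars = [" "] * len(signature_line)
    for start, end, needs_default in param_positions:
        if needs_default:
            for i in range(start, end):
                chars[i] = "-"
    return "".join(chars)


signature = "add(a: int, b: int=1)"

# Locate each parameter's text inside the signature line.
a_start = signature.index("a: int")
b_start = signature.index("b: int=1")
positions = [
    (a_start, a_start + len("a: int"), True),     # a has no default: underline it
    (b_start, b_start + len("b: int=1"), False),  # b already has a default
]

marker_line = underline(signature, positions)
print(signature)
print(marker_line)
```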

flowno.core.node_base.format_missing_defaults(node: FinalizedNode[Unpack, tuple[object, ...]], required_input_ports: list[InputPortIndex]) str[source]

Produce a message describing which parameters of a node must have default values. If the node’s original call is from a class’s call method, display an abbreviated signature with the class name in a separate line. Otherwise, display a more traditional function signature inline.

The final message includes underlines for each parameter index found in required_input_ports.