flowno.core.node_base
Core node types and base classes for the Flowno dataflow system.
This module defines the fundamental building blocks of the dataflow graph:
DraftNode: Base class for all nodes before finalization
FinalizedNode: Runtime node with active connections
Stream: For handling streaming data between nodes
Various port and connection types for graph construction
These classes are primarily internal implementation details.
- class flowno.core.node_base.Constant(value: _T)[source]
Bases: DraftNode[(), tuple[_T]]
A node that produces a constant value of type _T.
- Parameters:
value (_T) – The constant value produced by the node.
- _default_values: ClassVar[dict[int, object]] = {}
Used by dynamically generated subclasses in the @node factory
- class flowno.core.node_base.DraftInputPort(*, connected_output: flowno.core.node_base.DraftOutputPortRef[_T] | flowno.core.node_base.OutputPortRefPlaceholder[_T] | NoneType = None, minimum_run_level: Literal[0, 1, 2, 3] = 0, default_value: object | type[inspect._empty])[source]
Bases: Generic[_T]
- default_value
alias of _empty
- class flowno.core.node_base.DraftInputPortRef(node: DraftNode[Unpack, tuple[object, ...]], port_index: InputPortIndex)[source]
Bases: Generic[_T]
Represents an input port of a node (the data consumer).
- Type Parameters:
_T: Phantom type. The value passed to node.call(…) for this input port should be of type _T.
- class flowno.core.node_base.DraftNode(*args: Unpack)[source]
Bases: ABC, Generic[Unpack[_Ts], ReturnTupleT_co]
Abstract base class for all connectable draft nodes in the flow graph.
A DraftNode subclass instance represents a node that can be connected to other nodes to form a computational graph. It only handles input/output connections. The node must be wrapped in a FinalizedNode before it can be used in a running flow.
Warning
Do not use this class directly; use the @node decorator instead.
Examples
>>> from flowno import node, FlowHDL
>>> @node
... async def add(a: int, b: int) -> int:
...     return a + b
>>> with FlowHDL() as f:
...     f.result = add(1, f.result)
- _blank_finalized() FinalizedNode[Unpack, ReturnTupleT_co] [source]
Convert this draft node into a finalized node without connections.
- _default_values: ClassVar[dict[int, object]]
Used by dynamically generated subclasses in the @node factory
- _minimum_run_level: ClassVar[list[Literal[0, 1, 2, 3]]]
Used by subclasses to set the minimum run level required for each input port
- class flowno.core.node_base.DraftOutputPortRef(node: DraftNode[Unpack, tuple[object, ...]], port_index: OutputPortIndex)[source]
Bases: Generic[_Tout]
Represents an output port on a node (the data producer).
- Type Parameters:
_Tout: A phantom type. The value at position port_index in the output of node.call(…) should be of type _Tout.
- connect(target: DraftInputPortRef[_Tout])[source]
Wires this output to the given target’s input. Internally, this records that data flows source_node.output(port_index) -> target_node.input(port_index).
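A minimal, self-contained sketch of the bookkeeping that connect describes. It does not use Flowno's classes: ToyNode, ToyInputRef, and ToyOutputRef are hypothetical stand-ins, and storing the edge on the consumer node is an assumption made only for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class ToyNode:
    """Hypothetical stand-in for a draft node; not Flowno's DraftNode."""
    name: str
    # Maps an input port index to the (producer node, output port index) feeding it.
    inputs: dict[int, tuple["ToyNode", int]] = field(default_factory=dict)


@dataclass
class ToyInputRef:
    node: ToyNode
    port_index: int


@dataclass
class ToyOutputRef:
    node: ToyNode
    port_index: int

    def connect(self, target: ToyInputRef) -> None:
        # Record that data flows from this output port to the target input port.
        target.node.inputs[target.port_index] = (self.node, self.port_index)


producer = ToyNode("producer")
consumer = ToyNode("consumer")
ToyOutputRef(producer, 0).connect(ToyInputRef(consumer, 1))
print(consumer.inputs[1][0].name, consumer.inputs[1][1])
```

Note that the two port indices are independent: output port 0 of the producer feeds input port 1 of the consumer in this sketch.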
- class flowno.core.node_base.FinalizedInputPort(*, port_index: flowno.core.types.InputPortIndex, connected_output: flowno.core.node_base.FinalizedOutputPortRef[_T] | None = None, minimum_run_level: Literal[0, 1, 2, 3] = 0, default_value: object | type[inspect._empty], stitch_level_0: int = 0)[source]
Bases: Generic[_T]
- default_value
alias of _empty
- class flowno.core.node_base.FinalizedInputPortRef(node: flowno.core.node_base.FinalizedNode[typing_extensions.Unpack[tuple[object, ...]], tuple[object, ...]], port_index: flowno.core.types.InputPortIndex)[source]
Bases: Generic[_T]
- class flowno.core.node_base.FinalizedNode(original_call: OriginalCall, instance_id: int, input_ports: dict[InputPortIndex, FinalizedInputPort[object]], connected_output_nodes: dict[OutputPortIndex, list[FinalizedNode[Unpack, tuple[object, ...]]]], draft: DraftNode[Unpack, ReturnTupleT_co])[source]
Bases: Generic[Unpack[_Ts], ReturnTupleT_co]
The finalized node with valid connections to other nodes. Do not explicitly create or subclass FinalizedNode.
- class GatheredInputs(positional_args, defaulted_ports)[source]
Bases: NamedTuple
- _asdict()
Return a new dict which maps field names to their values.
- classmethod _make(iterable)
Make a new GatheredInputs object from a sequence or iterable
- _replace(**kwds)
Return a new GatheredInputs object replacing specified fields with new values
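The three helpers above are the standard ones that every NamedTuple provides. The sketch below shows them on a hypothetical stand-in, ToyGatheredInputs, that reuses the two field names from the signature; the field types are assumptions, not Flowno's definitions.

```python
from typing import NamedTuple


class ToyGatheredInputs(NamedTuple):
    """Hypothetical stand-in; the field types are assumed for illustration."""
    positional_args: tuple[object, ...]
    defaulted_ports: list[int]


# _make builds an instance from any iterable of field values, in field order.
gathered = ToyGatheredInputs._make([(1, "a"), [2]])

# _replace returns a new tuple; the original is left unchanged.
updated = gathered._replace(defaulted_ports=[])

# _asdict maps field names to values.
print(gathered._asdict())
print(updated)
```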
- _get_maximum_input_generation() tuple[int, ...] | None [source]
Return the highest generation of the connected inputs using predicate cmp_generation.
- call(*args: Unpack) Coroutine[Any, Any, ReturnTupleT_co] | AsyncGenerator[ReturnTupleT_co, None] [source]
Delegate call to the draft node implementation
- async count_down_upstream_latches(defaulted_inputs: list[InputPortIndex]) None [source]
Count down upstream node barriers for non-defaulted connected inputs.
- gather_inputs() GatheredInputs [source]
Gather the inputs for the node.
For input ports that request non-streaming data, the last data produced by the input node is used. For input ports that request streaming data, a Stream object is used with reference to self.
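The distinction between the two kinds of port can be sketched as follows. Everything here is a hypothetical stand-in (ToyPort, ToyStream, toy_gather_inputs), and treating a minimum run level of 0 as "non-streaming" is an assumption of this sketch, not a statement about Flowno's implementation.

```python
from typing import NamedTuple


class ToyPort(NamedTuple):
    """Hypothetical input port description; not Flowno's FinalizedInputPort."""
    producer: str           # name of the upstream node
    minimum_run_level: int  # assumed: 0 means non-streaming, >0 means streaming


class ToyStream:
    """Hypothetical placeholder for a stream handle that refers to the consumer."""

    def __init__(self, consumer: str, producer: str) -> None:
        self.consumer = consumer
        self.producer = producer


def toy_gather_inputs(
    consumer: str, ports: list[ToyPort], last_data: dict[str, object]
) -> list[object]:
    args: list[object] = []
    for port in ports:
        if port.minimum_run_level == 0:
            # Non-streaming port: pass the last value the producer emitted.
            args.append(last_data[port.producer])
        else:
            # Streaming port: pass a stream object bound to the consuming node.
            args.append(ToyStream(consumer, port.producer))
    return args


gathered = toy_gather_inputs(
    "printer",
    [ToyPort("config", 0), ToyPort("tokens", 1)],
    {"config": {"width": 80}},
)
print(gathered[0], type(gathered[1]).__name__)
```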
- get_data(run_level: Literal[0, 1, 2, 3] = 0) ReturnTupleT_co | None [source]
Get the most recent data produced by the node at the specified run level.
- Parameters:
run_level – The run level to retrieve data for (default: 0)
- Returns:
The tuple of data produced by this node, or None if no data is available
- get_inputs_with_le_generation_clipped_to_minimum_run_level() list[FinalizedInputPort[object]] [source]
Get the input nodes that should be resolved before this node should run, considering the minimum run level required for each input.
For each input connection, the following steps are performed:
1. Clip the input node’s generation based on the input port’s minimum run level.
2. Add the stitch value to the clipped generation.
3. Compare the clipped and stitched generation with this node’s current generation.
The input is considered stale and needs to be resolved if the clipped and stitched generation is less than or equal to this node’s generation.
- Returns:
A list of input connections that are stale and need to be resolved before this node can run.
- Return type:
List[InputConnection]
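The clip, stitch, and compare steps can be sketched with plain tuples. This is a simplified illustration only: the clipping rule (truncate to run_level + 1 components), the stitch rule (add to the first component), and the ordering (None first, then lexicographic) are all assumptions of this sketch and may not match Flowno's own generation helpers. ToyInput and the toy_* functions are hypothetical names.

```python
from typing import NamedTuple, Optional

# None is assumed to mean "has not produced data yet".
Generation = Optional[tuple[int, ...]]


class ToyInput(NamedTuple):
    """Hypothetical input connection; field names are illustrative."""
    name: str
    producer_generation: Generation
    minimum_run_level: int
    stitch: int


def toy_clip(gen: Generation, run_level: int) -> Generation:
    # Assumed clipping rule: keep at most run_level + 1 leading components.
    if gen is None:
        return None
    return gen[: run_level + 1]


def toy_stitch(gen: Generation, stitch: int) -> Generation:
    # Assumed stitch rule: add the stitch value to the outermost component.
    if not gen:
        return gen
    return (gen[0] + stitch, *gen[1:])


def toy_le(a: Generation, b: Generation) -> bool:
    # Assumed ordering: None sorts before everything, otherwise lexicographic.
    if a is None:
        return True
    if b is None:
        return False
    return a <= b


def toy_stale_inputs(inputs: list[ToyInput], own_generation: Generation) -> list[ToyInput]:
    stale = []
    for port in inputs:
        clipped = toy_clip(port.producer_generation, port.minimum_run_level)
        stitched = toy_stitch(clipped, port.stitch)
        if toy_le(stitched, own_generation):
            stale.append(port)
    return stale


inputs = [
    ToyInput("never_ran", None, 0, 0),
    ToyInput("same_generation", (1,), 0, 0),
    ToyInput("ahead", (2,), 0, 0),
    ToyInput("stitched", (1,), 0, 1),
]
stale_names = [p.name for p in toy_stale_inputs(inputs, own_generation=(1,))]
print(stale_names)
```

In this sketch an input at the same generation as the consumer counts as stale, mirroring the "less than or equal" wording above, while a stitch value of 1 is enough to make an otherwise equal input count as fresh.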
- get_output_nodes() list[FinalizedNode[Unpack, tuple[object, ...]]] [source]
Get all nodes connected to this node’s outputs.
- get_output_nodes_by_run_level(run_level: Literal[0, 1, 2, 3]) list[FinalizedInputPort[object]] [source]
Returns a list of input ports (FinalizedInputPort objects) from consumer nodes that are connected to this node, filtered by the specified run level.
Each input port is included once per connection. That is, if the same consumer is connected multiple times (or via different output ports) with a minimum_run_level equal to the specified run_level, it will appear once for each such connection.
- Parameters:
run_level (RunLevel) – The desired run level (for example, 0 for final data, 1 for partial/streaming updates).
- Returns:
A list of input ports on consumer nodes that are connected to self and have the specified minimum_run_level.
- Return type:
list[FinalizedInputPort[object]]
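A small sketch of the "once per connection" behaviour described above, using a hypothetical ToyConsumerPort record in place of FinalizedInputPort. The flat list of connections is an assumed data layout for illustration.

```python
from typing import NamedTuple


class ToyConsumerPort(NamedTuple):
    """Hypothetical stand-in for an input port on a consumer node."""
    consumer: str
    port_index: int
    minimum_run_level: int


def toy_ports_by_run_level(
    connections: list[ToyConsumerPort], run_level: int
) -> list[ToyConsumerPort]:
    # One entry per matching connection, so a consumer can appear more than once.
    return [port for port in connections if port.minimum_run_level == run_level]


connections = [
    ToyConsumerPort("logger", 0, 0),
    ToyConsumerPort("logger", 1, 0),  # same consumer, second connection
    ToyConsumerPort("ui", 0, 1),      # streaming consumer
]
level0 = toy_ports_by_run_level(connections, 0)
print([(p.consumer, p.port_index) for p in level0])
```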
- class flowno.core.node_base.FinalizedOutputPortRef(node: flowno.core.node_base.FinalizedNode[typing_extensions.Unpack[tuple[object, ...]], tuple[object, ...]], port_index: flowno.core.types.OutputPortIndex)[source]
Bases: Generic[_T]
- exception flowno.core.node_base.MissingDefaultError(node: FinalizedNode[Unpack, tuple[object, ...]], input_index: InputPortIndex)[source]
- exception flowno.core.node_base.MissingDefaultError(node: SuperNode)
Bases: Exception
- class flowno.core.node_base.NodePlaceholder(name: str)[source]
Bases: object
Placeholder for a node defined on a FlowHDL context instance.
Examples
>>> with FlowHDL() as flow:
...     flow.node1 = DummyNode()
...     assert isinstance(flow.node1, Node)
...     assert isinstance(flow.node2, NodePlaceholder)
...     print(flow.node2.name)  # prints "node2"
...     assert isinstance(flow.node3[OutputPortIndex(0)], NodeOutputPlaceholder)
- class flowno.core.node_base.OriginalCall(call_signature, call_code, func_name, class_name)[source]
Bases: NamedTuple
- _asdict()
Return a new dict which maps field names to their values.
- classmethod _make(iterable)
Make a new OriginalCall object from a sequence or iterable
- _replace(**kwds)
Return a new OriginalCall object replacing specified fields with new values
- class flowno.core.node_base.OutputPortRefPlaceholder(node: NodePlaceholder, port_index: OutputPortIndex)[source]
Bases: Generic[_ReturnT]
Placeholder for a specific output port of a NodePlaceholder.
- class flowno.core.node_base.ReturnTupleT_co
The return type of a single output or multiple output node.
alias of TypeVar(‘ReturnTupleT_co’, bound=tuple[object, …], covariant=True)
- class flowno.core.node_base.StalledNodeRequestCommand(stalled_input: 'FinalizedInputPortRef[object]', stalling_node: 'FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]]')[source]
Bases: Command
- class flowno.core.node_base.Stream(input: FinalizedInputPortRef[_InputType], output: FinalizedOutputPortRef[_InputType])[source]
Bases: Generic[_InputType], AsyncIterator[_InputType]
A stream of values from one node to another.
Streams connect nodes that produce multiple values over time (run_level > 0) to consuming nodes. They act as async iterators that yield values as they become available.
- Type Parameters:
_InputType: The type of data being streamed
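To illustrate what "acts as an async iterator" means for a consumer, here is a self-contained sketch. ToyStream is a hypothetical class, not Flowno's Stream, and it runs on the standard asyncio event loop purely so the example is runnable on its own; the queue and the None end-of-stream marker are assumptions of the sketch.

```python
import asyncio
from collections.abc import AsyncIterator


class ToyStream(AsyncIterator[str]):
    """Hypothetical stand-in for a stream; not Flowno's Stream class."""

    def __init__(self, queue: asyncio.Queue) -> None:
        self._queue = queue

    async def __anext__(self) -> str:
        item = await self._queue.get()
        if item is None:  # None is this sketch's end-of-stream marker
            raise StopAsyncIteration
        return item


async def producer(queue: asyncio.Queue) -> None:
    # Emit partial values over time, then signal the end of the stream.
    for word in ("Hello", "streaming", "world"):
        await queue.put(word)
    await queue.put(None)


async def consumer(stream: ToyStream) -> list[str]:
    # Values are received one at a time as they become available.
    return [chunk async for chunk in stream]


async def main() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    _, chunks = await asyncio.gather(producer(queue), consumer(ToyStream(queue)))
    return chunks


received = asyncio.run(main())
print(received)
```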
- class flowno.core.node_base.SuperNode(head: ~flowno.core.node_base.FinalizedNode[~typing_extensions.Unpack, tuple[object, ...]], members: dict[~flowno.core.node_base.FinalizedNode[~typing_extensions.Unpack, tuple[object, ...]], list[~flowno.core.types.InputPortIndex]], dependent: ~flowno.core.node_base.SuperNode | None = None, dependencies: list[~flowno.core.node_base.SuperNode] = <factory>)[source]
Bases: object
The SuperNode is a set of nodes that are strongly connected to each other. It is used by the cycle detection algorithm.
- static build_condensed_mermaid(all_snodes: list[SuperNode], supernode_ids: dict[SuperNode, str]) str [source]
Produce a single Mermaid diagram that shows all SuperNodes as nodes and draws edges for each supernode.dependencies entry.
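A rough sketch of what a condensed diagram builder could look like. It works on a plain dictionary of identifiers rather than SuperNode objects; the function name toy_condensed_mermaid, the "flowchart TD" layout, and the edge direction (from a supernode to each of its dependencies) are assumptions made for illustration.

```python
def toy_condensed_mermaid(dependencies: dict[str, list[str]]) -> str:
    """Build Mermaid flowchart text from {supernode_id: [dependency_ids]}."""
    lines = ["flowchart TD"]
    # Declare one Mermaid node per supernode.
    for snode_id in dependencies:
        lines.append(f'    {snode_id}["{snode_id}"]')
    # Draw one edge per dependency entry (direction is an assumption).
    for snode_id, deps in dependencies.items():
        for dep_id in deps:
            lines.append(f"    {snode_id} --> {dep_id}")
    return "\n".join(lines)


diagram = toy_condensed_mermaid({"S0": ["S1", "S2"], "S1": [], "S2": ["S1"]})
print(diagram)
```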
- flowno.core.node_base._node_id(node: FinalizedNode[Unpack, tuple[object, ...]]) str [source]
Return a stable identifier for a node to use in Mermaid.
- flowno.core.node_base._node_label(node: FinalizedNode[Unpack, tuple[object, ...]]) str [source]
Return a short label to display for each node.
- flowno.core.node_base.build_abbreviated_signature(parameters: list[tuple[str, Parameter]], required_input_ports: list[InputPortIndex], skip_self: bool) tuple[str, str] [source]
Construct just the parenthesized parameter list, skipping self if needed. Returns the line plus a line of dashes/underlines for missing-default params.
- flowno.core.node_base.build_inline_signature(func_name: str, parameters: list[tuple[str, Parameter]], required_input_ports: list[InputPortIndex], skip_self: bool) tuple[str, list[tuple[int, int, bool]]] [source]
Build something like: func_name(x: int=1, y: str) Returns the signature string and param_positions for underlining.
- flowno.core.node_base.build_param_text(param_name: str, param: Parameter) str [source]
Return a string representation of one parameter, e.g. x: int=5.
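The parameter formatting described above can be approximated with the standard inspect module. The helpers below (toy_param_text and toy_inline_signature) are hypothetical re-creations for illustration; how annotations are rendered is an assumption and may differ from Flowno's output.

```python
import inspect


def toy_param_text(name: str, param: inspect.Parameter) -> str:
    """Render one parameter in the style 'x: int=5'."""
    text = name
    if param.annotation is not inspect.Parameter.empty:
        # Use the type's short name when it has one, else fall back to repr().
        annotation = getattr(param.annotation, "__name__", repr(param.annotation))
        text += f": {annotation}"
    if param.default is not inspect.Parameter.empty:
        text += f"={param.default!r}"
    return text


def toy_inline_signature(func) -> str:
    """Render 'func_name(param, ...)' using toy_param_text for each parameter."""
    params = inspect.signature(func).parameters
    rendered = ", ".join(toy_param_text(name, p) for name, p in params.items())
    return f"{func.__name__}({rendered})"


def example(w: float, x: int = 5, z=None):
    return w, x, z


print(toy_inline_signature(example))
```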
- flowno.core.node_base.build_signature_display(oc: OriginalCall, required_input_ports: list[InputPortIndex]) str [source]
Return a multi-line string that shows either ClassName.call(…) in an abbreviated form or func_name(…) in a traditional inline form, with underlines marking which parameters are missing defaults.
- flowno.core.node_base.build_underline_line(signature_line: str, param_positions: list[tuple[int, int, bool]]) str [source]
Create a line of spaces, with dashes marking the parameters that need defaults.
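A sketch of how such an underline could be produced. The layout of each param_positions tuple is not documented here, so this example assumes (start, end, needs_default) with an exclusive end index; toy_underline is a hypothetical name.

```python
def toy_underline(signature_line: str, param_positions: list[tuple[int, int, bool]]) -> str:
    # Assumed tuple layout: (start, end, needs_default), with end exclusive.
    chars = [" "] * len(signature_line)
    for start, end, needs_default in param_positions:
        if needs_default:
            chars[start:end] = "-" * (end - start)
    # Trailing spaces carry no information, so drop them.
    return "".join(chars).rstrip()


signature = "add(a: int, b: int=2)"
a_start = signature.index("a: int")
b_start = signature.index("b: int=2")
positions = [
    (a_start, a_start + len("a: int"), True),     # no default: underline it
    (b_start, b_start + len("b: int=2"), False),  # has a default: leave blank
]
underline = toy_underline(signature, positions)
print(signature)
print(underline)
```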
- flowno.core.node_base.format_missing_defaults(node: FinalizedNode[Unpack, tuple[object, ...]], required_input_ports: list[InputPortIndex]) str [source]
Produce a message describing which parameters of a node must have default values. If the node’s original call is from a class’s call method, display an abbreviated signature with the class name in a separate line. Otherwise, display a more traditional function signature inline.
The final message includes underlines for each parameter index found in required_input_ports.