flowno.core.flow.instrumentation

Instrumentation system for Flowno’s dataflow execution.

This module provides tools for monitoring and debugging dataflow graph execution, including node evaluation, data propagation, and cycle resolution. It uses a context manager pattern similar to the event loop instrumentation, allowing different monitoring tools to be applied to specific flows.

Example

>>> from flowno import node, FlowHDL
>>> from flowno.core.flow.instrumentation import MinimalInstrument
>>>
>>> # Define a simple flow with two nodes
>>> @node
... async def NumberNode(value: int = 5):
...     return value
>>>
>>> @node
... async def DoubleNode(value: int):
...     return value * 2
>>>
>>> # Create a simple instrumentation class
>>> class MinimalInstrument(FlowInstrument):
...     def on_flow_start(self, flow):
...         print(f"Flow started: {flow}")
...
...     def on_flow_end(self, flow):
...         print(f"Flow completed: {flow}")
...
...     @contextmanager
...     def node_lifecycle(self, flow, node, run_level):
...         print(f"Starting node: {node} (run_level: {run_level})")
...         try:
...             yield
...         finally:
...             print(f"Completed node: {node} (run_level: {run_level})")
>>>
>>> # Run the flow with instrumentation
>>> with FlowHDL() as f:
...     f.number = NumberNode()
...     f.double = DoubleNode(f.number)
>>>
>>> with MinimalInstrument():
...     f.run_until_complete()
Flow started: <flowno.core.flow.flow.Flow object at 0x...>
Starting node: NumberNode#... (run_level: 0)
Completed node: NumberNode#... (run_level: 0)
Starting node: DoubleNode#... (run_level: 0)
Completed node: DoubleNode#... (run_level: 0)
Flow completed: <flowno.core.flow.flow.Flow object at 0x...>
>>>
>>> print(f.double.get_data())
(10,)
class flowno.core.flow.instrumentation.FlowInstrument[source]

Base class for Flowno dataflow instrumentation.

This class provides hooks for various flow events, allowing monitoring of node execution, data propagation, and dependency resolution. Subclasses can override specific methods to track different aspects of flow execution.

Key events: - Flow start/end - Node registration and state changes - Data emission and propagation - Dependency resolution steps - Node execution lifecycle

node_lifecycle(flow: Flow, node: ObjectNode, run_level: int)[source]

Context manager for tracking the complete lifecycle of a node evaluation.

This wraps the entire process of a node gathering its inputs, computing results, and producing outputs.

Parameters:
  • flow – The flow containing the node

  • node – The node being evaluated

  • run_level – The run level of the evaluation

on_barrier_node_read(node: ObjectNode, run_level: int)[source]

Context manager for tracking node read barrier events.

A read barrier notifies upstream nodes that their data has been consumed, allowing them to proceed with generating new data.

Parameters:
  • node – The node reading data

  • run_level – The run level of the operation

on_barrier_node_write(flow: Flow, node: ObjectNode, data: tuple[Any, ...], run_level: int)[source]

Context manager for tracking node write barrier events.

A write barrier ensures all downstream nodes have consumed previous data before new data is written, preventing data loss.

Parameters:
  • flow – The flow containing the node

  • node – The node writing data

  • data – The data being written

  • run_level – The run level of the operation

on_defaulted_inputs_set(flow: Flow, node: ObjectNode, defaulted_inputs: list[InputPortIndex]) None[source]

Called when defaulted inputs for a node are set and the stitch levels are incremented.

This is important for cycle resolution, as defaulted inputs break cycles in the flow.

Parameters:
  • flow – The flow containing the node

  • node – The node with defaulted inputs

  • defaulted_inputs – List of input port indices using default values

on_flow_end(flow: Flow) None[source]

Called immediately after the Flow finishes run_until_complete.

Parameters:

flow – The flow that has completed execution

on_flow_start(flow: Flow) None[source]

Called just before the Flow starts running (e.g. in run_until_complete).

Parameters:

flow – The flow that is starting execution

on_node_emitted_data(flow: Flow, node: ObjectNode, data: tuple[Any, ...] | None, run_level: int) None[source]

Called when a node yields or returns data at a particular run level.

Parameters:
  • flow – The flow the node is part of

  • node – The node emitting data

  • data – The data being emitted (tuple or None)

  • run_level – The run level of the data (0=regular, 1=streaming)

on_node_error(flow: Flow, node: ObjectNode, error: Exception) None[source]

Called when a node raises an exception.

Parameters:
  • flow – The flow the node is part of

  • node – The node raising the exception

  • error – The exception that was raised

on_node_generation_limit(flow: Flow, node: ObjectNode, limit: Generation) None[source]

Called if the node hits a user-specified generation limit.

Parameters:
  • flow – The flow the node is part of

  • node – The node hitting its limit

  • limit – The generation limit that was reached

on_node_pause(flow: Flow, node: ObjectNode, run_level: int) None[source]

Called when a node is paused.

Parameters:
  • flow – The flow the node is part of

  • node – The node being paused

  • run_level – The run level at which the node is paused

on_node_registered(flow: Flow, node: ObjectNode) None[source]

Called when a node is first added/registered to the flow.

Parameters:
  • flow – The flow the node is being registered with

  • node – The node being registered

on_node_resumed(flow: Flow, node: ObjectNode, run_level: int) None[source]

Called when a node is resumed at a given run level.

Parameters:
  • flow – The flow resuming the node

  • node – The node being resumed

  • run_level – The run level at which the node is resuming (0=regular, 1=streaming)

on_node_stalled(flow: Flow, node: ObjectNode, stalled_input: FinalizedInputPortRef[Any]) None[source]

Called when a node transitions to ‘Stalled’ status because of a blocked input port.

Parameters:
  • flow – The flow containing the stalled node

  • node – The node that has stalled

  • stalled_input – Reference to the input port causing the stall

on_node_status_change(flow: Flow, node: ObjectNode, old_status: NodeTaskStatus.Type, new_status: NodeTaskStatus.Type) None[source]

Called when the node changes status.

Parameters:
  • flow – The flow the node is part of

  • node – The node changing status

  • old_status – The previous status of the node

  • new_status – The new status of the node

on_node_visited(flow: Flow, node: ObjectNode) None[source]

Called whenever the flow “marks” a node as visited in the resolution queue.

Parameters:
  • flow – The flow executing the node

  • node – The node being marked as visited

on_resolution_queue_get(flow: Flow, node: ObjectNode) None[source]

Called when a node is popped from the resolution queue.

Parameters:
  • flow – The flow managing the resolution queue

  • node – The node being processed from the queue

on_resolution_queue_put(flow: Flow, node: ObjectNode) None[source]

Called when a node is pushed onto the resolution queue.

Parameters:
  • flow – The flow managing the resolution queue

  • node – The node being queued for resolution

on_solving_nodes(flow: Flow, head_node: ObjectNode, solution_nodes: list[ObjectNode]) None[source]

Called when the flow forcibly evaluates a set of leaf dependencies.

This happens after _find_solution_nodes(head_node) returns.

Parameters:
  • flow – The flow solving the dependencies

  • head_node – The node whose dependencies are being solved

  • solution_nodes – The set of nodes that will be evaluated to unblock head_node

on_stream_end(stream: Stream[Any]) None[source]

Called when a Stream has no more items to process.

Parameters:

stream – The stream that has completed

on_stream_error(stream: Stream[Any], error: Exception) None[source]

Called when a Stream encounters an error.

Parameters:
  • stream – The stream encountering an error

  • error – The exception that was raised

on_stream_next(stream: Stream[Any], data: Any) None[source]

Called each time a Stream processes the next item.

Parameters:
  • stream – The stream processing data

  • data – The data item being processed

on_stream_start(stream: Stream[Any]) None[source]

Called when a Stream starts processing.

Parameters:

stream – The stream that is starting

class flowno.core.flow.instrumentation.LogInstrument[source]

A version of PrintInstrument that sends output to the logger instead of stdout.

This instrument uses debug level logging for all messages.

print(msg, *args, **kwargs)

Log ‘msg % args’ with severity ‘DEBUG’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.debug(“Houston, we have a %s”, “thorny problem”, exc_info=True)

class flowno.core.flow.instrumentation.MinimalInstrument[source]

A simplified instrument that only tracks flow start/end and node lifecycle.

This is useful for basic monitoring without excessive output.

node_lifecycle(flow, node, run_level)[source]

Context manager for tracking the complete lifecycle of a node evaluation.

This wraps the entire process of a node gathering its inputs, computing results, and producing outputs.

Parameters:
  • flow – The flow containing the node

  • node – The node being evaluated

  • run_level – The run level of the evaluation

on_flow_end(flow)[source]

Called immediately after the Flow finishes run_until_complete.

Parameters:

flow – The flow that has completed execution

on_flow_start(flow)[source]

Called just before the Flow starts running (e.g. in run_until_complete).

Parameters:

flow – The flow that is starting execution

class flowno.core.flow.instrumentation.PrintInstrument[source]

A concrete implementation of FlowInstrument that prints flow and node execution events.

This instrument is useful for debugging and understanding the execution order of nodes in a flow.

node_lifecycle(flow: Flow, node, run_level: int)[source]

Context manager for tracking the complete lifecycle of a node evaluation.

This wraps the entire process of a node gathering its inputs, computing results, and producing outputs.

Parameters:
  • flow – The flow containing the node

  • node – The node being evaluated

  • run_level – The run level of the evaluation

on_barrier_node_read(node: ObjectNode, run_level: int)[source]

Context manager for tracking node read barrier events.

A read barrier notifies upstream nodes that their data has been consumed, allowing them to proceed with generating new data.

Parameters:
  • node – The node reading data

  • run_level – The run level of the operation

on_barrier_node_write(flow: Flow, node: ObjectNode, data: tuple[Any, ...], run_level: int)[source]

Context manager for tracking node write barrier events.

A write barrier ensures all downstream nodes have consumed previous data before new data is written, preventing data loss.

Parameters:
  • flow – The flow containing the node

  • node – The node writing data

  • data – The data being written

  • run_level – The run level of the operation

on_defaulted_inputs_set(flow: Flow, node: ObjectNode, defaulted_inputs: list[InputPortIndex]) None[source]

Called when defaulted inputs for a node are set and the stitch levels are incremented.

This is important for cycle resolution, as defaulted inputs break cycles in the flow.

Parameters:
  • flow – The flow containing the node

  • node – The node with defaulted inputs

  • defaulted_inputs – List of input port indices using default values

on_flow_end(flow)[source]

Called immediately after the Flow finishes run_until_complete.

Parameters:

flow – The flow that has completed execution

on_flow_start(flow)[source]

Called just before the Flow starts running (e.g. in run_until_complete).

Parameters:

flow – The flow that is starting execution

on_node_emitted_data(flow, node, data, run_level)[source]

Called when a node yields or returns data at a particular run level.

Parameters:
  • flow – The flow the node is part of

  • node – The node emitting data

  • data – The data being emitted (tuple or None)

  • run_level – The run level of the data (0=regular, 1=streaming)

on_node_error(flow, node, error: Exception)[source]

Called when a node raises an exception.

Parameters:
  • flow – The flow the node is part of

  • node – The node raising the exception

  • error – The exception that was raised

on_node_generation_limit(flow, node, limit)[source]

Called if the node hits a user-specified generation limit.

Parameters:
  • flow – The flow the node is part of

  • node – The node hitting its limit

  • limit – The generation limit that was reached

on_node_pause(flow, node, run_level)[source]

Called when a node is paused.

Parameters:
  • flow – The flow the node is part of

  • node – The node being paused

  • run_level – The run level at which the node is paused

on_node_registered(flow, node)[source]

Called when a node is first added/registered to the flow.

Parameters:
  • flow – The flow the node is being registered with

  • node – The node being registered

on_node_resumed(flow, node, run_level)[source]

Called when a node is resumed at a given run level.

Parameters:
  • flow – The flow resuming the node

  • node – The node being resumed

  • run_level – The run level at which the node is resuming (0=regular, 1=streaming)

on_node_stalled(flow, node, stalled_input)[source]

Called when a node transitions to ‘Stalled’ status because of a blocked input port.

Parameters:
  • flow – The flow containing the stalled node

  • node – The node that has stalled

  • stalled_input – Reference to the input port causing the stall

on_node_status_change(flow: Flow, node: ObjectNode, old_status: NodeTaskStatus.Type, new_status: NodeTaskStatus.Type) None[source]

Called when the node changes status.

Parameters:
  • flow – The flow the node is part of

  • node – The node changing status

  • old_status – The previous status of the node

  • new_status – The new status of the node

on_node_visited(flow, node)[source]

Called whenever the flow “marks” a node as visited in the resolution queue.

Parameters:
  • flow – The flow executing the node

  • node – The node being marked as visited

on_resolution_queue_get(flow, node)[source]

Called when a node is popped from the resolution queue.

Parameters:
  • flow – The flow managing the resolution queue

  • node – The node being processed from the queue

on_resolution_queue_put(flow, node)[source]

Called when a node is pushed onto the resolution queue.

Parameters:
  • flow – The flow managing the resolution queue

  • node – The node being queued for resolution

on_solving_nodes(flow, head_node, solution_nodes)[source]

Called when the flow forcibly evaluates a set of leaf dependencies.

This happens after _find_solution_nodes(head_node) returns.

Parameters:
  • flow – The flow solving the dependencies

  • head_node – The node whose dependencies are being solved

  • solution_nodes – The set of nodes that will be evaluated to unblock head_node

on_stream_end(stream)[source]

Called when a Stream has no more items to process.

Parameters:

stream – The stream that has completed

on_stream_error(stream, error: Exception)[source]

Called when a Stream encounters an error.

Parameters:
  • stream – The stream encountering an error

  • error – The exception that was raised

on_stream_next(stream, data)[source]

Called each time a Stream processes the next item.

Parameters:
  • stream – The stream processing data

  • data – The data item being processed

on_stream_start(stream)[source]

Called when a Stream starts processing.

Parameters:

stream – The stream that is starting

print(*args, sep=' ', end='\n', file=None, flush=False)

Prints the values to a stream, or to sys.stdout by default.

sep

string inserted between values, default a space.

end

string appended after the last value, default a newline.

file

a file-like object (stream); defaults to the current sys.stdout.

flush

whether to forcibly flush the stream.

flowno.core.flow.instrumentation.get_current_flow_instrument() FlowInstrument[source]

Get the current flow instrumentation context.

Returns:

The currently active flow instrument or a no-op instrument if none is active