flowno.core.flow.instrumentation
Instrumentation system for Flowno’s dataflow execution.
This module provides tools for monitoring and debugging dataflow graph execution, including node evaluation, data propagation, and cycle resolution. It uses a context manager pattern similar to the event loop instrumentation, allowing different monitoring tools to be applied to specific flows.
Example
>>> from contextlib import contextmanager
>>> from flowno import node, FlowHDL
>>> from flowno.core.flow.instrumentation import FlowInstrument
>>>
>>> # Define a simple flow with two nodes
>>> @node
... async def NumberNode(value: int = 5):
...     return value
>>>
>>> @node
... async def DoubleNode(value: int):
...     return value * 2
>>>
>>> # Create a simple instrumentation class
>>> class MinimalInstrument(FlowInstrument):
...     def on_flow_start(self, flow):
...         print(f"Flow started: {flow}")
...
...     def on_flow_end(self, flow):
...         print(f"Flow completed: {flow}")
...
...     @contextmanager
...     def node_lifecycle(self, flow, node, run_level):
...         print(f"Starting node: {node} (run_level: {run_level})")
...         try:
...             yield
...         finally:
...             print(f"Completed node: {node} (run_level: {run_level})")
>>>
>>> # Run the flow with instrumentation
>>> with FlowHDL() as f:
...     f.number = NumberNode()
...     f.double = DoubleNode(f.number)
>>>
>>> with MinimalInstrument():
...     f.run_until_complete()
Flow started: <flowno.core.flow.flow.Flow object at 0x...>
Starting node: NumberNode#... (run_level: 0)
Completed node: NumberNode#... (run_level: 0)
Starting node: DoubleNode#... (run_level: 0)
Completed node: DoubleNode#... (run_level: 0)
Flow completed: <flowno.core.flow.flow.Flow object at 0x...>
>>>
>>> print(f.double.get_data())
(10,)
- class flowno.core.flow.instrumentation.FlowInstrument
Base class for Flowno dataflow instrumentation.
This class provides hooks for various flow events, allowing monitoring of node execution, data propagation, and dependency resolution. Subclasses can override specific methods to track different aspects of flow execution.
Key events:
- Flow start/end
- Node registration and state changes
- Data emission and propagation
- Dependency resolution steps
- Node execution lifecycle
- node_lifecycle(flow: Flow, node: ObjectNode, run_level: int)
Context manager for tracking the complete lifecycle of a node evaluation.
This wraps the entire process of a node gathering its inputs, computing results, and producing outputs.
- Parameters:
flow – The flow containing the node
node – The node being evaluated
run_level – The run level of the evaluation
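Because node_lifecycle wraps the whole evaluation, it is a natural place to measure how long a node takes. The following is a minimal sketch under stated assumptions: FlowInstrument here is a local stand-in so the snippet runs without flowno installed, TimingInstrument is a hypothetical name, and the hook is driven by hand rather than by a real flow.

```python
import time
from contextlib import contextmanager


class FlowInstrument:
    """Stand-in for flowno's base class so this sketch runs on its own."""


class TimingInstrument(FlowInstrument):
    """Hypothetical instrument recording the duration of each node evaluation."""

    def __init__(self):
        self.durations = []  # list of (node, run_level, seconds)

    @contextmanager
    def node_lifecycle(self, flow, node, run_level):
        start = time.perf_counter()
        try:
            yield
        finally:
            # Runs even if the node raises, so failed evaluations are timed too.
            elapsed = time.perf_counter() - start
            self.durations.append((node, run_level, elapsed))


# Drive the hook by hand; in a real flow the framework would enter it.
timing = TimingInstrument()
with timing.node_lifecycle(flow=None, node="DoubleNode", run_level=0):
    sum(range(1000))  # placeholder for the node's work
```

In real use the class would subclass flowno's FlowInstrument and be activated with a with statement around run_until_complete, as in the module example.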
- on_barrier_node_read(node: ObjectNode, run_level: int)
Context manager for tracking node read barrier events.
A read barrier notifies upstream nodes that their data has been consumed, allowing them to proceed with generating new data.
- Parameters:
node – The node reading data
run_level – The run level of the operation
- on_barrier_node_write(flow: Flow, node: ObjectNode, data: tuple[Any, ...], run_level: int)
Context manager for tracking node write barrier events.
A write barrier ensures all downstream nodes have consumed previous data before new data is written, preventing data loss.
- Parameters:
flow – The flow containing the node
node – The node writing data
data – The data being written
run_level – The run level of the operation
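The interplay between the read and write barriers can be illustrated with a toy single-value buffer. This is a conceptual sketch only, not flowno's implementation: SingleSlot and its methods are hypothetical, and the blocking behaviour is simplified to a check that raises instead of suspending the producer.

```python
class SingleSlot:
    """Toy single-value buffer guarded by read/write barriers."""

    def __init__(self, consumers):
        self.consumers = set(consumers)
        self.pending = set()  # consumers that have not read the current value
        self.value = None

    def can_write(self):
        # Write barrier: safe only once every consumer has read the last value.
        return not self.pending

    def write(self, value):
        if not self.can_write():
            raise RuntimeError("write barrier: previous value not yet consumed")
        self.value = value
        self.pending = set(self.consumers)

    def read(self, consumer):
        # Read barrier: record consumption so the producer may proceed.
        self.pending.discard(consumer)
        return self.value


slot = SingleSlot(consumers=["a", "b"])
slot.write(1)
slot.read("a")
blocked = not slot.can_write()  # "b" has not read yet, so a write would lose data
slot.read("b")
unblocked = slot.can_write()    # every consumer has read; writing is safe again
```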
- on_defaulted_inputs_set(flow: Flow, node: ObjectNode, defaulted_inputs: list[InputPortIndex]) -> None
Called when defaulted inputs for a node are set and the stitch levels are incremented.
This is important for cycle resolution, as defaulted inputs break cycles in the flow.
- Parameters:
flow – The flow containing the node
node – The node with defaulted inputs
defaulted_inputs – List of input port indices using default values
- on_flow_end(flow: Flow) -> None
Called immediately after the Flow finishes run_until_complete.
- Parameters:
flow – The flow that has completed execution
- on_flow_start(flow: Flow) -> None
Called just before the Flow starts running (e.g. in run_until_complete).
- Parameters:
flow – The flow that is starting execution
- on_node_emitted_data(flow: Flow, node: ObjectNode, data: tuple[Any, ...] | None, run_level: int) -> None
Called when a node yields or returns data at a particular run level.
- Parameters:
flow – The flow the node is part of
node – The node emitting data
data – The data being emitted (tuple or None)
run_level – The run level of the data (0=regular, 1=streaming)
- on_node_error(flow: Flow, node: ObjectNode, error: Exception) -> None
Called when a node raises an exception.
- Parameters:
flow – The flow the node is part of
node – The node raising the exception
error – The exception that was raised
- on_node_generation_limit(flow: Flow, node: ObjectNode, limit: Generation) -> None
Called if the node hits a user-specified generation limit.
- Parameters:
flow – The flow the node is part of
node – The node hitting its limit
limit – The generation limit that was reached
- on_node_pause(flow: Flow, node: ObjectNode, run_level: int) -> None
Called when a node is paused.
- Parameters:
flow – The flow the node is part of
node – The node being paused
run_level – The run level at which the node is paused
- on_node_registered(flow: Flow, node: ObjectNode) -> None
Called when a node is first added/registered to the flow.
- Parameters:
flow – The flow the node is being registered with
node – The node being registered
- on_node_resumed(flow: Flow, node: ObjectNode, run_level: int) -> None
Called when a node is resumed at a given run level.
- Parameters:
flow – The flow resuming the node
node – The node being resumed
run_level – The run level at which the node is resuming (0=regular, 1=streaming)
- on_node_stalled(flow: Flow, node: ObjectNode, stalled_input: FinalizedInputPortRef[Any]) -> None
Called when a node transitions to ‘Stalled’ status because of a blocked input port.
- Parameters:
flow – The flow containing the stalled node
node – The node that has stalled
stalled_input – Reference to the input port causing the stall
- on_node_status_change(flow: Flow, node: ObjectNode, old_status: NodeTaskStatus.Type, new_status: NodeTaskStatus.Type) -> None
Called when the node changes status.
- Parameters:
flow – The flow the node is part of
node – The node changing status
old_status – The previous status of the node
new_status – The new status of the node
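A subclass that overrides only on_node_status_change can summarise how nodes move between states. The sketch below is hedged: FlowInstrument is a local stand-in, StatusTransitions is a hypothetical name, and the statuses are plain placeholder strings because the concrete NodeTaskStatus types are not described in this reference.

```python
from collections import Counter


class FlowInstrument:
    """Stand-in for flowno's base class so this sketch runs on its own."""


class StatusTransitions(FlowInstrument):
    """Hypothetical instrument counting (old, new) status transitions."""

    def __init__(self):
        self.transitions = Counter()

    def on_node_status_change(self, flow, node, old_status, new_status):
        # str() keeps the key hashable whatever the real status objects are.
        self.transitions[(str(old_status), str(new_status))] += 1


# Drive the hook by hand with placeholder status names.
tracker = StatusTransitions()
tracker.on_node_status_change(None, "NumberNode", "Ready", "Running")
tracker.on_node_status_change(None, "DoubleNode", "Ready", "Running")
tracker.on_node_status_change(None, "DoubleNode", "Running", "Stalled")
```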
- on_node_visited(flow: Flow, node: ObjectNode) -> None
Called whenever the flow “marks” a node as visited in the resolution queue.
- Parameters:
flow – The flow executing the node
node – The node being marked as visited
- on_resolution_queue_get(flow: Flow, node: ObjectNode) -> None
Called when a node is popped from the resolution queue.
- Parameters:
flow – The flow managing the resolution queue
node – The node being processed from the queue
- on_resolution_queue_put(flow: Flow, node: ObjectNode) -> None
Called when a node is pushed onto the resolution queue.
- Parameters:
flow – The flow managing the resolution queue
node – The node being queued for resolution
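Pairing the put and get hooks gives a running view of the resolution queue's depth. This is a sketch under assumptions: FlowInstrument is a local stand-in, QueueDepthInstrument is a hypothetical name, and it assumes every get is preceded by a matching put.

```python
class FlowInstrument:
    """Stand-in for flowno's base class so this sketch runs on its own."""


class QueueDepthInstrument(FlowInstrument):
    """Hypothetical instrument tracking current and peak queue depth."""

    def __init__(self):
        self.depth = 0
        self.max_depth = 0

    def on_resolution_queue_put(self, flow, node):
        self.depth += 1
        self.max_depth = max(self.max_depth, self.depth)

    def on_resolution_queue_get(self, flow, node):
        self.depth -= 1


# Drive the hooks by hand: two nodes queued, one taken off.
depth = QueueDepthInstrument()
depth.on_resolution_queue_put(None, "a")
depth.on_resolution_queue_put(None, "b")
depth.on_resolution_queue_get(None, "a")
```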
- on_solving_nodes(flow: Flow, head_node: ObjectNode, solution_nodes: list[ObjectNode]) -> None
Called when the flow forcibly evaluates a set of leaf dependencies.
This happens after _find_solution_nodes(head_node) returns.
- Parameters:
flow – The flow solving the dependencies
head_node – The node whose dependencies are being solved
solution_nodes – The set of nodes that will be evaluated to unblock head_node
- on_stream_end(stream: Stream[Any]) -> None
Called when a Stream has no more items to process.
- Parameters:
stream – The stream that has completed
- on_stream_error(stream: Stream[Any], error: Exception) -> None
Called when a Stream encounters an error.
- Parameters:
stream – The stream encountering an error
error – The exception that was raised
- class flowno.core.flow.instrumentation.LogInstrument
A version of PrintInstrument that sends output to the logger instead of stdout.
This instrument uses debug level logging for all messages.
- print(msg, *args, **kwargs)
Log 'msg % args' with severity 'DEBUG'.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.debug("Houston, we have a %s", "thorny problem", exc_info=True)
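Since every message is logged at DEBUG level, nothing appears unless the receiving logger is configured to let DEBUG records through. The sketch below shows this with the standard logging module alone; the logger name "flowno.example" is a placeholder, because the logger that LogInstrument actually writes to is not stated here.

```python
import io
import logging

# Placeholder logger name; substitute the logger the instrument really uses.
logger = logging.getLogger("flowno.example")
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
logger.addHandler(handler)
logger.propagate = False  # keep the demo output out of the root logger

logger.setLevel(logging.INFO)
logger.debug("node %s started", "NumberNode")  # dropped: below INFO

logger.setLevel(logging.DEBUG)
logger.debug("node %s started", "NumberNode")  # emitted

output = buffer.getvalue()
```

Only the second call reaches the handler, so output holds a single line.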
- class flowno.core.flow.instrumentation.MinimalInstrument
A simplified instrument that only tracks flow start/end and node lifecycle.
This is useful for basic monitoring without excessive output.
- node_lifecycle(flow, node, run_level)
Context manager for tracking the complete lifecycle of a node evaluation.
This wraps the entire process of a node gathering its inputs, computing results, and producing outputs.
- Parameters:
flow – The flow containing the node
node – The node being evaluated
run_level – The run level of the evaluation
- class flowno.core.flow.instrumentation.PrintInstrument
A concrete implementation of FlowInstrument that prints flow and node execution events.
This instrument is useful for debugging and understanding the execution order of nodes in a flow.
- node_lifecycle(flow: Flow, node, run_level: int)
Context manager for tracking the complete lifecycle of a node evaluation.
This wraps the entire process of a node gathering its inputs, computing results, and producing outputs.
- Parameters:
flow – The flow containing the node
node – The node being evaluated
run_level – The run level of the evaluation
- on_barrier_node_read(node: ObjectNode, run_level: int)
Context manager for tracking node read barrier events.
A read barrier notifies upstream nodes that their data has been consumed, allowing them to proceed with generating new data.
- Parameters:
node – The node reading data
run_level – The run level of the operation
- on_barrier_node_write(flow: Flow, node: ObjectNode, data: tuple[Any, ...], run_level: int)
Context manager for tracking node write barrier events.
A write barrier ensures all downstream nodes have consumed previous data before new data is written, preventing data loss.
- Parameters:
flow – The flow containing the node
node – The node writing data
data – The data being written
run_level – The run level of the operation
- on_defaulted_inputs_set(flow: Flow, node: ObjectNode, defaulted_inputs: list[InputPortIndex]) -> None
Called when defaulted inputs for a node are set and the stitch levels are incremented.
This is important for cycle resolution, as defaulted inputs break cycles in the flow.
- Parameters:
flow – The flow containing the node
node – The node with defaulted inputs
defaulted_inputs – List of input port indices using default values
- on_flow_end(flow)
Called immediately after the Flow finishes run_until_complete.
- Parameters:
flow – The flow that has completed execution
- on_flow_start(flow)
Called just before the Flow starts running (e.g. in run_until_complete).
- Parameters:
flow – The flow that is starting execution
- on_node_emitted_data(flow, node, data, run_level)
Called when a node yields or returns data at a particular run level.
- Parameters:
flow – The flow the node is part of
node – The node emitting data
data – The data being emitted (tuple or None)
run_level – The run level of the data (0=regular, 1=streaming)
- on_node_error(flow, node, error: Exception)
Called when a node raises an exception.
- Parameters:
flow – The flow the node is part of
node – The node raising the exception
error – The exception that was raised
- on_node_generation_limit(flow, node, limit)
Called if the node hits a user-specified generation limit.
- Parameters:
flow – The flow the node is part of
node – The node hitting its limit
limit – The generation limit that was reached
- on_node_pause(flow, node, run_level)
Called when a node is paused.
- Parameters:
flow – The flow the node is part of
node – The node being paused
run_level – The run level at which the node is paused
- on_node_registered(flow, node)
Called when a node is first added/registered to the flow.
- Parameters:
flow – The flow the node is being registered with
node – The node being registered
- on_node_resumed(flow, node, run_level)
Called when a node is resumed at a given run level.
- Parameters:
flow – The flow resuming the node
node – The node being resumed
run_level – The run level at which the node is resuming (0=regular, 1=streaming)
- on_node_stalled(flow, node, stalled_input)
Called when a node transitions to ‘Stalled’ status because of a blocked input port.
- Parameters:
flow – The flow containing the stalled node
node – The node that has stalled
stalled_input – Reference to the input port causing the stall
- on_node_status_change(flow: Flow, node: ObjectNode, old_status: NodeTaskStatus.Type, new_status: NodeTaskStatus.Type) -> None
Called when the node changes status.
- Parameters:
flow – The flow the node is part of
node – The node changing status
old_status – The previous status of the node
new_status – The new status of the node
- on_node_visited(flow, node)
Called whenever the flow “marks” a node as visited in the resolution queue.
- Parameters:
flow – The flow executing the node
node – The node being marked as visited
- on_resolution_queue_get(flow, node)
Called when a node is popped from the resolution queue.
- Parameters:
flow – The flow managing the resolution queue
node – The node being processed from the queue
- on_resolution_queue_put(flow, node)
Called when a node is pushed onto the resolution queue.
- Parameters:
flow – The flow managing the resolution queue
node – The node being queued for resolution
- on_solving_nodes(flow, head_node, solution_nodes)
Called when the flow forcibly evaluates a set of leaf dependencies.
This happens after _find_solution_nodes(head_node) returns.
- Parameters:
flow – The flow solving the dependencies
head_node – The node whose dependencies are being solved
solution_nodes – The set of nodes that will be evaluated to unblock head_node
- on_stream_end(stream)
Called when a Stream has no more items to process.
- Parameters:
stream – The stream that has completed
- on_stream_error(stream, error: Exception)
Called when a Stream encounters an error.
- Parameters:
stream – The stream encountering an error
error – The exception that was raised
- on_stream_next(stream, data)
Called each time a Stream processes the next item.
- Parameters:
stream – The stream processing data
data – The data item being processed
- on_stream_start(stream)
Called when a Stream starts processing.
- Parameters:
stream – The stream that is starting
- print(*args, sep=' ', end='\n', file=None, flush=False)
Prints the values to a stream, or to sys.stdout by default.
- sep
string inserted between values, default a space.
- end
string appended after the last value, default a newline.
- file
a file-like object (stream); defaults to the current sys.stdout.
- flush
whether to forcibly flush the stream.
- flowno.core.flow.instrumentation.get_current_flow_instrument() -> FlowInstrument
Get the current flow instrumentation context.
- Returns:
The currently active flow instrument or a no-op instrument if none is active
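One common way to implement "the active instrument, or a no-op if none" is a context variable whose default is a do-nothing instrument. The sketch below illustrates that general pattern and is not taken from flowno's source: NoOpInstrument, RecordingInstrument, _current, and get_current_instrument are all hypothetical names.

```python
from contextvars import ContextVar


class NoOpInstrument:
    """Hypothetical do-nothing instrument used when none is active."""

    def on_flow_start(self, flow):
        pass


# The default value is what callers see outside any `with` block.
_current = ContextVar("current_instrument", default=NoOpInstrument())


class RecordingInstrument(NoOpInstrument):
    """Hypothetical instrument that installs itself while its block is open."""

    def __init__(self):
        self.events = []
        self._token = None

    def on_flow_start(self, flow):
        self.events.append(("start", flow))

    def __enter__(self):
        self._token = _current.set(self)
        return self

    def __exit__(self, exc_type, exc, tb):
        # Restore whatever instrument was active before this block.
        _current.reset(self._token)
        return False


def get_current_instrument():
    return _current.get()


outside_before = type(get_current_instrument()).__name__
with RecordingInstrument() as rec:
    get_current_instrument().on_flow_start("demo-flow")
    inside = get_current_instrument() is rec
outside_after = type(get_current_instrument()).__name__
```

Callers never need to check for None under this design: hooks invoked outside any with block simply land on the no-op instrument.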