flowno.core.event_loop.instrumentation
Event Loop Instrumentation System.
This module provides a flexible instrumentation system for monitoring and debugging the Flowno event loop. It captures metrics and events for socket operations, queue operations, and other event loop activities.
The instrumentation system uses a context manager pattern, allowing multiple instrumentation hooks to be applied within different scopes of code.
Example
>>> from flowno.core.event_loop.event_loop import EventLoop
>>> from flowno.core.event_loop.instrumentation import PrintInstrument
>>> from flowno.core.event_loop.queues import AsyncQueue
>>>
>>> # Define a simple coroutine that uses a queue
>>> async def queue_example():
... queue = AsyncQueue()
... # Put an item into the queue
... await queue.put(42)
... # Get the item back from the queue
... value = await queue.get()
... return value
>>>
>>> # Create an event loop and run with instrumentation
>>> event_loop = EventLoop()
>>> with PrintInstrument():
... result = event_loop.run_until_complete(queue_example(), join=True)
...
[QUEUE-PUT] Item put in queue: 42
[QUEUE-GET] Item retrieved from queue: 42
>>> print(result)
42
- class flowno.core.event_loop.instrumentation.EventLoopInstrument[source]
Base class for event loop instrumentation.
This class provides hooks for various event loop operations. Subclasses can override these methods to implement custom monitoring or logging.
- on_queue_get(queue: AsyncQueue[T], item: T, immediate: bool) None [source]
Called every time a queue completes a get command.
If the task blocks on an empty queue, this callback fires when an items is available and returned.
- Parameters:
queue (AsyncQueue[T]) – The queue which the item was popped.
item (T) – The item that was popped from the queue.
- on_queue_put(queue: AsyncQueue[T], item: T, immediate: bool) None [source]
Called every time a queue processes a put command.
If the task blocks on a full queue, this callback fires when the queue accepts the item.
- Parameters:
queue (AsyncQueue[T]) – The queue which the item was added to.
item (T) – The item that was added to the queue.
- on_socket_accept_ready(metadata: ReadySocketInstrumentationMetadata) None [source]
Called when a socket accept operation completes.
- Parameters:
metadata – Socket operation metadata including duration
- on_socket_accept_start(metadata: InstrumentationMetadata) None [source]
Called when a task starts waiting on a socket accept.
- Parameters:
metadata – Socket operation metadata
- on_socket_close(metadata: InstrumentationMetadata) None [source]
Called when a socket connection is closed.
- Parameters:
metadata – Socket operation metadata
- on_socket_connect_ready(metadata: SocketConnectReadyMetadata) None [source]
Called when a socket connection has been established.
- Parameters:
metadata – Connection metadata including duration and target address
- on_socket_connect_start(metadata: SocketConnectStartMetadata) None [source]
Called when a socket connection is initiated.
- Parameters:
metadata – Connection metadata including target address
- on_socket_recv_data(metadata: SocketRecvDataMetadata) None [source]
Called immediately after the actual bytes have been read.
- Parameters:
metadata – Metadata including the received bytes
- on_socket_recv_ready(metadata: ReadySocketInstrumentationMetadata) None [source]
Called when the socket is ready for reading.
- Parameters:
metadata – Socket operation metadata including duration
- on_socket_recv_start(metadata: InstrumentationMetadata) None [source]
Called when a task starts waiting on a socket recv.
- Parameters:
metadata – Socket operation metadata
- on_socket_send_ready(metadata: ReadySocketInstrumentationMetadata) None [source]
Called when the socket send operation completes.
- Parameters:
metadata – Socket operation metadata including duration
- on_socket_send_start(metadata: InstrumentationMetadata) None [source]
Called when a task starts waiting on a socket send.
- Parameters:
metadata – Socket operation metadata
- class flowno.core.event_loop.instrumentation.InstrumentationMetadata(_task: RawTask[Any, None, None], _command: Command, socket_handle: SocketHandle, immediate: bool = False, start_time: float = <factory>)[source]
Base metadata class for instrumentation events.
- class flowno.core.event_loop.instrumentation.LogInstrument[source]
Event loop instrument that logs information using the logging module.
This instrument uses the debug log level for all messages.
- print(msg, *args, **kwargs)
Log ‘msg % args’ with severity ‘DEBUG’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.debug(“Houston, we have a %s”, “thorny problem”, exc_info=True)
- class flowno.core.event_loop.instrumentation.PrintInstrument[source]
Event loop instrument that prints information to stdout.
This instrument outputs detailed information about socket operations.
- on_socket_close(metadata: InstrumentationMetadata) None [source]
Called when a socket connection is closed.
- Parameters:
metadata – Socket operation metadata
- on_socket_connect_ready(metadata: SocketConnectReadyMetadata) None [source]
Called when a socket connection has been established.
- Parameters:
metadata – Connection metadata including duration and target address
- on_socket_connect_start(metadata: SocketConnectStartMetadata) None [source]
Called when a socket connection is initiated.
- Parameters:
metadata – Connection metadata including target address
- on_socket_recv_data(metadata: SocketRecvDataMetadata) None [source]
Called immediately after the actual bytes have been read.
- Parameters:
metadata – Metadata including the received bytes
- on_socket_recv_ready(metadata: ReadySocketInstrumentationMetadata) None [source]
Called when the socket is ready for reading.
- Parameters:
metadata – Socket operation metadata including duration
- on_socket_recv_start(metadata: InstrumentationMetadata) None [source]
Called when a task starts waiting on a socket recv.
- Parameters:
metadata – Socket operation metadata
- on_socket_send_ready(metadata: ReadySocketInstrumentationMetadata) None [source]
Called when the socket send operation completes.
- Parameters:
metadata – Socket operation metadata including duration
- on_socket_send_start(metadata: InstrumentationMetadata) None [source]
Called when a task starts waiting on a socket send.
- Parameters:
metadata – Socket operation metadata
- print(*args, sep=' ', end='\n', file=None, flush=False)
Prints the values to a stream, or to sys.stdout by default.
- sep
string inserted between values, default a space.
- end
string appended after the last value, default a newline.
- file
a file-like object (stream); defaults to the current sys.stdout.
- flush
whether to forcibly flush the stream.
- class flowno.core.event_loop.instrumentation.ReadySocketInstrumentationMetadata(_task: RawTask[Any, None, None], _command: Command, socket_handle: SocketHandle, immediate: bool = False, start_time: float = <factory>, finish_time: float = <factory>)[source]
Metadata for socket operations that have completed.
- class flowno.core.event_loop.instrumentation.SocketConnectReadyMetadata(socket_handle: SocketHandle, target_address: _Address, immediate: bool = False, start_time: float = <factory>, finish_time: float = <factory>)[source]
Metadata for completed socket connections.
- class flowno.core.event_loop.instrumentation.SocketConnectStartMetadata(socket_handle: SocketHandle, target_address: _Address, immediate: bool = False, start_time: float = <factory>)[source]
Metadata for socket connection initiation.
- class flowno.core.event_loop.instrumentation.SocketRecvDataMetadata(socket_handle: SocketHandle, data: bytes)[source]
Metadata for received socket data.
- flowno.core.event_loop.instrumentation.get_current_instrument() EventLoopInstrument [source]
Get the current instrumentation context.
- Returns:
The currently active instrument or an empty instrument if none is active.