flowno.core.event_loop.instrumentation

Event Loop Instrumentation System.

This module provides a flexible instrumentation system for monitoring and debugging the Flowno event loop. It captures metrics and events for socket operations, queue operations, and other event loop activities.

The instrumentation system uses a context manager pattern, allowing multiple instrumentation hooks to be applied within different scopes of code.

Example

>>> from flowno.core.event_loop.event_loop import EventLoop
>>> from flowno.core.event_loop.instrumentation import PrintInstrument
>>> from flowno.core.event_loop.queues import AsyncQueue
>>>
>>> # Define a simple coroutine that uses a queue
>>> async def queue_example():
...     queue = AsyncQueue()
...     # Put an item into the queue
...     await queue.put(42)
...     # Get the item back from the queue
...     value = await queue.get()
...     return value
>>>
>>> # Create an event loop and run with instrumentation
>>> event_loop = EventLoop()
>>> with PrintInstrument():
...     result = event_loop.run_until_complete(queue_example(), join=True)
...
[QUEUE-PUT] Item put in queue: 42
[QUEUE-GET] Item retrieved from queue: 42
>>> print(result)
42
class flowno.core.event_loop.instrumentation.EventLoopInstrument[source]

Base class for event loop instrumentation.

This class provides hooks for various event loop operations. Subclasses can override these methods to implement custom monitoring or logging.

on_queue_get(queue: AsyncQueue[T], item: T, immediate: bool) None[source]

Called every time a queue completes a get command.

If the task blocks on an empty queue, this callback fires when an items is available and returned.

Parameters:
  • queue (AsyncQueue[T]) – The queue which the item was popped.

  • item (T) – The item that was popped from the queue.

on_queue_put(queue: AsyncQueue[T], item: T, immediate: bool) None[source]

Called every time a queue processes a put command.

If the task blocks on a full queue, this callback fires when the queue accepts the item.

Parameters:
  • queue (AsyncQueue[T]) – The queue which the item was added to.

  • item (T) – The item that was added to the queue.

on_socket_accept_ready(metadata: ReadySocketInstrumentationMetadata) None[source]

Called when a socket accept operation completes.

Parameters:

metadata – Socket operation metadata including duration

on_socket_accept_start(metadata: InstrumentationMetadata) None[source]

Called when a task starts waiting on a socket accept.

Parameters:

metadata – Socket operation metadata

on_socket_close(metadata: InstrumentationMetadata) None[source]

Called when a socket connection is closed.

Parameters:

metadata – Socket operation metadata

on_socket_connect_ready(metadata: SocketConnectReadyMetadata) None[source]

Called when a socket connection has been established.

Parameters:

metadata – Connection metadata including duration and target address

on_socket_connect_start(metadata: SocketConnectStartMetadata) None[source]

Called when a socket connection is initiated.

Parameters:

metadata – Connection metadata including target address

on_socket_recv_data(metadata: SocketRecvDataMetadata) None[source]

Called immediately after the actual bytes have been read.

Parameters:

metadata – Metadata including the received bytes

on_socket_recv_ready(metadata: ReadySocketInstrumentationMetadata) None[source]

Called when the socket is ready for reading.

Parameters:

metadata – Socket operation metadata including duration

on_socket_recv_start(metadata: InstrumentationMetadata) None[source]

Called when a task starts waiting on a socket recv.

Parameters:

metadata – Socket operation metadata

on_socket_send_ready(metadata: ReadySocketInstrumentationMetadata) None[source]

Called when the socket send operation completes.

Parameters:

metadata – Socket operation metadata including duration

on_socket_send_start(metadata: InstrumentationMetadata) None[source]

Called when a task starts waiting on a socket send.

Parameters:

metadata – Socket operation metadata

class flowno.core.event_loop.instrumentation.InstrumentationMetadata(_task: RawTask[Any, None, None], _command: Command, socket_handle: SocketHandle, immediate: bool = False, start_time: float = <factory>)[source]

Base metadata class for instrumentation events.

class flowno.core.event_loop.instrumentation.LogInstrument[source]

Event loop instrument that logs information using the logging module.

This instrument uses the debug log level for all messages.

print(msg, *args, **kwargs)

Log ‘msg % args’ with severity ‘DEBUG’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.debug(“Houston, we have a %s”, “thorny problem”, exc_info=True)

class flowno.core.event_loop.instrumentation.PrintInstrument[source]

Event loop instrument that prints information to stdout.

This instrument outputs detailed information about socket operations.

on_socket_close(metadata: InstrumentationMetadata) None[source]

Called when a socket connection is closed.

Parameters:

metadata – Socket operation metadata

on_socket_connect_ready(metadata: SocketConnectReadyMetadata) None[source]

Called when a socket connection has been established.

Parameters:

metadata – Connection metadata including duration and target address

on_socket_connect_start(metadata: SocketConnectStartMetadata) None[source]

Called when a socket connection is initiated.

Parameters:

metadata – Connection metadata including target address

on_socket_recv_data(metadata: SocketRecvDataMetadata) None[source]

Called immediately after the actual bytes have been read.

Parameters:

metadata – Metadata including the received bytes

on_socket_recv_ready(metadata: ReadySocketInstrumentationMetadata) None[source]

Called when the socket is ready for reading.

Parameters:

metadata – Socket operation metadata including duration

on_socket_recv_start(metadata: InstrumentationMetadata) None[source]

Called when a task starts waiting on a socket recv.

Parameters:

metadata – Socket operation metadata

on_socket_send_ready(metadata: ReadySocketInstrumentationMetadata) None[source]

Called when the socket send operation completes.

Parameters:

metadata – Socket operation metadata including duration

on_socket_send_start(metadata: InstrumentationMetadata) None[source]

Called when a task starts waiting on a socket send.

Parameters:

metadata – Socket operation metadata

print(*args, sep=' ', end='\n', file=None, flush=False)

Prints the values to a stream, or to sys.stdout by default.

sep

string inserted between values, default a space.

end

string appended after the last value, default a newline.

file

a file-like object (stream); defaults to the current sys.stdout.

flush

whether to forcibly flush the stream.

class flowno.core.event_loop.instrumentation.ReadySocketInstrumentationMetadata(_task: RawTask[Any, None, None], _command: Command, socket_handle: SocketHandle, immediate: bool = False, start_time: float = <factory>, finish_time: float = <factory>)[source]

Metadata for socket operations that have completed.

class flowno.core.event_loop.instrumentation.SocketConnectReadyMetadata(socket_handle: SocketHandle, target_address: _Address, immediate: bool = False, start_time: float = <factory>, finish_time: float = <factory>)[source]

Metadata for completed socket connections.

class flowno.core.event_loop.instrumentation.SocketConnectStartMetadata(socket_handle: SocketHandle, target_address: _Address, immediate: bool = False, start_time: float = <factory>)[source]

Metadata for socket connection initiation.

class flowno.core.event_loop.instrumentation.SocketRecvDataMetadata(socket_handle: SocketHandle, data: bytes)[source]

Metadata for received socket data.

flowno.core.event_loop.instrumentation.get_current_instrument() EventLoopInstrument[source]

Get the current instrumentation context.

Returns:

The currently active instrument or an empty instrument if none is active.