flowno.core.event_loop.selectors

Socket abstraction for the Flowno event loop.

This module provides socket wrappers that integrate with Flowno’s event loop system, enabling non-blocking socket operations through coroutines. The wrappers allow socket operations to be suspended and resumed by yielding command objects that the event loop processes.
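The suspend-and-resume mechanism described above can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not Flowno's implementation: RecvCommand, recv, reader, and run are hypothetical names, and the loop handles a single task and a single command type.

```python
import selectors
import socket
from dataclasses import dataclass


@dataclass
class RecvCommand:
    """Hypothetical stand-in for a command object such as SocketRecvCommand."""
    sock: socket.socket


def recv(sock, bufsize):
    """Suspend until the loop reports `sock` readable, then read from it."""
    yield RecvCommand(sock)        # hand control back to the loop
    return sock.recv(bufsize)      # runs only after the loop resumes us


def reader(sock, results):
    """A task that performs one receive and records the result."""
    data = yield from recv(sock, 1024)
    results.append(data)


def run(task):
    """Toy event loop: drive one generator task to completion."""
    sel = selectors.DefaultSelector()
    try:
        command = next(task)                       # run up to the first yield
        while True:
            sel.register(command.sock, selectors.EVENT_READ)
            sel.select()                           # block until readable
            sel.unregister(command.sock)
            command = task.send(None)              # resume the task
    except StopIteration:
        pass                                       # task finished
    finally:
        sel.close()


left, right = socket.socketpair()
left.sendall(b"ping")
results = []
run(reader(right, results))
left.close()
right.close()
```

The task never calls the selector itself. It only yields a description of what it is waiting for, which is what lets one loop interleave many such tasks.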

Key features:
  • Non-blocking socket operations through coroutines
  • Support for both plain and TLS/SSL sockets
  • Integration with Flowno’s instrumentation system
  • Compatibility with Flowno’s event loop commands

Example usage:
>>> from flowno import socket, spawn, sleep, EventLoop
>>>
>>> # Server coroutine
>>> async def echo_server():
...     # Create a server socket
...     server_sock = socket()
...     server_sock.bind(('localhost', 8888))
...     server_sock.listen(1)
...     print("Server: Listening on port 8888")
...
...     # Accept client connection (non-blocking)
...     client_sock, addr = await server_sock.accept()
...     print(f"Server: Client connected from {addr}")
...
...     # Receive data from client (non-blocking)
...     data = await client_sock.recv(1024)
...     print(f"Server: Received: {data.decode()}")
...
...     # Send response (non-blocking)
...     await client_sock.sendAll(b"Echo: " + data)
...     print("Server: Response sent")
>>>
>>> # Client coroutine
>>> async def echo_client():
...     # Give the server time to start
...     await sleep(0.1)
...
...     # Create a client socket
...     client_sock = socket()
...
...     # Connect to server (blocking call, so it is not awaited)
...     client_sock.connect(('localhost', 8888))
...     print("Client: Connected to server")
...
...     # Send data (non-blocking)
...     message = b"Hello, world!"
...     await client_sock.sendAll(message)
...     print(f"Client: Sent: {message.decode()}")
...
...     # Receive response (non-blocking)
...     response = await client_sock.recv(1024)
...     print(f"Client: Received: {response.decode()}")
>>>
>>> # Main coroutine that coordinates server and client
>>> async def main():
...     # Start the server
...     server_task = await spawn(echo_server())
...
...     # Start the client
...     client_task = await spawn(echo_client())
...
...     # Wait for both tasks to complete
...     await server_task.join()
...     await client_task.join()
...
...     print("Main: All tasks completed")
>>>
>>> # Run the example
>>> event_loop = EventLoop()
>>> event_loop.run_until_complete(main(), join=True)
Server: Listening on port 8888
Client: Connected to server
Client: Sent: Hello, world!
Server: Client connected from ('127.0.0.1', ...)
Server: Received: Hello, world!
Server: Response sent
Client: Received: Echo: Hello, world!
Main: All tasks completed

class flowno.core.event_loop.selectors.SocketHandle(socket: socket)

Wrapper around the built-in socket object.

This class provides methods that integrate with Flowno’s event loop, allowing socket operations to be performed asynchronously.

accept() → Generator[SocketAcceptCommand, None, tuple[SocketHandle, tuple[Any, ...] | str]]

Accept a connection on a listening socket.

This coroutine yields a SocketAcceptCommand for the event loop to process. When the event loop detects an incoming connection, it resumes this coroutine.

Returns:

A tuple containing a new SocketHandle for the client connection and the client’s address.
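As a rough picture of what servicing an accept request involves, the sketch below waits for a listening socket to become readable and only then calls accept(). It uses plain standard-library sockets; how Flowno's event loop handles SocketAcceptCommand internally is an assumption here, and all variable names are illustrative.

```python
import selectors
import socket

# Listening socket on an OS-assigned loopback port.
listener = socket.socket()
listener.bind(("127.0.0.1", 0))
listener.listen(1)
listener.setblocking(False)
port = listener.getsockname()[1]

# A client connects, which queues a pending connection on the listener.
client = socket.create_connection(("127.0.0.1", port))

# A listening socket is reported readable when a connection is pending.
sel = selectors.DefaultSelector()
sel.register(listener, selectors.EVENT_READ)
ready = sel.select(timeout=5)

# accept() returns a new connected socket and the peer address.
conn, addr = listener.accept()

sel.unregister(listener)
sel.close()
conn.close()
client.close()
listener.close()
```

The returned pair has the same shape as the tuple documented above: a connection object plus the client's address.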

bind(address: tuple[Any, ...] | str, /) → None

Bind the socket to the specified address.

Parameters:

address – The address (host, port) to bind to.

connect(address: tuple[Any, ...] | str, /) → None

Connect to a remote socket at the specified address.

This is a blocking operation: the call does not yield to the event loop, so no other task runs until it returns. For non-blocking connections, use the socket primitives from the flowno module.

Parameters:

address – The address to connect to (host, port).

listen(backlog: int | None = None, /) → None

Enable a server socket to accept connections.

Parameters:

backlog – The number of unaccepted connections the system will allow before refusing new connections.
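For orientation, the bind and listen steps look like this on a plain standard-library socket. This is a sketch that assumes SocketHandle forwards both calls to the wrapped socket unchanged. Port 0 is used so the operating system picks a free port.

```python
import socket

server = socket.socket()
server.bind(("127.0.0.1", 0))   # port 0: let the OS choose a free port
server.listen(1)                # allow one queued, not-yet-accepted connection
host, port = server.getsockname()
server.close()
```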

recv(bufsize: int) → Generator[SocketRecvCommand, None, bytes]

Receive data from the socket.

This coroutine yields a SocketRecvCommand for the event loop to process. When data is available to read, the event loop resumes this coroutine.

Parameters:

bufsize – The maximum number of bytes to receive.

Returns:

The bytes received from the socket.
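Because bufsize is only an upper bound, callers that need a whole message usually read in a loop. The sketch below shows that pattern on a standard-library socket pair. It assumes SocketHandle.recv follows the built-in socket convention of returning an empty bytes object once the peer has closed the connection; read_until_closed is a hypothetical helper.

```python
import socket


def read_until_closed(sock, bufsize=4):
    """Collect chunks of at most `bufsize` bytes until the peer closes."""
    chunks = []
    while True:
        chunk = sock.recv(bufsize)
        if not chunk:              # b"" signals end of stream
            break
        chunks.append(chunk)
    return b"".join(chunks)


writer, reader = socket.socketpair()
writer.sendall(b"Hello, world!")
writer.close()                     # lets the reader observe end of stream
message = read_until_closed(reader)
reader.close()
```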

send(data: bytes) → Generator[SocketSendCommand, None, int]

Send data to the socket.

Unlike sendAll, this makes a single send attempt and returns the number of bytes sent, which may be fewer than len(data).

Parameters:

data – The bytes to send.

Returns:

The number of bytes sent.

sendAll(data: bytes) → Generator[SocketSendCommand, None, None]

Send all data to the socket.

This coroutine keeps yielding SocketSendCommand objects until all of the data has been sent.

Parameters:

data – The bytes to send.
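The relationship between send and sendAll can be sketched as a loop over single send attempts. The code is illustrative only: SendCommand, send, send_all, and drive are hypothetical names, the limit parameter exists solely to simulate partial writes, and the toy driver resumes the task immediately where a real event loop would wait for the socket to become writable.

```python
import socket
from dataclasses import dataclass


@dataclass
class SendCommand:
    """Hypothetical stand-in for SocketSendCommand."""
    sock: socket.socket


def send(sock, data, limit=None):
    """One send attempt; returns how many bytes were accepted."""
    yield SendCommand(sock)            # suspend until the loop resumes us
    return sock.send(data[:limit])     # limit simulates a partial write


def send_all(sock, data, limit=None):
    """Repeat send() until every byte has been handed to the socket."""
    view = memoryview(data)
    while len(view) > 0:
        sent = yield from send(sock, view, limit)
        view = view[sent:]             # drop the bytes already sent


def drive(task):
    """Toy driver: resume immediately and count the commands yielded."""
    commands = 0
    try:
        while True:
            next(task)
            commands += 1
    except StopIteration:
        return commands


writer, reader = socket.socketpair()
commands = drive(send_all(writer, b"Hello, world!", limit=4))
writer.close()

received = b""
while chunk := reader.recv(1024):
    received += chunk
reader.close()
```

With a 13-byte message and at most 4 bytes accepted per attempt, the task yields four commands before it finishes.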

class flowno.core.event_loop.selectors.TLSSocketHandle(socket: socket, ssl_context: SSLContext, server_hostname: str | None)

Wrapper around the built-in ssl socket object.

This class extends SocketHandle to provide TLS/SSL support.

connect(address: tuple[Any, ...] | str, /) → None

Connect to a remote socket and establish a TLS/SSL connection.

Parameters:

address – The address to connect to (host, port).
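The constructor arguments (ssl_context, server_hostname) suggest the usual standard-library sequence of a TCP connect followed by a TLS handshake. The sketch below shows that sequence with the ssl module. It is an assumption about the general shape, not a description of what TLSSocketHandle.connect does internally, and open_tls is a hypothetical helper that is defined but not called here because it needs a reachable TLS server.

```python
import socket
import ssl


def open_tls(host, port, context=None):
    """Blocking TCP connect followed by a TLS handshake."""
    context = context or ssl.create_default_context()
    raw = socket.create_connection((host, port))
    try:
        # server_hostname enables SNI and certificate hostname checking.
        return context.wrap_socket(raw, server_hostname=host)
    except Exception:
        raw.close()
        raise


# The default context verifies certificates and hostnames.
context = ssl.create_default_context()
```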

flowno.core.event_loop.selectors.sel = <selectors.EpollSelector object>

Default selector used by the event loop for socket operations. This selector efficiently monitors multiple socket objects for I/O events.
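The value shown above is the repr of a selector instance on Linux. In the standard library, selectors.DefaultSelector is an alias for the most efficient implementation available on the current platform, so other systems would show a different class. Whether Flowno creates sel through DefaultSelector is an assumption. The sketch below shows how such a selector reports readiness, using an illustrative socket pair.

```python
import selectors
import socket

sel = selectors.DefaultSelector()
a, b = socket.socketpair()

# Watch `b` for readability and attach a label to the registration.
sel.register(b, selectors.EVENT_READ, data="b is readable")

before = sel.select(timeout=0)      # nothing has been written yet
a.send(b"x")
after = sel.select(timeout=1)       # `b` now has one byte to read
labels = [key.data for key, events in after]

sel.unregister(b)
sel.close()
a.close()
b.close()
```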