flowno.core.event_loop.selectors
Socket abstraction for the Flowno event loop.
This module provides socket wrappers that integrate with Flowno’s event loop system, enabling non-blocking socket operations through coroutines. The wrappers allow socket operations to be suspended and resumed by yielding command objects that the event loop processes.
- Key features:
Non-blocking socket operations through coroutines
Support for both plain and TLS/SSL sockets
Integration with Flowno’s instrumentation system
Compatible with Flowno’s event loop commands
- Example usage:
>>> from flowno import socket, spawn, sleep, EventLoop >>> >>> # Server coroutine >>> async def echo_server(): ... # Create a server socket ... server_sock = socket() ... server_sock.bind(('localhost', 8888)) ... server_sock.listen(1) ... print("Server: Listening on port 8888") ... ... # Accept client connection (non-blocking) ... client_sock, addr = await server_sock.accept() ... print(f"Server: Client connected from {addr}") ... ... # Receive data from client (non-blocking) ... data = await client_sock.recv(1024) ... print(f"Server: Received: {data.decode()}") ... ... # Send response (non-blocking) ... await client_sock.sendAll(b"Echo: " + data) ... print("Server: Response sent") >>> >>> # Client coroutine >>> async def echo_client(): ... # Give the server time to start ... await sleep(0.1) ... ... # Create a client socket ... client_sock = socket() ... ... # Connect to server (non-blocking) ... client_sock.connect(('localhost', 8888)) ... print("Client: Connected to server") ... ... # Send data (non-blocking) ... message = b"Hello, world!" ... await client_sock.sendAll(message) ... print(f"Client: Sent: {message.decode()}") ... ... # Receive response (non-blocking) ... response = await client_sock.recv(1024) ... print(f"Client: Received: {response.decode()}") >>> >>> # Main coroutine that coordinates server and client >>> async def main(): ... # Start the server ... server_task = await spawn(echo_server()) ... ... # Start the client ... client_task = await spawn(echo_client()) ... ... # Wait for both tasks to complete ... await server_task.join() ... await client_task.join() ... ... print("Main: All tasks completed") >>> >>> # Run the example >>> event_loop = EventLoop() >>> event_loop.run_until_complete(main(), join=True) Server: Listening on port 8888 Client: Connected to server Client: Sent: Hello, world! Server: Client connected from ('127.0.0.1', ...) Server: Received: Hello, world! Server: Response sent Client: Received: Echo: Hello, world! Main: All tasks completed
- class flowno.core.event_loop.selectors.SocketHandle(socket: socket)[source]
Wrapper around the built-in socket object.
This class provides methods that integrate with Flowno’s event loop, allowing socket operations to be performed asynchronously.
- accept() Generator[SocketAcceptCommand, None, tuple[SocketHandle, tuple[Any, ...] | str]] [source]
Accept a connection on a listening socket.
This coroutine yields a SocketAcceptCommand for the event loop to process. When the event loop detects an incoming connection, it resumes this coroutine.
- Returns:
A tuple containing a new SocketHandle for the client connection and the client’s address.
- bind(address: tuple[Any, ...] | str, /) None [source]
Bind the socket to the specified address.
- Parameters:
address – The address (host, port) to bind to.
- connect(address: tuple[Any, ...] | str, /) None [source]
Connect to a remote socket at the specified address.
This is a blocking operation. For non-blocking connections, use the socket primitives from the flowno module.
- Parameters:
address – The address to connect to (host, port).
- listen(backlog: int | None = None, /) None [source]
Enable a server socket to accept connections.
- Parameters:
backlog – The number of unaccepted connections the system will allow before refusing new connections.
- recv(bufsize: int) Generator[SocketRecvCommand, None, bytes] [source]
Receive data from the socket.
This coroutine yields a SocketRecvCommand for the event loop to process. When data is available to read, the event loop resumes this coroutine.
- Parameters:
bufsize – The maximum number of bytes to receive.
- Returns:
The bytes received from the socket.
- class flowno.core.event_loop.selectors.TLSSocketHandle(socket: socket, ssl_context: SSLContext, server_hostname: str | None)[source]
Wrapper around the built-in ssl socket object.
This class extends SocketHandle to provide TLS/SSL support.
- flowno.core.event_loop.selectors.sel = <selectors.EpollSelector object>
Default selector used by the event loop for socket operations. This selector efficiently monitors multiple socket objects for I/O events.