flowno.io
- class flowno.io.Headers[source]
Case-insensitive container for HTTP headers.
This class provides methods for working with HTTP headers, ensuring that header names are handled case-insensitively as per HTTP specifications. It also handles automatic conversion of list values to comma-separated strings as required by the HTTP protocol.
Examples
>>> headers = Headers() >>> headers.set("Content-Type", "application/json") >>> headers.get("content-type") 'application/json'
# Using list values for headers that accept multiple values >>> headers.set(“Accept”, [“text/html”, “application/json”]) >>> headers.get(“Accept”) ‘text/html, application/json’
- delete(name: str) None [source]
Remove a header.
- Parameters:
name – Header name (case-insensitive)
Examples
>>> headers = Headers() >>> headers.set("X-Custom-Header", "value") >>> headers.delete("X-Custom-Header") >>> headers.get("X-Custom-Header") None
- get(name: str, default: str | list[str] | None = None) str | list[str] | None [source]
Get a header value.
- Parameters:
name – Header name (case-insensitive)
default – Value to return if the header is not found
- Returns:
The header value, or the default value if not found
Examples
>>> headers = Headers() >>> headers.set("Content-Type", "application/json") >>> headers.get("content-type") 'application/json' >>> headers.get("nonexistent-header", "default-value") 'default-value'
- merge(headers: Headers) None [source]
Merge headers from another Headers instance.
This will override any existing headers with the same names.
- Parameters:
headers – Another Headers instance to merge from
Examples
>>> headers1 = Headers() >>> headers1.set("Content-Type", "application/json") >>> >>> headers2 = Headers() >>> headers2.set("Accept", "text/html") >>> headers2.set("Content-Type", "text/plain") # Will override >>> >>> headers1.merge(headers2) >>> headers1.get("Content-Type") 'text/plain' >>> headers1.get("Accept") 'text/html'
- set(name: str, value: str | list[str]) None [source]
Set a header value.
If the value is a list, it’s joined with commas to create a single header value, which is the standard way to represent multiple values for a single header in HTTP.
- Parameters:
name – Header name (case-insensitive)
value – Header value or list of values
Examples
>>> headers = Headers() >>> headers.set("Content-Type", "application/json") >>> headers.set("Accept-Encoding", ["gzip", "deflate"])
- stringify() str [source]
Convert headers to a string suitable for an HTTP request.
- Returns:
HTTP headers as a string with CRLF line endings
Examples
>>> headers = Headers() >>> headers.set("Content-Type", "application/json") >>> headers.set("Accept", "text/html") >>> print(headers.stringify()) content-type: application/json accept: text/html
- class flowno.io.HttpClient(headers: Headers | None = None)[source]
HTTP client compatible with Flowno’s event loop.
This client allows making both regular and streaming HTTP requests. It supports custom headers, JSON serialization, and automatic handling of compressed responses.
Example
>>> async def main(): ... # Create client with custom headers ... headers = Headers() ... headers.set("Authorization", "Bearer my_token") ... client = HttpClient(headers=headers) ... ... # Make a POST request with JSON data ... response = await client.post( ... "https://httpbin.org/post", ... json={"name": "test", "value": 123} ... ) ... print(f"Status: {response.status_code}")
- async get(url: str) Response [source]
Make a GET request to the given URL, blocking the current task.
Example
>>> async def main(): ... client = HttpClient() ... response = await client.get("https://httpbin.org/get") ... print(f"Status: {response.status_code}") ... print(f"Body: {response.body}")
- Parameters:
url – The URL to make the request to
- Returns:
Response object containing status, headers, and body
- async post(url: str, json: dict[str, Any] | None = None, data: bytes | None = None) Response [source]
Make a POST request to the given URL.
If json is provided, it will be serialized with the client’s JSON encoder and sent as the request body with the appropriate Content-Type header.
Example
>>> async def main(): ... client = HttpClient() ... response = await client.post( ... "https://httpbin.org/post", ... json={"name": "test", "value": 123} ... ) ... print(f"Status: {response.status_code}")
- Parameters:
url – The URL to make the request to
json – JSON data to send (will be encoded using the client’s JSON encoder)
data – Raw data to send (used only if json is None)
- Returns:
Response object containing status, headers, and body
- async request(method: Literal['GET', 'POST'], url: str, data: bytes | None = None, extra_headers: Headers | None = None) Response [source]
Make a request to the given URL.
This is the core method that handles both GET and POST requests. It handles the entire request-response cycle, including connecting, sending the request, receiving the response, and decompressing the body.
Example
>>> async def main(): ... client = HttpClient() ... headers = Headers() ... headers.set("Accept", "application/json") ... response = await client.request( ... "GET", ... "https://httpbin.org/get", ... extra_headers=headers ... ) ... print(f"Status: {response.status}")
- Parameters:
method – The HTTP method to use (“GET” or “POST”)
url – The URL to make the request to
data – The data to send in the request body
extra_headers – Additional headers to include in the request
- Returns:
Response object containing status, headers, and body
- async stream_get(url: str) OkStreamingResponse[Any] | ErrStreamingResponse [source]
Make a streaming GET request to the given URL.
This method returns a response with a body that is an asynchronous iterator, allowing for processing of response data as it arrives.
Example
>>> async def main(): ... client = HttpClient() ... response = await client.stream_get("https://httpbin.org/stream/3") ... ... if streaming_response_is_ok(response): ... async for chunk in response.body: ... print(chunk)
- Parameters:
url – The URL to make the request to
- Returns:
A streaming response object that may be either successful or an error
- async stream_post(url: str, json: dict[str, Any] | Any | None = None, data: bytes | None = None) OkStreamingResponse[Any] | ErrStreamingResponse [source]
Make a streaming POST request to the given URL.
If json is provided, it will be serialized with the client’s JSON encoder and sent as the request body with the appropriate Content-Type header.
Example
>>> async def main(): ... client = HttpClient() ... response = await client.stream_post( ... "https://httpbin.org/stream/3", ... json={"key": "value"} ... ) ... ... if streaming_response_is_ok(response): ... async for chunk in response.body: ... print(chunk)
- Parameters:
url – The URL to make the request to
json – JSON data to send (will be encoded using the client’s JSON encoder)
data – Raw data to send (used only if json is None)
- Returns:
A streaming response object that may be either successful or an error
- async stream_request(method: Literal['GET', 'POST'], url: str, data: bytes | None = None, extra_headers: Headers | None = None) OkStreamingResponse[Any] | ErrStreamingResponse [source]
Make a streaming request to the given URL.
This method is similar to request() but returns a streaming response. For successful responses, the body is an asynchronous iterator that yields either parsed JSON objects (for SSE streams) or raw bytes.
Example
>>> async def main(): ... client = HttpClient() ... response = await client.stream_request( ... "GET", ... "https://httpbin.org/stream/3" ... ) ... ... if streaming_response_is_ok(response): ... async for chunk in response.body: ... print(f"Received chunk: {chunk}")
- Parameters:
method – The HTTP method to use (“GET” or “POST”)
url – The URL to make the request to
data – The data to send in the request body
extra_headers – Additional headers to include in the request
- Returns:
A streaming response object that may be either successful or an error
- class flowno.io.HttpServer(host: str, port: int)[source]
Simple HTTP server compatible with the Flowno event loop.
This is a minimal HTTP server implementation for development and testing. For production scenarios, it’s recommended to use a dedicated web server like Flask or FastAPI in a separate process.
- host
The hostname or IP address to bind to
- port
The port number to listen on
Example
>>> from flowno import EventLoop >>> from flowno.io import HttpServer >>> >>> async def custom_server(): ... server = HttpServer('localhost', 8080) ... await server.serve() ... >>> loop = EventLoop() >>> loop.run_until_complete(custom_server())
- async handle_client(client_sock: SocketHandle)[source]
Handle an individual client connection.
This method reads the client request, processes it, and sends a response.
- Parameters:
client_sock – The socket connected to the client
- async serve()[source]
Start the HTTP server and begin accepting connections.
This method binds to the specified host and port, then enters an infinite loop to accept and handle client connections. Each client connection is handled in a separate task.
The current task will suspend and other tasks can run concurrently.