Source code for flowno.io.http_client

"""
HTTP Client for Flowno applications.

This module provides an HTTP client that works with Flowno's event loop. The client 
supports both blocking and streaming requests, with automatic handling of chunked 
transfer encoding, gzip/deflate compression, and JSON serialization/deserialization.

Example:
    Basic GET request:
    
    >>> from flowno import EventLoop
    >>> from flowno.io import HttpClient
    >>> 
    >>> async def main():
    ...     client = HttpClient()
    ...     response = await client.get("https://httpbin.org/get")
    ...     print(f"Status: {response.status_code}")
    ...     print(f"Body: {response.body[:50]}...")
    ... 
    >>> loop = EventLoop()
    >>> loop.run_until_complete(main(), join=True)
    Status: 200
    Body: b'{"args":{},"headers":{"Accept-Encoding":"gzip, deflate"...'

    Streaming response with JSON:
    
    >>> from flowno import EventLoop
    >>> from flowno.io import HttpClient
    >>> from flowno.io.http_client import streaming_response_is_ok
    >>> 
    >>> async def main():
    ...     client = HttpClient()
    ...     response = await client.stream_get("https://httpbin.org/stream/3")
    ...     
    ...     if streaming_response_is_ok(response):
    ...         print("Streaming response items:")
    ...         async for chunk in response.body:
    ...             print(f"  {chunk}")
    ... 
    >>> loop = EventLoop()
    >>> loop.run_until_complete(main(), join=True)
    Streaming response items:
      {'id': 0, 'url': 'https://httpbin.org/stream/3'}
      {'id': 1, 'url': 'https://httpbin.org/stream/3'}
      {'id': 2, 'url': 'https://httpbin.org/stream/3'}
"""

import gzip
import json
import logging
import re
import zlib

from collections.abc import AsyncGenerator, AsyncIterator, Generator
from dataclasses import dataclass
from typing import Any, Generic, Literal, TypeVar
from urllib.parse import urlparse

from flowno import SocketHandle, socket
from flowno.io.headers import Headers
from typing_extensions import TypeIs

T = TypeVar("T")
logger = logging.getLogger(__name__)


@dataclass
class ResponseBase:
    """
    Fields and helpers shared by regular and streaming HTTP responses.

    The numeric status code is derived on demand from the raw status line
    rather than stored separately.
    """

    client: "HttpClient"
    status: str  # raw status line, e.g. "HTTP/1.1 200 OK"
    headers: Headers

    @property
    def status_code(self) -> int:
        """Return the numeric status code parsed from the status line."""
        # The status line is "<version> <code> <reason>"; the code is the
        # second space-separated field.
        fields = self.status.split(" ")
        return int(fields[1])

    @property
    def is_ok(self) -> bool:
        """Return True when the status code is a 2xx (successful) code."""
        code = self.status_code
        return code >= 200 and code < 300
@dataclass
class Response(ResponseBase):
    """
    Regular HTTP response with full body.

    This class is used for non-streaming responses where the entire body
    is available at once.
    """

    body: bytes

    def is_json(self) -> bool:
        """
        Check if the response has a JSON content type.

        The media type is compared case-insensitively and any parameters
        (for example ``; charset=utf-8``) are ignored. Media types using the
        ``+json`` structured suffix (for example ``application/problem+json``)
        also count as JSON.

        Returns:
            True if the ``content-type`` header is a single string naming a
            JSON media type, False otherwise (including when the header is
            missing).
        """
        content_type = self.headers.get("content-type")
        if not isinstance(content_type, str):
            # Missing header or a non-string value: cannot be classified.
            return False
        media_type = content_type.split(";", 1)[0].strip().lower()
        return media_type == "application/json" or media_type.endswith("+json")

    def decode_json(self) -> Any:
        """
        Decode the response body as JSON.

        The body is decoded as UTF-8 text and parsed with the owning client's
        ``json_decoder``.

        Returns:
            The parsed JSON value.

        Raises:
            UnicodeDecodeError: If the body is not valid UTF-8.
            json.JSONDecodeError: If the text is not a single valid JSON
                document.
        """
        return self.client.json_decoder.decode(self.body.decode())
@dataclass
class OkStreamingResponse(ResponseBase, Generic[T]):
    """
    Streaming HTTP response whose status indicated success.

    Instead of a fully buffered payload, ``body`` is an asynchronous iterator
    that yields items of type ``T`` as they are received.
    """

    body: AsyncIterator[T]
@dataclass
class ErrStreamingResponse(ResponseBase):
    """
    Error streaming HTTP response.

    This class is used for streaming responses that resulted in an error,
    where the full error body is available.
    """

    body: bytes

    def is_json(self) -> bool:
        """
        Check if the response has a JSON content type.

        The media type is compared case-insensitively and any parameters
        (for example ``; charset=utf-8``) are ignored. Media types using the
        ``+json`` structured suffix (for example ``application/problem+json``)
        also count as JSON.

        Returns:
            True if the ``content-type`` header is a single string naming a
            JSON media type, False otherwise (including when the header is
            missing).
        """
        content_type = self.headers.get("content-type")
        if not isinstance(content_type, str):
            # Missing header or a non-string value: cannot be classified.
            return False
        media_type = content_type.split(";", 1)[0].strip().lower()
        return media_type == "application/json" or media_type.endswith("+json")

    def decode_json(self) -> Any:
        """
        Decode the error body as JSON.

        The body is decoded as UTF-8 text and parsed with the owning client's
        ``json_decoder``.

        Returns:
            The parsed JSON value.

        Raises:
            UnicodeDecodeError: If the body is not valid UTF-8.
            json.JSONDecodeError: If the text is not a single valid JSON
                document.
        """
        return self.client.json_decoder.decode(self.body.decode())
def _status_ok(status: str) -> bool: """ Check if an HTTP status indicates success (2xx). Args: status: HTTP status line (e.g., "HTTP/1.1 200 OK") Returns: True if status code is in the 2xx range """ return 200 <= int(status.split(" ")[1]) < 300
def streaming_response_is_ok(
    response: OkStreamingResponse[T] | ErrStreamingResponse,
) -> TypeIs[OkStreamingResponse[T]]:
    """
    Narrow a streaming response to its successful variant.

    Type checkers treat a True result as proof that `response` is an
    `OkStreamingResponse[T]`, and a False result as proof that it is an
    `ErrStreamingResponse`. At runtime the decision is made from the
    response's status line only.

    Example:
        >>> async def main():
        ...     client = HttpClient()
        ...     response = await client.stream_get("https://httpbin.org/stream/1")
        ...     if streaming_response_is_ok(response):
        ...         # Here response is known to be OkStreamingResponse[T]
        ...         async for item in response.body:
        ...             print(item)
        ...     else:
        ...         # Here response is known to be ErrStreamingResponse
        ...         print(f"Error: {response.status}")

    Args:
        response: The streaming response to check

    Returns:
        True if the response's status line carries a 2xx status code
    """
    status_line = response.status
    return _status_ok(status_line)
[docs] class HTTPException(Exception): """ Exception raised for HTTP errors. This exception includes the HTTP status and body for detailed error reporting. """ def __init__(self, status: str, message: str | bytes): super().__init__(self, f"Status: {status}\nBody: {message}")
[docs] class HttpClient: """ HTTP client compatible with Flowno's event loop. This client allows making both regular and streaming HTTP requests. It supports custom headers, JSON serialization, and automatic handling of compressed responses. Example: >>> async def main(): ... # Create client with custom headers ... headers = Headers() ... headers.set("Authorization", "Bearer my_token") ... client = HttpClient(headers=headers) ... ... # Make a POST request with JSON data ... response = await client.post( ... "https://httpbin.org/post", ... json={"name": "test", "value": 123} ... ) ... print(f"Status: {response.status_code}") """ def __init__(self, headers: Headers | None = None): """ Initialize a new HTTP client. Args: headers: Default headers to include in all requests """ self.override_headers: Headers = headers or Headers() self.json_decoder: json.JSONDecoder = json.JSONDecoder() self.json_encoder: json.JSONEncoder = json.JSONEncoder() self._sse_buffer: bytes = b"" self._json_buffer: str = ""
[docs] async def get(self, url: str) -> Response: """ Make a GET request to the given URL, blocking the current task. Example: >>> async def main(): ... client = HttpClient() ... response = await client.get("https://httpbin.org/get") ... print(f"Status: {response.status_code}") ... print(f"Body: {response.body}") Args: url: The URL to make the request to Returns: Response object containing status, headers, and body """ return await self.request("GET", url)
[docs] async def post( self, url: str, json: dict[str, Any] | None = None, # pyright: ignore[reportExplicitAny] data: bytes | None = None, ) -> Response: """ Make a POST request to the given URL. If `json` is provided, it will be serialized with the client's JSON encoder and sent as the request body with the appropriate Content-Type header. Example: >>> async def main(): ... client = HttpClient() ... response = await client.post( ... "https://httpbin.org/post", ... json={"name": "test", "value": 123} ... ) ... print(f"Status: {response.status_code}") Args: url: The URL to make the request to json: JSON data to send (will be encoded using the client's JSON encoder) data: Raw data to send (used only if json is None) Returns: Response object containing status, headers, and body """ headers = Headers() if json is not None: data = self.json_encoder.encode(json).encode(encoding="utf-8") headers.set("Content-Type", "application/json") return await self.request("POST", url, data, headers)
[docs] async def stream_get( self, url: str ) -> OkStreamingResponse[Any] | ErrStreamingResponse: # pyright: ignore[reportExplicitAny] """ Make a streaming GET request to the given URL. This method returns a response with a body that is an asynchronous iterator, allowing for processing of response data as it arrives. Example: >>> async def main(): ... client = HttpClient() ... response = await client.stream_get("https://httpbin.org/stream/3") ... ... if streaming_response_is_ok(response): ... async for chunk in response.body: ... print(chunk) Args: url: The URL to make the request to Returns: A streaming response object that may be either successful or an error """ return await self.stream_request("GET", url)
[docs] async def stream_post( self, url: str, json: dict[str, Any] | Any | None = None, # pyright: ignore[reportExplicitAny] data: bytes | None = None, ) -> OkStreamingResponse[Any] | ErrStreamingResponse: # pyright: ignore[reportExplicitAny] """ Make a streaming POST request to the given URL. If `json` is provided, it will be serialized with the client's JSON encoder and sent as the request body with the appropriate Content-Type header. Example: >>> async def main(): ... client = HttpClient() ... response = await client.stream_post( ... "https://httpbin.org/stream/3", ... json={"key": "value"} ... ) ... ... if streaming_response_is_ok(response): ... async for chunk in response.body: ... print(chunk) Args: url: The URL to make the request to json: JSON data to send (will be encoded using the client's JSON encoder) data: Raw data to send (used only if json is None) Returns: A streaming response object that may be either successful or an error """ headers = Headers() if json is not None: data = self.json_encoder.encode(json).encode(encoding="utf-8") headers.set("Content-Type", "application/json") return await self.stream_request("POST", url, data, headers)
def _parse_url(self, url: str) -> tuple[str, int, str, bool]: """ Parse a URL into components needed for making a request. Args: url: The URL to parse Returns: Tuple of (host, port, path, use_tls) """ parsed_url = urlparse(url) host = parsed_url.hostname or "localhost" if parsed_url.scheme == "https": port = 443 use_tls = True else: port = 80 use_tls = False port = parsed_url.port or port path = parsed_url.path or "/" return host, port, path, use_tls async def _receive_headers(self, sock: SocketHandle) -> tuple[str, Headers, bytes]: """ Receive HTTP headers from a socket. This method reads from the socket until it encounters the end of headers marker (double CRLF), then parses the headers and returns them along with any body data that may have been included in the same read. Args: sock: The socket to read from Returns: Tuple of (status_line, headers, initial_body_data) """ headers = Headers() data = b"" while True: data = await sock.recv(1024) if not data: break if b"\r\n\r\n" in data: break split_data = data.split(b"\r\n\r\n", 1) if len(split_data) == 2: headers_data, initial_body = split_data else: headers_data = split_data[0] initial_body = b"" lines = headers_data.decode().split("\r\n") status = lines[0] for line in lines[1:]: try: name, value = line.split(": ", 1) except ValueError: continue headers.set(name, value) return status, headers, initial_body
[docs] async def request( self, method: Literal["GET", "POST"], url: str, data: bytes | None = None, extra_headers: Headers | None = None, ) -> Response: """ Make a request to the given URL. This is the core method that handles both GET and POST requests. It handles the entire request-response cycle, including connecting, sending the request, receiving the response, and decompressing the body. Example: >>> async def main(): ... client = HttpClient() ... headers = Headers() ... headers.set("Accept", "application/json") ... response = await client.request( ... "GET", ... "https://httpbin.org/get", ... extra_headers=headers ... ) ... print(f"Status: {response.status}") Args: method: The HTTP method to use ("GET" or "POST") url: The URL to make the request to data: The data to send in the request body extra_headers: Additional headers to include in the request Returns: Response object containing status, headers, and body """ # Parse the URL host, port, path, use_tls = self._parse_url(url) # Create a socket sock = socket(use_tls=use_tls, server_hostname=host) sock.connect((host, port)) request = f"{method} {path} HTTP/1.1\r\n" request = request.encode("utf-8") headers = Headers() if data is not None: headers.set("Content-Length", str(len(data))) headers.set("Host", host) headers.set("User-Agent", "Flowno/0.1") headers.set("Accept-Encoding", ["gzip", "deflate"]) headers.set("Connection", "close") if extra_headers is not None: headers.merge(extra_headers) headers.merge(self.override_headers) request += headers.stringify().encode("utf-8") + b"\r\n\r\n" + (data or b"") sent = await sock.send(request) assert sent == len(request), f"Sent {sent} bytes, expected {len(request)}" status, response_headers, initial_body = await self._receive_headers(sock) body = await self._receive_remainder(sock, initial_body, response_headers) assert isinstance(body, bytes), f"Expected bytes, got {type(body)}" decompressed_body = self._decompress_body(body, response_headers) return Response(self, 
status, response_headers, decompressed_body)
[docs] async def stream_request( self, method: Literal["GET", "POST"], url: str, data: bytes | None = None, extra_headers: Headers | None = None, ) -> OkStreamingResponse[Any] | ErrStreamingResponse: # pyright: ignore[reportExplicitAny] """ Make a streaming request to the given URL. This method is similar to `request()` but returns a streaming response. For successful responses, the body is an asynchronous iterator that yields either parsed JSON objects (for SSE streams) or raw bytes. Example: >>> async def main(): ... client = HttpClient() ... response = await client.stream_request( ... "GET", ... "https://httpbin.org/stream/3" ... ) ... ... if streaming_response_is_ok(response): ... async for chunk in response.body: ... print(f"Received chunk: {chunk}") Args: method: The HTTP method to use ("GET" or "POST") url: The URL to make the request to data: The data to send in the request body extra_headers: Additional headers to include in the request Returns: A streaming response object that may be either successful or an error """ host, port, path, use_tls = self._parse_url(url) sock = socket(use_tls=use_tls, server_hostname=host) sock.connect((host, port)) request = f"{method} {path} HTTP/1.1\r\n" request = request.encode("utf-8") headers = Headers() if data is not None: headers.set("Content-Length", str(len(data))) headers.set("Host", host) headers.set("User-Agent", "Flowno/0.1") headers.set("Accept-Encoding", ["gzip", "deflate"]) headers.set("Connection", "close") if extra_headers is not None: headers.merge(extra_headers) headers.merge(self.override_headers) request += headers.stringify().encode("utf-8") + b"\r\n\r\n" + (data or b"") sent = await sock.send(request) assert sent == len(request), f"Sent {sent} bytes, expected {len(request)}" status, response_headers, initial_body = await self._receive_headers(sock) async def body_generator(): body = self._stream_read(sock, initial_body) # each chunk yielded by _stream_read is a contains one or more `message lines` # 
message lines are separated by `\r\ndata: ` and are grouped into messages by `\n\n` or `\r\n` depending on the implementation # chunks may contain multiple messages, and messages may be split across chunks # I'm going to assume messages are json, except for '[DONE]' which is a pointless signal to close the connection # I can't split chunks into lines with `\r\ndata: ` because the json content may contain that string # Instead I will json raw_decode messages then yield them until I receive a message that is just '[DONE]' or chunk size 0 async for chunk in body: decompressed_chunk = self._decompress_chunk(chunk, response_headers) content_type = response_headers.get("Content-Type") if isinstance(content_type, str) and content_type.startswith("text/event-stream"): for message in self._split_chunks_to_message_json(decompressed_chunk): yield message else: yield decompressed_chunk if _status_ok(status): return OkStreamingResponse( self, status=status, headers=response_headers, body=body_generator(), ) else: body = await self._receive_remainder(sock, initial_body, response_headers) assert isinstance(body, bytes), f"Expected bytes, got {type(body)}" decompressed_body = self._decompress_body(body, response_headers) return ErrStreamingResponse( self, status=status, headers=response_headers, body=decompressed_body, )
def _split_chunks_to_message_json(self, chunk: bytes) -> Generator[Any, None, None]: """ Split SSE chunks into JSON objects. This method processes Server-Sent Events (SSE) data, extracting and parsing JSON objects from the data fields. Args: chunk: Raw bytes from the SSE stream Yields: Parsed JSON objects from the SSE stream """ self._sse_buffer += chunk # Match both \r\n\r\n and \n\n as delimiters pattern = re.compile(rb"(\r\n\r\n|\n\n)") while True: match = pattern.search(self._sse_buffer) if not match: break # Exit if no complete message is found event_bytes, self._sse_buffer = self._sse_buffer.split(match.group(0), 1) event = event_bytes.decode(errors="replace") if not event.startswith("data: "): continue json_str = event[len("data: "):].strip() if json_str == "[DONE]": break # Buffering for incomplete JSON self._json_buffer += json_str while self._json_buffer: try: message, idx = self.json_decoder.raw_decode(self._json_buffer) yield message self._json_buffer = self._json_buffer[idx:].lstrip() except json.JSONDecodeError as e: if e.pos == len(self._json_buffer): # Incomplete JSON, keep accumulating break logger.error(f"Error decoding JSON: {repr(self._json_buffer)}") raise def _decompress_chunk(self, chunk: bytes, headers: Headers) -> bytes: """ Decompress a chunk of data based on Content-Encoding header. Args: chunk: The data to decompress headers: Response headers containing Content-Encoding Returns: Decompressed data or the original chunk if no compression was used """ encoding = headers.get("Content-Encoding") if encoding == "gzip": return gzip.decompress(chunk) elif encoding == "deflate": return zlib.decompress(chunk) return chunk def _decompress_body(self, body: bytes, headers: Headers) -> bytes: """ Decompress an entire response body based on Content-Encoding header. 
Args: body: The data to decompress headers: Response headers containing Content-Encoding Returns: Decompressed body or the original body if no compression was used """ encoding = headers.get("Content-Encoding") if encoding == "gzip": return zlib.decompress(body, 16 + zlib.MAX_WBITS) elif encoding == "deflate": return zlib.decompress(body) return body async def _receive_remainder( self, sock: SocketHandle, initial_body: bytes, headers: Headers ) -> bytes | AsyncGenerator[bytes, None]: """ Receive the remaining response body after headers. This method handles both fixed-length and chunked responses. Args: sock: The socket to read from initial_body: Any body data that was read with the headers headers: Response headers Returns: Complete response body for fixed-length responses, or a generator for chunked responses """ body = initial_body content_length = headers.get("Content-Length") is_chunked = headers.get("Transfer-Encoding") == "chunked" if not is_chunked: assert content_length, "No content-length header found" assert isinstance(content_length, str), "Content-length header is not a string" content_length = int(content_length) body_length = len(body) while body_length < content_length: remaining_chunk = await sock.recv(1024) if not remaining_chunk: break body += remaining_chunk body_length += len(remaining_chunk) return body else: return self._stream_read(sock, initial_body) async def _stream_read(self, sock: SocketHandle, initial_body: bytes) -> AsyncGenerator[bytes, None]: # content-length is ignored if transfer-encoding is chunked # we need to start reading chunks from initial body then continuing to read from socket # the initial may be empty and may or may not contain a complete number of chunks. 
# if the header size is just unlucky the first chunk may be split or even the size of the chunk may be split # none of this considers intentional bad or weird responses from the server # TODO: adversarial testing # run in a loop, yielding chunks until the server closes the connection head = initial_body self._sse_buffer = b"" while True: trailing = await sock.recv(1024) if trailing == b"": # server closed connection break head += trailing # head must now contain at least one byte while b"\r\n" not in head: # the first time \r\n is found, it must be immediately after the chunk size trailing = await sock.recv(1024) if trailing == b"": raise Exception("Server closed connection before sending chunk size") head += trailing chunk_size_hex, body = head.split(b"\r\n", 1) if chunk_size_hex == b"": break chunk_size = int(chunk_size_hex, 16) while chunk_size > len(body): remaining_chunk = await sock.recv(chunk_size - len(body)) if remaining_chunk == b"": raise Exception("Server closed connection before sending full chunk") body += remaining_chunk complete_chunk = body[:chunk_size] yield complete_chunk head = body[chunk_size:] while not head.startswith(b"\r\n"): trailing = await sock.recv(1024) if trailing == b"": raise Exception("Server closed connection before sending chunk terminator") head += trailing head = head[2:] # remove the terminator # now head should be empty or contain the start of the next chunk async def _receive_all(self, sock: SocketHandle) -> bytes: """ Receive all data from a socket until connection closes. Args: sock: The socket to read from Returns: All data received from the socket """ data = b"" while True: chunk = await sock.recv(1024) if not chunk: break data += chunk return data