flowno.io.http_client

HTTP Client for Flowno applications.

This module provides an HTTP client that works with Flowno’s event loop. The client supports both blocking and streaming requests, with automatic handling of chunked transfer encoding, gzip/deflate compression, and JSON serialization/deserialization.

Example

Basic GET request:

>>> from flowno import EventLoop
>>> from flowno.io import HttpClient
>>>
>>> async def main():
...     client = HttpClient()
...     response = await client.get("https://httpbin.org/get")
...     print(f"Status: {response.status_code}")
...     print(f"Body: {response.body[:50]}...")
...
>>> loop = EventLoop()
>>> loop.run_until_complete(main(), join=True)
Status: 200
Body: b'{"args":{},"headers":{"Accept-Encoding":"gzip, deflate"...'

Streaming response with JSON:

>>> from flowno import EventLoop
>>> from flowno.io import HttpClient
>>> from flowno.io.http_client import streaming_response_is_ok
>>>
>>> async def main():
...     client = HttpClient()
...     response = await client.stream_get("https://httpbin.org/stream/3")
...
...     if streaming_response_is_ok(response):
...         print("Streaming response items:")
...         async for chunk in response.body:
...             print(f"  {chunk}")
...
>>> loop = EventLoop()
>>> loop.run_until_complete(main(), join=True)
Streaming response items:
  {'id': 0, 'url': 'https://httpbin.org/stream/3'}
  {'id': 1, 'url': 'https://httpbin.org/stream/3'}
  {'id': 2, 'url': 'https://httpbin.org/stream/3'}
class flowno.io.http_client.ErrStreamingResponse(client: HttpClient, status: str, headers: Headers, body: bytes)[source]

Error streaming HTTP response.

This class is used for streaming responses that resulted in an error, where the full error body is available.

decode_json() Any[source]

Decode the response body as JSON.

is_json() bool[source]

Check if the response has a JSON content type.

exception flowno.io.http_client.HTTPException(status: str, message: str | bytes)[source]

Exception raised for HTTP errors.

This exception includes the HTTP status and body for detailed error reporting.

class flowno.io.http_client.HttpClient(headers: Headers | None = None)[source]

HTTP client compatible with Flowno’s event loop.

This client allows making both regular and streaming HTTP requests. It supports custom headers, JSON serialization, and automatic handling of compressed responses.

Example

>>> async def main():
...     # Create client with custom headers
...     headers = Headers()
...     headers.set("Authorization", "Bearer my_token")
...     client = HttpClient(headers=headers)
...
...     # Make a POST request with JSON data
...     response = await client.post(
...         "https://httpbin.org/post",
...         json={"name": "test", "value": 123}
...     )
...     print(f"Status: {response.status_code}")
async get(url: str) Response[source]

Make a GET request to the given URL, blocking the current task.

Example

>>> async def main():
...     client = HttpClient()
...     response = await client.get("https://httpbin.org/get")
...     print(f"Status: {response.status_code}")
...     print(f"Body: {response.body}")
Parameters:

url – The URL to make the request to

Returns:

Response object containing status, headers, and body

async post(url: str, json: dict[str, Any] | None = None, data: bytes | None = None) Response[source]

Make a POST request to the given URL.

If json is provided, it will be serialized with the client’s JSON encoder and sent as the request body with the appropriate Content-Type header.

Example

>>> async def main():
...     client = HttpClient()
...     response = await client.post(
...         "https://httpbin.org/post",
...         json={"name": "test", "value": 123}
...     )
...     print(f"Status: {response.status_code}")
Parameters:
  • url – The URL to make the request to

  • json – JSON data to send (will be encoded using the client’s JSON encoder)

  • data – Raw data to send (used only if json is None)

Returns:

Response object containing status, headers, and body

async request(method: Literal['GET', 'POST'], url: str, data: bytes | None = None, extra_headers: Headers | None = None) Response[source]

Make a request to the given URL.

This is the core method that handles both GET and POST requests. It handles the entire request-response cycle, including connecting, sending the request, receiving the response, and decompressing the body.

Example

>>> async def main():
...     client = HttpClient()
...     headers = Headers()
...     headers.set("Accept", "application/json")
...     response = await client.request(
...         "GET",
...         "https://httpbin.org/get",
...         extra_headers=headers
...     )
...     print(f"Status: {response.status}")
Parameters:
  • method – The HTTP method to use (“GET” or “POST”)

  • url – The URL to make the request to

  • data – The data to send in the request body

  • extra_headers – Additional headers to include in the request

Returns:

Response object containing status, headers, and body

async stream_get(url: str) OkStreamingResponse[Any] | ErrStreamingResponse[source]

Make a streaming GET request to the given URL.

This method returns a response with a body that is an asynchronous iterator, allowing for processing of response data as it arrives.

Example

>>> async def main():
...     client = HttpClient()
...     response = await client.stream_get("https://httpbin.org/stream/3")
...
...     if streaming_response_is_ok(response):
...         async for chunk in response.body:
...             print(chunk)
Parameters:

url – The URL to make the request to

Returns:

A streaming response object that may be either successful or an error

async stream_post(url: str, json: dict[str, Any] | Any | None = None, data: bytes | None = None) OkStreamingResponse[Any] | ErrStreamingResponse[source]

Make a streaming POST request to the given URL.

If json is provided, it will be serialized with the client’s JSON encoder and sent as the request body with the appropriate Content-Type header.

Example

>>> async def main():
...     client = HttpClient()
...     response = await client.stream_post(
...         "https://httpbin.org/stream/3",
...         json={"key": "value"}
...     )
...
...     if streaming_response_is_ok(response):
...         async for chunk in response.body:
...             print(chunk)
Parameters:
  • url – The URL to make the request to

  • json – JSON data to send (will be encoded using the client’s JSON encoder)

  • data – Raw data to send (used only if json is None)

Returns:

A streaming response object that may be either successful or an error

async stream_request(method: Literal['GET', 'POST'], url: str, data: bytes | None = None, extra_headers: Headers | None = None) OkStreamingResponse[Any] | ErrStreamingResponse[source]

Make a streaming request to the given URL.

This method is similar to request() but returns a streaming response. For successful responses, the body is an asynchronous iterator that yields either parsed JSON objects (for SSE streams) or raw bytes.

Example

>>> async def main():
...     client = HttpClient()
...     response = await client.stream_request(
...         "GET",
...         "https://httpbin.org/stream/3"
...     )
...
...     if streaming_response_is_ok(response):
...         async for chunk in response.body:
...             print(f"Received chunk: {chunk}")
Parameters:
  • method – The HTTP method to use (“GET” or “POST”)

  • url – The URL to make the request to

  • data – The data to send in the request body

  • extra_headers – Additional headers to include in the request

Returns:

A streaming response object that may be either successful or an error

class flowno.io.http_client.OkStreamingResponse(client: HttpClient, status: str, headers: Headers, body: AsyncIterator[T])[source]

Successful streaming HTTP response.

This class is used for streaming responses where the body is available as an asynchronous iterator.

class flowno.io.http_client.Response(client: HttpClient, status: str, headers: Headers, body: bytes)[source]

Regular HTTP response with full body.

This class is used for non-streaming responses where the entire body is available at once.

decode_json() Any[source]

Decode the response body as JSON.

is_json() bool[source]

Check if the response has a JSON content type.

class flowno.io.http_client.ResponseBase(client: HttpClient, status: str, headers: Headers)[source]

Base class for HTTP responses.

This class contains common properties and methods shared by both regular and streaming responses.

property is_ok: bool

Returns True if the status code is in the 2xx range (successful responses).

property status_code: int

Get the numeric HTTP status code.

flowno.io.http_client.streaming_response_is_ok(response: OkStreamingResponse[T] | ErrStreamingResponse) TypeIs[OkStreamingResponse[T]][source]

Type guard for checking if a streaming response is successful.

This function serves as a type guard in Python’s type system, narrowing the type of response to OkStreamingResponse[T] when it returns True.

Example

>>> async def main():
...     client = HttpClient()
...     response = await client.stream_get("https://httpbin.org/stream/1")
...     if streaming_response_is_ok(response):
...         # Here response is known to be OkStreamingResponse[T]
...         async for item in response.body:
...             print(item)
...     else:
...         # Here response is known to be ErrStreamingResponse
...         print(f"Error: {response.status}")
Parameters:

response – The streaming response to check

Returns:

True if the response is successful (i.e., an OkStreamingResponse)