Tutorial: Streaming Chatbot

Warning

This tutorial is a work in progress.

Let’s walk through building a chatbot that streams responses in real time. We’ll create a simple terminal chat interface that connects to an OpenAI-compatible API (I’ve tested Groq and llama.cpp). A dataflow framework is overkill for something so simple, but the same principles apply to more complex programs. My goal here is to show you the process of going from a vague idea to a concrete dataflow, including the mistakes that are easy to make when building something real.

> Hello
Hi! How can I assist you today?
> How many R letters are in "strawberry"?
There are two "r"s in the word "strawberry".
>

Planning the Flow

This is a simple enough problem that we can be confident in fully specifying the flow just by the ordering of events in the program:

  1. Accept user input from the terminal.

  2. Send that input along with a chat history to an LLM API.

  3. Display the AI’s response as it streams back.

  4. Store the AI’s total response in the chat history.

  5. Loop back to step (1).
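Before turning this into a dataflow, the five steps above can be sketched as an ordinary procedural loop. This is only an illustration: fake_llm is a hypothetical stand-in that echoes the prompt word by word, and the prompts are scripted rather than read from the terminal so the sketch runs without user input.

```python
from typing import Callable, Iterator


def fake_llm(messages: list[dict[str, str]]) -> Iterator[str]:
    """Hypothetical stand-in for a streaming LLM: echoes the last prompt word by word."""
    for word in messages[-1]["content"].split():
        yield word + " "


def chat_loop(
    prompts: list[str],
    write: Callable[[str], None] = lambda s: print(s, end="", flush=True),
) -> list[dict[str, str]]:
    history = [{"role": "system", "content": "You are a helpful assistant."}]
    for prompt in prompts:                      # step 1: accept input (scripted here)
        history.append({"role": "user", "content": prompt})
        chunks = []
        for chunk in fake_llm(history):         # step 2: send input plus history
            write(chunk)                        # step 3: display chunks as they arrive
            chunks.append(chunk)
        history.append(                         # step 4: store the total response
            {"role": "assistant", "content": "".join(chunks)}
        )
    return history                              # step 5 is the for loop itself


final_history = chat_loop(["hello there"], write=lambda s: None)
```

The rest of the tutorial splits this single loop into nodes whose ordering comes from data dependencies instead of statement order.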

We need to convert our crude plan into a concrete flow. It is tempting, from a procedural perspective, to try to choose and order nodes based on the actions they perform. This is the sort of activity diagram[1] you would make in traditional no-code flowchart/workflow editors:

Naive Flow

[Figure: naive flow diagram]

However! Flowno only considers explicit data dependencies between nodes when picking the next node to execute. TerminalInput has no explicit, direct data dependency on TerminalOutput. We could add a dummy value as a dependency to ensure TerminalInput runs after TerminalOutput finishes, but we have two better options.
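To make the missing dependency concrete, here is a small sketch that models the naive flow as a mapping from each node to the nodes it reads from, then asks whether one node consumes data from another. The edge list is my reading of the plan above, and depends_on is a hypothetical helper for illustration, not part of Flowno.

```python
def depends_on(node: str, other: str, inputs: dict[str, set[str]]) -> bool:
    """Return True if `node` consumes data produced, directly or transitively, by `other`."""
    seen: set[str] = set()
    stack = list(inputs.get(node, ()))
    while stack:
        current = stack.pop()
        if current == other:
            return True
        if current not in seen:  # the seen set keeps cycles from looping forever
            seen.add(current)
            stack.extend(inputs.get(current, ()))
    return False


# Each node maps to the set of nodes whose output it reads.
naive_flow = {
    "TerminalInput": set(),
    "ChatHistory": {"TerminalInput", "Inference"},
    "Inference": {"ChatHistory"},
    "TerminalOutput": {"Inference"},
}

input_waits_for_output = depends_on("TerminalInput", "TerminalOutput", naive_flow)
output_waits_for_input = depends_on("TerminalOutput", "TerminalInput", naive_flow)
```

In this model TerminalOutput depends on TerminalInput, but not the other way around, so nothing in the data forces the next prompt to wait for the previous response to finish printing.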

Future

I’m considering adding an after() method to DraftNode that would allow marking one node as dependent on another without an actual data dependency.

Merged Node Flow

Combine TerminalInput() and TerminalOutput() into a single node, TerminalChat(), that handles both input and output. In the very simplest of flows, this is the best choice, so this is where I’m going to start. Later on, once we have a simple proof of concept, I’ll use separate nodes but eliminate the explicit dependency between them.

[Figure: merged node flow diagram]

Independent I/O Nodes

If I add a GUI frontend, it would be nice to have independent I/O nodes that send and receive messages from the frontend. This approach allows the user to enter and submit new prompts even while the previous response is still streaming to the GUI, without enforcing a sequential dependency.

[Figure: independent I/O nodes flow diagram]

This flow eliminates the dummy dependency by having the output node simply display whatever responses arrive, while the input node independently waits for user prompts.

Sketch Out the Nodes

Let’s start by sketching out the signatures of the Merged Node Flow. Later we’ll revise this with the minimum default arguments to fix the MissingDefaultError caused by the cyclic dependencies.

async TerminalChat(response: Stream[str])
Decorator:

@flowno.node

Print each chunk as it arrives, then call input("> ").

Parameters:

response (Stream[str]) – The streamed response from the last interaction.

Returns:

The user entered prompt.

Return type:

str

The ChatHistory node is a Stateful Node, so it should be a class.

class ChatHistory
Decorator:

@flowno.node

messages

The accumulated message history.

Type:

Messages

Value:

[Message("system", "You are a helpful assistant.")]

async call(self, prompt: str, last_response: str)

Append last_response to messages under the role "assistant", then append prompt under the role "user". Return messages.

Note

It should be obvious that last_response should have a default value. The first time through the flow, there is no ‘last_response’. When we try to run the flow, we will see a MissingDefaultError that lists the last_response argument as one potential fix.

Parameters:
  • prompt (str) – The user entered prompt.

  • last_response (str) – The response to the last prompt.

Return type:

Messages

Returns:

The accumulated list of messages.

This class uses the Messages and Message utility types.

class Message(role: Literal['system', 'user', 'assistant'], content: str)

type Messages = list[Message]

async Inference(messages: Messages)
Decorator:

@flowno.node

Sends the chat history to a chat-completion API and streams back the response.

Parameters:

messages (Messages) – The list of messages.

Yields:

The streamed text chunks from the API endpoint.

Return type:

Stream[str]

Implementing the Nodes

LLM API Node

We are going to use an OpenAI-compatible “chat completion” API. I’m using Groq, but you can use any compatible provider or set up a local inference server like llama.cpp if you prefer. The Inference() node will send the chat history to the API and stream back the response.

Flowno provides an async flowno.io.HttpClient class that replaces some of the functionality of the blocking requests library or the asyncio-based aiohttp library. Flowno runs its own event loop rather than threads or asyncio, and HttpClient is written to cooperate with that loop. As a consequence, any blocking call will block the entire flow, and awaiting incompatible asyncio primitives will do nothing.

Future

I’m considering replacing the flowno event loop with asyncio or adding compatibility with asyncio.

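import logging
import os
from json import JSONEncoder
from typing import Any

from typing_extensions import override  # typing.override on Python 3.12+

from flowno import node
from flowno.io import HttpClient, Headers, streaming_response_is_ok

# Message and Messages are the utility types sketched above; HTTPException is
# assumed to be defined or imported elsewhere in your project.
logger = logging.getLogger(__name__)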

API_URL = "https://api.groq.com/openai/v1/chat/completions"
TOKEN = os.environ["GROQ_API_KEY"]

headers = Headers()
headers.set("Authorization", f"Bearer {TOKEN}")

client = HttpClient(headers=headers)

class MessageJSONEncoder(JSONEncoder):
    @override
    def default(self, o: Any):
        if isinstance(o, Message):
            return {
                "role": o.role,
                "content": o.content,
            }
        return super().default(o)

client.json_encoder = MessageJSONEncoder()

@node
async def Inference(messages: Messages):
    """
    Streams the LLM API response.
    """
    response = await client.stream_post(
        API_URL,
        json={
            "messages": messages,
            "model": "llama-3.3-70b-versatile",
            "stream": True,
        },
    )

    if not streaming_response_is_ok(response):
        logger.error("Response Body: %s", response.body)
        raise HTTPException(response.status, response.body)

    async for response_stream_json in response.body:
        try:
            choice = response_stream_json["choices"][0]
            # If finish_reason is set, skip this chunk.
            if choice.get("finish_reason"):
                continue
            # by adding a type here, the typechecker now knows Inference
            # produces a Stream[str]
            chunk: str = choice["delta"]["content"]
            yield chunk

        except KeyError:
            raise ValueError(response_stream_json)
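For readers who have not seen the wire format, here is a rough standalone sketch of what the loop above is unpacking. OpenAI-compatible endpoints stream Server-Sent Events whose data: payloads are JSON objects with a choices[0].delta.content field, followed by a final data: [DONE] line. The helper name iter_content_chunks and the sample lines are made up for illustration; inside Flowno, the client already hands the node deserialized events, so nothing like this is needed in the node itself.

```python
import json
from typing import Iterable, Iterator


def iter_content_chunks(sse_lines: Iterable[str]) -> Iterator[str]:
    """Yield the text fragments carried by OpenAI-style streaming events."""
    for line in sse_lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue                      # blank separators and other SSE fields
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            break                         # end-of-stream sentinel
        choice = json.loads(payload)["choices"][0]
        if choice.get("finish_reason"):
            continue                      # the final event carries no new text
        content = choice["delta"].get("content")
        if content:
            yield content


# Hand-written sample events, not captured from a real API call.
sample_lines = [
    'data: {"choices": [{"delta": {"role": "assistant", "content": ""}, "finish_reason": null}]}',
    "",
    'data: {"choices": [{"delta": {"content": "Hi"}, "finish_reason": null}]}',
    'data: {"choices": [{"delta": {"content": "!"}, "finish_reason": null}]}',
    'data: {"choices": [{"delta": {}, "finish_reason": "stop"}]}',
    "data: [DONE]",
]

chunks = list(iter_content_chunks(sample_lines))
```

Unlike the node above, this sketch skips events whose delta has no content instead of raising, which is a design choice for the illustration only.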

Future

I’m going to add a fleshed-out version of the inference node to the list of pre-built nodes.

Here are the key parts of the node:

  • @node async def Inference(...): The stateless node definition MUST be an async def. The @flowno.node decorator transforms the function into a node factory.

Warning

The async keyword is required, even if you don’t use any async/await features.

  • The HttpClient.stream_post() method initiates a POST request to the API. Instead of waiting for the complete response, it immediately returns an async generator of deserialized Server-Sent Events. Objects passed in with the json= keyword argument are serialized with HttpClient.json_encoder.

  • The async for loop is the async analog of the regular for loop. It awaits the next value from the async generator in response.body, possibly suspending. Control is returned when the client receives another streamed chunk.

  • The yield statement is what makes this a Streaming Node rather than a Mono Node. Each time we receive a piece of data through our async for loop, we immediately pass it on to downstream nodes, yielding control to the event loop, rather than waiting for all data to arrive first.

  • If we wanted, we could return an accumulated final_result after ending the stream. Sadly, Python doesn’t allow a return statement with a value inside an async generator, so you need to explicitly raise StopAsyncIteration(return_value) instead. This value will be passed to any connected downstream nodes that did not set @node(stream_in=[...]). In this example, I’m instead taking advantage of the implicit accumulation of Stream[str] values when sending the complete response to the downstream ChatHistory node.

Tip

Flowno will automatically accumulate string values and provide them to downstream nodes that cannot accept a Stream[str]. I’ll point out this magic behavior when we make the node connections.
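As a mental model of that accumulation, the sketch below drains a stream of string chunks and joins them into one value. It uses plain asyncio only so that it runs outside Flowno (inside a node, asyncio primitives would not work, as noted earlier), and fake_stream and accumulate are hypothetical names, not Flowno functions.

```python
import asyncio
from typing import AsyncIterator


async def fake_stream() -> AsyncIterator[str]:
    """Hypothetical upstream node: yields a response in three chunks."""
    for chunk in ["There ", "are ", "three."]:
        await asyncio.sleep(0)  # stand-in for waiting on the network
        yield chunk


async def accumulate(stream: AsyncIterator[str]) -> str:
    """Collect every chunk and join them, as a non-streaming consumer would see them."""
    parts: list[str] = []
    async for chunk in stream:
        parts.append(chunk)
    return "".join(parts)


full_response = asyncio.run(accumulate(fake_stream()))
```

A downstream node that does not opt in to streaming would see something like full_response, while a stream-in node would see the three chunks one at a time.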

Terminal Chat Node

Next up is the TerminalChat() stateless, stream-in, mono node.

from flowno import node, Stream  # assumes Stream is exported by the top-level package

@node(stream_in=["response_chunks"])
async def TerminalChat(response_chunks: Stream[str]):
    async for chunk in response_chunks:
        print(chunk, end="", flush=True)
    return input("> ")

Key points:

  1. @node(stream_in=["response_chunks"]): By passing stream_in to the @flowno.node decorator, we say that this node can receive a stream of data on the given input, rather than a single value. If you forget this, Flowno will wait until the upstream node (Inference() in this case) has ended its stream and pass the full string to TerminalChat().

  2. async def TerminalChat(response_chunks: Stream[str]): The argument response_chunks is annotated with the generic type Stream. This type annotation is completely optional, but it is useful for static typechecking. Stream is an AsyncIterator, like the value of client.stream_post(...) in the Inference node body.

Warning

If you forget to use the stream_in=[...] argument of @flowno.node, Flowno will pass in a str value instead of the desired Stream[str].

  3. async for chunk in response_chunks: Like before, this awaits values from the async iterator. This time the iterator is a Stream[str], so the typechecker can infer that chunk is a str. When the connected Streaming Node does not have fresh data, this async for statement suspends until the upstream node has yielded more data.
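The suspend-and-resume behavior described in these points can be observed in a standalone sketch. It runs on plain asyncio rather than Flowno's event loop, and upstream, terminal_chat, and read_prompt are hypothetical stand-ins; the prompt is injected as a callable so the example does not block on input().

```python
import asyncio
from typing import AsyncIterator, Callable

log: list[str] = []


async def upstream() -> AsyncIterator[str]:
    """Hypothetical streaming producer."""
    for chunk in ["Hel", "lo"]:
        await asyncio.sleep(0)  # stand-in for waiting on the network
        log.append(f"produced {chunk!r}")
        yield chunk


async def terminal_chat(chunks: AsyncIterator[str], read_prompt: Callable[[], str]) -> str:
    """Stream in, single value out: show each chunk, then return one prompt."""
    async for chunk in chunks:
        log.append(f"displayed {chunk!r}")
    return read_prompt()


prompt = asyncio.run(terminal_chat(upstream(), lambda: "next question"))
```

The log alternates between produced and displayed entries, which shows that each chunk is handled before the producer moves on to the next one.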

Chat History Node

from dataclasses import dataclass
from typing import Literal

from flowno import node

@dataclass
class Message:
    role: Literal["system", "user", "assistant"]
    content: str

# Alias used in the Inference() signature.
Messages = list[Message]

@node
class ChatHistory:
    messages = [Message("system", "You are a helpful assistant.")]

    async def call(self, prompt: str, last_response: str):
        # last_response answers the previous prompt, so it goes in first.
        self.messages.append(Message("assistant", last_response))
        self.messages.append(Message("user", prompt))
        return self.messages
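To check the message ordering without running a flow, here is a plain-Python sketch of the same accumulation. PlainChatHistory is a hypothetical, undecorated stand-in for the node. It also assumes an empty-string default for last_response and skips the assistant entry when that string is empty, which is one possible way to handle the first turn rather than something the node above does.

```python
from dataclasses import dataclass, field
from typing import Literal


@dataclass
class Message:
    role: Literal["system", "user", "assistant"]
    content: str


@dataclass
class PlainChatHistory:
    """Hypothetical stand-in for the ChatHistory node, without the @node decorator."""

    messages: list[Message] = field(
        default_factory=lambda: [Message("system", "You are a helpful assistant.")]
    )

    def call(self, prompt: str, last_response: str = "") -> list[Message]:
        # The previous response is recorded before the new prompt.
        if last_response:
            self.messages.append(Message("assistant", last_response))
        self.messages.append(Message("user", prompt))
        return self.messages


history = PlainChatHistory()
history.call("Hello")
second_turn = history.call(
    'How many R letters are in "strawberry"?',
    "Hi! How can I assist you today?",
)
roles = [m.role for m in second_turn]
```

After two turns the roles read system, user, assistant, user, so the list always ends with the prompt the model is about to answer.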

Designing the Flow Graph

from flowno import FlowHDL

with FlowHDL() as f:
    # One merged node prints the streamed response, then reads the next prompt.
    f.terminal_chat = TerminalChat(f.inference)

    f.history = ChatHistory(f.terminal_chat, f.inference)

    f.inference = Inference(f.history)

Handling Cycles

If you attempt to run the flow as-is:

f.run_until_complete()

you will get a MissingDefaultError due to the cycle between ChatHistory and Inference.

Solution 1: Add a default value to ChatHistory:

@node
class ChatHistory:
    async def call(self, prompt: str, last_response: str = ""):
        ...

Solution 2: Add a default value to Inference:

@node
async def Inference(messages: list[Message] | None = None):
    if messages is None:
        # Handle initial run
        ...
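One way to picture why either default helps is a toy model of the two-node cycle named above. An input with a default does not have to wait for its producer on the first pass, so that edge is dropped before looking for a valid start order. This is an illustration built on the standard library's graphlib, not Flowno's actual resolution algorithm, and first_pass_order is a hypothetical helper.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Optional

Edge = tuple[str, str, str]  # (producer, consumer, consumer_input_name)

# Only the cycle between ChatHistory and Inference is modeled here.
cycle_edges: list[Edge] = [
    ("Inference", "ChatHistory", "last_response"),
    ("ChatHistory", "Inference", "messages"),
]


def first_pass_order(edges: list[Edge], defaults: set[tuple[str, str]]) -> Optional[list[str]]:
    """Return a start order, or None if every node is stuck waiting on another."""
    sorter: TopologicalSorter[str] = TopologicalSorter()
    for producer, consumer, input_name in edges:
        sorter.add(producer)
        if (consumer, input_name) in defaults:
            sorter.add(consumer)            # a defaulted input needs no producer yet
        else:
            sorter.add(consumer, producer)  # consumer must wait for producer
    try:
        return list(sorter.static_order())
    except CycleError:
        return None


no_defaults = first_pass_order(cycle_edges, set())
history_default = first_pass_order(cycle_edges, {("ChatHistory", "last_response")})
inference_default = first_pass_order(cycle_edges, {("Inference", "messages")})
```

With no defaults the toy model finds no start order, while each default yields one that begins with the node carrying the default.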
