Tutorial: Streaming Chatbot
Warning
This tutorial is a work in progress.
Let’s walk through building a chatbot that streams responses in real time. We’ll create a simple terminal chat interface that connects to an OpenAI-compatible API (I’ve tested Groq and llama.cpp). A dataflow framework is overkill for something this simple, but the same principles apply to more complex programs. My goal here is to show the process of going from a vague idea to a concrete dataflow, including the easy mistakes you can make when building something real.
> Hello
Hi! How can I assist you today?
> How many R letters are in "strawberry"?
There are two "r"s in the word "strawberry".
>
Planning the Flow
This is a simple enough problem that we can be confident in fully specifying the flow just by the ordering of events in the program:
1. Accept user input from the terminal.
2. Send that input along with the chat history to an LLM API.
3. Display the AI’s response as it streams back.
4. Store the AI’s complete response in the chat history.
5. Loop back to step 1.
We need to convert our crude plan into a concrete flow. It is tempting, from a procedural perspective, to try to choose and order nodes based on the actions they perform. This is the sort of activity diagram[1] you would make in traditional no-code flowchart/workflow editors:
Naive Flow
However! Flowno only considers explicit data dependencies between nodes when picking the next node to execute. The TerminalInput node has no explicit, direct data dependency on TerminalOutput. We could add a dummy value as a dependency to ensure TerminalInput runs after TerminalOutput finishes, but we have two better options.
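For comparison, the dummy-dependency workaround would look roughly like the sketch below, written in the FlowHDL wiring style introduced properly later in this tutorial. The split TerminalInput/TerminalOutput nodes and the throwaway done value are hypothetical; we won’t actually write them.
with FlowHDL() as f:
    # TerminalOutput would return a meaningless "done" value whose only job
    # is to force TerminalInput to wait for it.
    f.prompt = TerminalInput(f.done)
    f.history = ChatHistory(f.prompt, f.inference)
    f.inference = Inference(f.history)
    f.done = TerminalOutput(f.inference)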
Future
I’m considering adding an after() method to DraftNode that would allow marking one node as depending on another without an actual data dependency.
Merged Node Flow
Combine TerminalInput() and TerminalOutput() into a single node, TerminalChat(), that handles both input and output. In the very simplest of flows, this is the best choice. To start with, I’m going to do this. Later on, once we have a simple proof of concept, I’ll use separate nodes but eliminate the explicit dependency between them.
Independent I/O Nodes
If I add a GUI frontend, it would be nice to have independent I/O nodes that send and receive messages from the frontend. This approach allows the user to enter and submit new prompts even while the previous response is still streaming to the GUI, without enforcing a sequential dependency.
This flow eliminates the dummy dependency by having the output node simply display whatever responses arrive, while the input node independently waits for user prompts.
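As a sketch of that wiring, it might look something like this. GuiInput and GuiOutput are hypothetical placeholder nodes for whatever frontend transport you choose; the point is only that there is no edge between them.
with FlowHDL() as f:
    f.prompt = GuiInput()                      # waits for the next submitted prompt
    f.history = ChatHistory(f.prompt, f.inference)
    f.inference = Inference(f.history)
    f.display = GuiOutput(f.inference)         # renders chunks as they stream in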
Sketch Out the Nodes
Let’s start by sketching out the signatures of the Merged Node Flow. Later we’ll revise this with the minimum default arguments to fix the MissingDefaultError caused by the cyclic dependencies.
- async TerminalChat(response: Stream[str])
  - Decorator: @node(stream_in=["response"])
  Print each chunk as it arrives, then call input("> ").
The ChatHistory node is a Stateful Node, so it should be a class.
- class ChatHistory
  - Decorator: @node
  - messages
    The accumulated message history.
    - Type: Messages
    - Value: [Message("system", "You are a helpful assistant.")]
  - async call(self, prompt: str, last_response: str)
    Append last_response to messages under the "assistant" role, append prompt under the "user" role, then return messages.
Note
It should be obvious that last_response needs a default value. The first time through the flow, there is no last_response. When we try to run the flow, we will see a MissingDefaultError that lists the last_response argument as one potential fix.
This class uses the Messages and Message utility types.
- async Inference(messages: Messages)
  - Decorator: @node
  Sends the chat history to a chat-completion API and streams back the response.
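Translated into Python, the sketch above looks roughly like this. It is only a skeleton: the bodies are stubs, the default-argument fix comes later, and I’m assuming Stream is importable from the flowno top-level package; the Message dataclass and Messages alias are minimal stand-ins for the utility types mentioned above.
from dataclasses import dataclass
from typing import Literal

from flowno import node, Stream

# Minimal stand-ins for the Message / Messages utility types.
@dataclass
class Message:
    role: Literal["system", "user", "assistant"]
    content: str

Messages = list[Message]


@node(stream_in=["response"])
async def TerminalChat(response: Stream[str]):
    ...  # print each chunk as it arrives, then return input("> ")


@node
class ChatHistory:
    messages: Messages = [Message("system", "You are a helpful assistant.")]

    async def call(self, prompt: str, last_response: str):
        ...  # append last_response ("assistant") and prompt ("user"), return self.messages


@node
async def Inference(messages: Messages):
    ...  # POST the history to the chat-completion API and yield response chunks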
Implementing the Nodes
LLM API Node
We are going to use an OpenAI-compatible “chat completion” API. I’m using Groq, but you can use any other provider, or set up a local inference server like llama.cpp if you prefer. The Inference() node will send the chat history to the API and stream back the response.
Flowno provides an async flowno.io.HttpClient class, compatible with the Flowno event loop, that replaces some of the functionality of the blocking requests library or the asyncio-based aiohttp library. Because Flowno uses a custom event loop rather than threads or asyncio, any blocking call will block the entire flow, and awaiting incompatible asyncio primitives will do nothing.
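For example (a counter-example, not something to copy), both of the calls below would stall every other node in the flow until they return; BadNode is a hypothetical node written only to illustrate the problem:
import time

import requests
from flowno import node


@node
async def BadNode(url: str):
    time.sleep(1.0)                # blocks Flowno's event loop, freezing every node
    return requests.get(url).text  # a synchronous HTTP call has the same effect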
Future
I’m considering replacing the flowno event loop with asyncio or adding compatibility with asyncio.
from flowno import node
from flowno.io import HttpClient, Headers, streaming_response_is_ok
from json import JSONEncoder
from typing import Any, override
import logging
import os

logger = logging.getLogger(__name__)

API_URL = "https://api.groq.com/openai/v1/chat/completions"
TOKEN = os.environ["GROQ_API_KEY"]

headers = Headers()
headers.set("Authorization", f"Bearer {TOKEN}")

client = HttpClient(headers=headers)


class MessageJSONEncoder(JSONEncoder):
    @override
    def default(self, o: Any):
        # Message is the chat-message dataclass defined in the Chat History
        # Node section below; Messages is the matching list alias.
        if isinstance(o, Message):
            return {
                "role": o.role,
                "content": o.content,
            }
        return super().default(o)


client.json_encoder = MessageJSONEncoder()


@node
async def Inference(messages: Messages):
    """
    Streams the LLM API response.
    """
    response = await client.stream_post(
        API_URL,
        json={
            "messages": messages,
            "model": "llama-3.3-70b-versatile",
            "stream": True,
        },
    )

    if not streaming_response_is_ok(response):
        logger.error("Response Body: %s", response.body)
        raise HTTPException(response.status, response.body)

    async for response_stream_json in response.body:
        try:
            choice = response_stream_json["choices"][0]
            # If finish_reason is set, skip this chunk.
            if choice.get("finish_reason"):
                continue
            # By adding a type annotation here, the typechecker knows that
            # Inference produces a Stream[str].
            chunk: str = choice["delta"]["content"]
            yield chunk
        except KeyError:
            raise ValueError(response_stream_json)
Future
I’m going to add a fleshed out version of the inference node to the list of pre-built nodes.
Here are the key parts of the node:
- @node async def Inference(...): The stateless node definition MUST be an async def. The @node decorator transforms the function into a node factory.
Warning
The async keyword is required, even if you don’t use any async/await features.
- The HttpClient.stream_post() method initiates a POST request to the API. Instead of waiting for the complete response, it immediately returns an async generator of deserialized Server-Sent Events. Objects passed in with the json= keyword argument are serialized with HttpClient.json_encoder.
- The async for loop is the async analog of the regular for loop. It awaits the next value in the async generator response, possibly suspending. Control is returned when the client receives another streamed chunk.
- The yield statement is what makes this a Streaming Node rather than a Mono Node. Each time we receive a piece of data through our async for loop, we immediately pass it on to downstream nodes, yielding control to the event loop, rather than waiting for all the data to arrive first.
- If we wanted, we could return an accumulated final_result after ending the stream by manually raising a StopAsyncIteration exception. Sadly, Python doesn’t allow mixing return and yield in AsyncGenerator types, so if you want to return a final value explicitly, you need to raise StopAsyncIteration(return_value) yourself. This value will be passed to any connected downstream nodes that did not set @node(stream_in=[...]). Instead, in this example, I’m taking advantage of the implicit accumulation of Stream[str] values when sending the complete response to the downstream ChatHistory node.
Tip
Flowno will automatically accumulate string values and provide them to downstream nodes that cannot accept a Stream[str]. I’ll point out this magic behavior when we make the node connections.
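Here’s a minimal sketch of that accumulation behavior with two hypothetical nodes, assuming FlowHDL and node are importable from the flowno top level as elsewhere in this tutorial: Chunky streams three string pieces, and because Whole does not declare stream_in, it receives the concatenated result once the stream ends.
from flowno import FlowHDL, node


@node
async def Chunky():
    for piece in ("Hello", ", ", "world"):
        yield piece


@node
async def Whole(text: str):
    # No stream_in here, so text arrives as the accumulated "Hello, world".
    print(text)


with FlowHDL() as f:
    f.chunky = Chunky()
    f.whole = Whole(f.chunky)

f.run_until_complete()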
Terminal Chat Node
Next up is the TerminalChat() stateless, stream-in, mono node.
from flowno import node, Stream


@node(stream_in=["response_chunks"])
async def TerminalChat(response_chunks: Stream[str]):
    async for chunk in response_chunks:
        print(chunk, end="", flush=True)
    return input("> ")
Key points:
@node(stream_in=["response_chunks"])
: By passing instream_in
to the@~flowno.node
decorator, we say that this node can receive a stream of data in the given input, rather than a single value. If you forget this Flowno will wait until the upstream node (Inference()
in this case) has ended its stream and pass the full string toTerminalChat()
.async def TerminalOutput(response_chunks: Stream[str]):
: The argumentresponse_chunks
is annotated with the generic typeStream
. This type annotation is completely optional, but it is useful for static typechecking.Stream
is anAsyncIterator
like the value ofclient.stream_post(...)
in theInference
node body.
Warning
If you forget to use @node’s stream_in=[...] argument, Flowno will pass in a str value instead of the desired Stream[str].
- async for chunk in response_chunks: Like before, we await values from the async iterator. This time the iterator is a Stream[str], so the typechecker can infer that chunk is a str. When the connected Streaming Node does not have fresh data, this async for statement suspends until the upstream node has yielded more data.
Chat History Node
from dataclasses import dataclass
from typing import Literal

from flowno import node


@dataclass
class Message:
    role: Literal["system", "user", "assistant"]
    content: str


@node
class ChatHistory:
    messages = [Message("system", "You are a helpful assistant.")]

    async def call(self, prompt: str, last_response: str):
        # Append the previous assistant reply before the new user prompt so the
        # history stays in conversation order.
        self.messages.append(Message("assistant", last_response))
        self.messages.append(Message("user", prompt))
        return self.messages
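To make the append order concrete, here is the history this node builds over the first two turns of the sample session at the top (the very first call still needs the default-value fix covered below):
# Turn 1: call(prompt="Hello", last_response=<no previous response yet>)
#   messages == [system, user("Hello")]
#
# Inference streams back: "Hi! How can I assist you today?"
#
# Turn 2: call(prompt='How many R letters are in "strawberry"?',
#              last_response="Hi! How can I assist you today?")
#   messages == [system, user("Hello"),
#                assistant("Hi! How can I assist you today?"),
#                user('How many R letters are in "strawberry"?')]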
Designing the Flow Graph
Now we can wire the three nodes together inside a FlowHDL context:
with FlowHDL() as f:
    # f.inference is referenced before it is assigned; FlowHDL resolves these
    # forward references, which is how we express the cycle.
    f.chat = TerminalChat(f.inference)        # print the streamed response, then read the next prompt
    f.history = ChatHistory(f.chat, f.inference)
    f.inference = Inference(f.history)
Handling Cycles
If you attempt to run the flow as-is:
f.run_until_complete()
You will get a MissingDefaultError due to the cycle between ChatHistory and Inference.
Solution 1: Add a default value to ChatHistory:
@node
class ChatHistory:
    async def call(self, prompt: str, last_response: str = ""):
        ...
Solution 2: Add a default value to Inference:
@node
async def Inference(messages: list[Message] | None = None):
    if messages is None:
        # Handle the initial run
        ...
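One way to fill in the elided body of Solution 1 is to skip the assistant append while last_response is still the empty default. This is just a sketch of that idea, not the only option:
@node
class ChatHistory:
    messages = [Message("system", "You are a helpful assistant.")]

    async def call(self, prompt: str, last_response: str = ""):
        if last_response:  # empty only before the first model response exists
            self.messages.append(Message("assistant", last_response))
        self.messages.append(Message("user", prompt))
        return self.messages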
Read More