Source code for flowno.core.streaming_node

"""
Type definitions for streaming nodes in Flowno's typechecking system.

A *Streaming Node* is a node that yields multiple values via an async generator
rather than returning a single value or tuple. This module defines a Protocol
and abstract base classes that improve typechecking specificity for different
node arities.

.. warning::

    These classes are *never* instantiated directly - they exist solely for
    static type checking.

.. admonition:: Naming Conventions

    - StreamingNode: Base protocol for all streaming nodes regardless of arity
    - StreamingNodeX: Node with X inputs that streams any number of outputs
    - StreamingNodeX_Y: Node with X inputs that streams Y outputs
    - StreamingNode_Y: Node with any number of inputs that streams Y outputs

.. seealso::

    - :mod:`flowno.core.mono_node`: Type definitions for mono nodes
"""

from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator
from types import NotImplementedType
from typing import Any, Literal, Protocol, TypeAlias, TypeVar, overload

from flowno.core.node_base import DraftNode, DraftOutputPortRef
from typing_extensions import TypeVarTuple, Unpack, override

# Covariant type variable for the tuple a streaming node yields. The bound
# restricts it to tuple types (``tuple[object, ...]``).
_ReturnTupleT_co = TypeVar("_ReturnTupleT_co", covariant=True, bound=tuple[object, ...])
"""Type variable for the yielded tuple type of a streaming node."""

# Contravariant type variables for the first and second positional arguments
# of ``call`` (used by StreamingNode1 and StreamingNode2).
_T1_contra = TypeVar("_T1_contra", contravariant=True)
_T2_contra = TypeVar("_T2_contra", contravariant=True)
# Variadic type variable: captures all ``call`` argument types of the generic
# StreamingNode protocol at once.
_Ts = TypeVarTuple("_Ts")
"""Type variable for the input types of a streaming node."""

# Covariant type variables for individual output values. ``_Tout`` parametrizes
# StreamingDraftOutputPortRef; ``_T1_co`` is the single element of the yielded
# tuple in StreamingNode0_1 and StreamingNode_1. ``_T2_co`` is not referenced
# in the visible part of this module.
_Tout = TypeVar("_Tout", covariant=True)
_T1_co = TypeVar("_T1_co", covariant=True)
_T2_co = TypeVar("_T2_co", covariant=True)


class StreamingDraftOutputPortRef(DraftOutputPortRef[_Tout]):
    """Output-port reference type used by streaming nodes.

    This subclass adds no members of its own. It exists as a distinct type so
    that the ``output`` methods of streaming node classes can use it as their
    return annotation, parametrized by the value type ``_Tout``.
    """
class StreamingNode(Protocol[Unpack[_Ts], _ReturnTupleT_co]):
    """Base protocol shared by streaming nodes of every arity.

    A streaming node yields multiple values through an async generator
    instead of returning a single value or tuple. The protocol is generic
    over the input types (``_Ts``) and the yielded tuple type
    (``_ReturnTupleT_co``).

    Note:
        This class is never instantiated - it exists solely for
        typechecking.
    """

    # The constructor accepts any number of positional arguments of any type.
    def __init__(self, *args: Unpack[tuple[Any, ...]]): ...

    # ``call`` takes one argument per input type and produces an async
    # generator of result tuples.
    def call(self, *args: Unpack[_Ts]) -> AsyncGenerator[_ReturnTupleT_co, None]: ...
class StreamingNode0(DraftNode[_ReturnTupleT_co]):
    """Streaming node that takes no inputs and yields tuples.

    ``call`` accepts no arguments and produces an async generator of
    ``_ReturnTupleT_co`` tuples. The constructor is declared abstract and
    takes no arguments.

    See StreamingNode for details on the streaming node protocol.
    """

    @abstractmethod
    def __init__(self): ...

    def call(self) -> AsyncGenerator[_ReturnTupleT_co, None]: ...
class StreamingNode0_0(StreamingNode0[tuple[None]], ABC):
    """Streaming node with 0 inputs and 0 outputs.

    Having no outputs is expressed by fixing the yielded tuple type to
    ``tuple[None]``. Because there is no output port to refer to, ``output``
    is typed as returning ``NotImplementedType`` for every port index.

    See StreamingNode for details on the streaming node protocol.
    """

    @abstractmethod
    def __init__(self): ...

    @override
    def call(self) -> AsyncGenerator[tuple[None], None]: ...

    @override
    def output(self, output_port: int) -> "NotImplementedType": ...
class StreamingNode0_1(StreamingNode0[tuple[_T1_co]], ABC):
    """Streaming node with 0 inputs and 1 output.

    ``call`` yields 1-tuples whose only element has type ``_T1_co``.

    ``output`` is overloaded so that the literal port index ``0`` is typed as
    a ``StreamingDraftOutputPortRef[_T1_co]``, while an arbitrary ``int``
    index widens the result to also allow ``NotImplementedType``.

    See StreamingNode for details on the streaming node protocol.
    """

    @abstractmethod
    def __init__(self): ...

    @override
    def call(self) -> AsyncGenerator[tuple[_T1_co], None]: ...

    # Precise overload: port 0 is the one existing output.
    @overload
    def output(self, output_port: Literal[0]) -> "StreamingDraftOutputPortRef[_T1_co]": ...

    # Fallback overload for port indices not known statically.
    @overload
    def output(self, output_port: int) -> "NotImplementedType | StreamingDraftOutputPortRef[_T1_co]": ...

    @override
    def output(self, output_port: int) -> "NotImplementedType | StreamingDraftOutputPortRef[_T1_co]": ...
# Alias for a node with unconstrained leading type arguments (``Any``, any
# count) whose final type argument is a 1-tuple of ``_T1_co``. In
# StreamingNode1/StreamingNode2 the leading DraftNode arguments are the
# ``call`` input types and the last one is the yielded tuple type, so this
# presumably reads as "any inputs, exactly one output" - confirm against
# DraftNode's definition.
#
# ``_T1_co`` is the covariant TypeVar already declared with the other output
# type variables near the top of this module. It is intentionally not
# re-declared here: a second ``TypeVar("_T1_co", ...)`` would create a new,
# distinct object under the same name and is reported by type checkers as a
# redefinition.
StreamingNode_1: TypeAlias = DraftNode[Unpack[tuple[Any, ...]], tuple[_T1_co]]
class StreamingNode1(
    DraftNode[_T1_contra, _ReturnTupleT_co],
):
    """Streaming node that takes 1 input and yields tuples.

    ``call`` receives a single argument of type ``_T1_contra`` and produces
    an async generator of ``_ReturnTupleT_co`` tuples. The constructor is
    declared abstract.

    See StreamingNode for details on the streaming node protocol.
    """

    # Python 3.10 does not allow Unpack in an *args list, so the constructor
    # arguments are typed loosely as ``Any``.
    # TODO: Replace this with specific options when migrating to Python 3.11
    @abstractmethod
    def __init__(self, *args: Any): ...

    def call(self, arg1: _T1_contra) -> AsyncGenerator[_ReturnTupleT_co, None]: ...
class StreamingNode2(
    DraftNode[_T1_contra, _T2_contra, _ReturnTupleT_co],
):
    """A streaming node with 2 inputs that yields tuples.

    ``call`` receives two arguments, of types ``_T1_contra`` and
    ``_T2_contra``, and produces an async generator of ``_ReturnTupleT_co``
    tuples. The constructor is declared abstract.

    See StreamingNode for details on the streaming node protocol.
    """

    # The constructor signature mirrors StreamingNode1: positional arguments
    # are accepted and typed loosely as ``Any``. A zero-argument ``__init__``
    # would make type checkers reject every positional argument passed when
    # constructing a 2-input node.
    # NOTE(review): confirm against the overloads of the node decorator that
    # produces this type.
    @abstractmethod
    def __init__(self, *args: Any): ...

    def call(self, arg1: _T1_contra, arg2: _T2_contra) -> AsyncGenerator[_ReturnTupleT_co, None]: ...