"""
Node decorator for Flowno.
This module provides the `@node` decorator, which transforms async functions or classes
into DraftNode subclasses. These nodes can then be used within a FlowHDL context to
define dataflow graphs.
Examples:
Basic usage:
>>> from flowno import node
>>>
>>> @node
... async def Add(x: int, y: int) -> int:
... return x + y
>>>
>>> add_node = Add(1, 2)
>>> print(add_node) # DraftNode instance
With stream inputs:
>>> from flowno import node, Stream
>>>
>>> @node(stream_in=["a"])
... async def SumStream(x: int, a: Stream[int]) -> int:
... total = x
... async for value in a:
... total += value
... return total
>>>
>>> sum_stream_node = SumStream(1)
>>> print(sum_stream_node) # DraftNode instance with stream input
With multiple outputs:
>>> from flowno import node
>>>
>>> @node(multiple_outputs=True)
... async def SumAndDiff(x: int, y: int) -> tuple[int, int]:
... return x + y, x - y
>>>
>>> sum_and_diff_node = SumAndDiff(3, 1)
>>> print(sum_and_diff_node) # DraftNode instance with multiple outputs
"""
from collections.abc import AsyncGenerator, Callable, Coroutine
from typing_extensions import Unpack
from typing import Any, Final, Literal, TypeVar, Union, overload
from flowno.core.mono_node import (
MonoNode,
MonoNode0,
MonoNode0_0,
MonoNode0_1,
MonoNode1,
MonoNode2,
)
from flowno.core.node_base import DraftNode
from flowno.core.streaming_node import (
StreamingNode,
StreamingNode0,
StreamingNode1,
StreamingNode2,
)
from flowno.decorators.node_meta_multiple_dec import node_meta_multiple_dec
from flowno.decorators.node_meta_single_dec import node_meta_single_dec
from flowno.decorators.single_output import (
ClassCall,
MonoClassCall,
StreamingClassCall,
create_class_node_subclass_single,
create_func_node_factory_single,
)
from typing_extensions import override
EMPTY_LIST: Final[list[str]] = [] # used to make the typing happy
_T1 = TypeVar("_T1")
_T2 = TypeVar("_T2")
_ReturnT = TypeVar("_ReturnT")
T1 = TypeVar("T1")
T2 = TypeVar("T2")
_ReturnT_co = TypeVar("_ReturnT_co", covariant=True)
_ReturnTupleT_co = TypeVar("_ReturnTupleT_co", bound=tuple[object, ...], covariant=True)
# # Eg:
# # @node
# # class StatefulNode:
# # async def call(self, x: str, y: int) -> int:
# @overload
# def node(
# func_or_cls: type[BlockingClassCall[T1, T2, _ReturnT_co]], /
# ) -> type[MonoNode2[T1, T2, tuple[_ReturnT_co]]]: ...
# # Eg:
# # @node
# # class StatefulNode:
# # async def call(self, x: str, y: int) -> AsyncGenerator[int, None]:
# @overload
# def node(
# func_or_cls: type[StreamingClassCall[T1, T2, _ReturnT_co]], /
# ) -> type[StreamingNode2[T1, T2, tuple[_ReturnT_co]]
# ]: ...
# # Eg:
# # @node
# # class StatefulNode:
# # async def call(self, x: str) -> int:
# @overload
# def node(
# func_or_cls: type[BlockingClassCall[T1, _ReturnT_co]], /
# ) -> type[MonoNode1[T1, tuple[_ReturnT_co]]]: ...
# # Eg:
# # @node
# # class StatefulNode:
# # async def call(self, x: str) -> AsyncGenerator[int, None]:
# @overload
# def node(
# func_or_cls: type[StreamingClassCall[T1, _ReturnT_co]], /
# ) -> type[StreamingNode1[T1, tuple[_ReturnT_co]]]: ...
# # Eg:
# # @node
# # class StatefulNode:
# # async def call(self) -> int:
# @overload
# def node(
# func_or_cls: type[MonoClassCall[_ReturnT_co]], /
# ) -> type[MonoNode0[tuple[_ReturnT_co]]]: ...
# # Eg:
# # @node
# # class StatefulNode:
# # async def call(self) -> AsyncGenerator[int, None]:
# @overload
# def node(
# func_or_cls: type[StreamingClassCall[_ReturnT_co]], /
# ) -> type[StreamingNode0[tuple[_ReturnT_co]]]: ...
# # Eg:
# # @node
# # async def StatelessNode(x: str, y: int) -> int:
# @overload
# def node(
# func_or_cls: Callable[[T1, T2], Coroutine[Any, Any, _ReturnT_co]], /
# ) -> type[MonoNode2[T1, T2, tuple[_ReturnT_co]]
# ]: ...
# # Eg:
# # @node
# # async def StatelessNode(x: str, y: int) -> AsyncGenerator[int, None]:
# @overload
# def node(
# func_or_cls: Callable[[T1, T2], AsyncGenerator[_ReturnT_co, None]], /
# ) -> type[StreamingNode2[T1, T2, tuple[_ReturnT_co]]
# ]: ...
# Eg:
# @node
# async def StatelessNode(x: str) -> int:
@overload
def node(
func_or_cls: Callable[[T1], Coroutine[Any, Any, _ReturnT_co]], /
) -> type[MonoNode1[T1, tuple[_ReturnT_co]]]: ...
# # Eg:
# # @node
# # async def StatelessNode(x: str) -> AsyncGenerator[int, None]:
# @overload
# def node(
# func_or_cls: Callable[[T1], AsyncGenerator[_ReturnT_co, None]], /
# ) -> type[StreamingNode1[T1, tuple[_ReturnT_co]]
# ]: ...
# Eg:
# @node
# async def StatelessNode() -> int:
@overload
def node(func_or_cls: Callable[[], Coroutine[Any, Any, None]], /) -> type[MonoNode0_0]: ...
@overload
def node(func_or_cls: Callable[[], Coroutine[Any, Any, _ReturnT_co]], /) -> type[MonoNode0_1[_ReturnT_co]]: ...
# # Eg:
# # @node
# # async def StatelessNode() -> AsyncGenerator[int, None]:
# @overload
# def node(
# func_or_cls: Callable[[], AsyncGenerator[_ReturnT_co, None]], /
# ) -> type[StreamingNode0[tuple[_ReturnT_co]]]: ...
# Eg:
# @node(stream_in=["a"])
# async def StatelessNode(x: str, a: Stream[int]) -> int:
# or:
# @node(stream_in=["a"])
# async def StatelessNode(x: str, a: Stream[int]) -> AsyncGenerator[int, None]:
@overload
def node(
func_or_cls: None = None,
/,
*,
multiple_outputs: Literal[False] | None = None,
stream_in: list[str] = EMPTY_LIST,
) -> node_meta_single_dec: ...
# # Eg:
# # @node(stream_in=["a"], multiple_outputs=True)
# # async def StatelessNode(x: str, a: Stream[int]) -> tuple[int, int]:
# # or:
# # @node(stream_in=["a"], multiple_outputs=True)
# # async def StatelessNode(x: str, a: Stream[int]) -> AsyncGenerator[tuple[int, int], None]:
# @overload
# def node(
# func_or_cls: None = None,
# /,
# *,
# multiple_outputs: Literal[True],
# stream_in: list[str] = EMPTY_LIST,
# ) -> node_meta_multiple_dec: ...
[docs]
def node(
func_or_cls: (
Callable[..., Coroutine[Any, Any, _ReturnT_co]]
| Callable[..., AsyncGenerator[_ReturnT_co, None]]
| type[ClassCall[Any, _ReturnT_co]]
| None
) = None,
/,
*,
multiple_outputs: Literal[False] | Literal[True] | None = None,
stream_in: list[str] = EMPTY_LIST,
) -> (
type[MonoNode[Unpack[tuple[Any, ...]], tuple[_ReturnT_co]]]
| type[StreamingNode[Unpack[tuple[Any, ...]], tuple[_ReturnT_co]]]
| node_meta_single_dec
| node_meta_multiple_dec[Unpack[tuple[object, ...]], _ReturnTupleT_co]
):
"""
Decorator that transforms async functions or classes into DraftNode subclasses.
Args:
func_or_cls: The async function or class to transform
multiple_outputs: Whether the node has multiple outputs
stream_in: List of input streams
Returns:
A DraftNode subclass or a node_meta decorator
Examples:
Basic usage:
>>> from flowno import node
>>>
>>> @node
... async def Add(x: int, y: int) -> int:
... return x + y
>>>
>>> add_node = Add(1, 2)
>>> print(add_node) # DraftNode instance
With stream inputs:
>>> from flowno import node, Stream
>>>
>>> @node(stream_in=["a"])
... async def SumStream(x: int, a: Stream[int]) -> int:
... total = x
... async for value in a:
... total += value
... return total
>>>
>>> sum_stream_node = SumStream(1)
>>> print(sum_stream_node) # DraftNode instance with stream input
With multiple outputs:
>>> from flowno import node
>>>
>>> @node(multiple_outputs=True)
... async def SumAndDiff(x: int, y: int) -> tuple[int, int]:
... return x + y, x - y
>>>
>>> sum_and_diff_node = SumAndDiff(3, 1)
>>> print(sum_and_diff_node) # DraftNode instance with multiple outputs
"""
if func_or_cls is not None:
# both stream_in and multiple_outputs are unset
if isinstance(func_or_cls, type):
return create_class_node_subclass_single(func_or_cls, stream_in)
else:
return create_func_node_factory_single(func_or_cls, stream_in)
elif multiple_outputs is not True:
return node_meta_single_dec(stream_in=stream_in)
else:
return node_meta_multiple_dec(stream_in=stream_in)