"""
Internal Command Types for the Flowno Event Loop
-------------------------------------------------

This module defines the command types used by the Flowno event loop to implement
its cooperative multitasking system. Commands are yielded by coroutines and
interpreted by the event loop to perform operations like task scheduling,
I/O operations, and synchronization.

.. warning::
    These command types are used internally by the Flowno event loop to control
    task scheduling, socket operations, and asynchronous queue interactions.
    They are not part of the public API. Normal users should rely on the public
    awaitable primitives (e.g. :func:`sleep`, :func:`spawn`, etc.) rather than
    yielding these commands directly.
"""
from abc import ABC
from collections.abc import Generator
from dataclasses import dataclass
from types import coroutine
from typing import TYPE_CHECKING, Any, Generic, TypeVar
if TYPE_CHECKING:
from flowno.core.node_base import ObjectDraftNode
from flowno.core.event_loop.tasks import TaskHandle
from flowno.core.event_loop.selectors import SocketHandle
from flowno.core.event_loop.queues import AsyncQueue
from .types import DeltaTime, RawTask
# Invariant type variable: the element/result type carried by the TaskHandle
# and AsyncQueue references held by the join and queue commands below.
_T = TypeVar("_T")
# Covariant type variable: used only by SpawnCommand to parameterize its raw task.
_T_co = TypeVar("_T_co", covariant=True)
[docs]
@dataclass
class Command(ABC):
    """
    Abstract root of every internal event-loop command type.

    The class declares no fields of its own; concrete commands subclass it
    and add whatever payload they need.

    .. note::
        Commands belong to the Flowno event loop's internal control
        mechanism. Application code should neither construct nor yield
        them directly.
    """
[docs]
@dataclass
class SpawnCommand(Generic[_T_co], Command):
    """
    Internal request to spawn a new raw task.

    :param raw_task: Raw task coroutine that is to be scheduled.

    .. note::
        Prefer the public :func:`spawn` primitive; a ``SpawnCommand`` is not
        meant to be yielded directly by user code.
    """

    raw_task: RawTask[Command, Any, _T_co]
[docs]
@dataclass
class JoinCommand(Generic[_T], Command):
    """
    Internal request to suspend the yielding task until another task finishes.

    :param task_handle: Handle of the task being joined.

    .. note::
        The :meth:`TaskHandle.join` awaitable is implemented on top of this
        command.
    """

    task_handle: "TaskHandle[_T]"
[docs]
@dataclass
class SleepCommand(Command):
    """
    Internal request to suspend the yielding task until a given time.

    :param end_time: Time (a DeltaTime) until which the task should sleep.

    .. note::
        Prefer the public :func:`sleep` primitive over yielding a
        ``SleepCommand``.
    """

    end_time: DeltaTime
[docs]
@dataclass
class SocketCommand(Command):
    """
    Common base for the internal socket-operation commands.

    :param handle: Socket handle that the operation refers to.

    .. note::
        The event loop relies on these commands to implement non-blocking I/O.
    """

    handle: "SocketHandle"
[docs]
class SocketSendCommand(SocketCommand):
    """
    Internal command signalling that data is to be sent over a socket.

    Adds no fields beyond those inherited from :class:`SocketCommand`.
    """
[docs]
class SocketRecvCommand(SocketCommand):
    """
    Internal command signalling that data is to be received from a socket.

    Adds no fields beyond those inherited from :class:`SocketCommand`.
    """
[docs]
class SocketAcceptCommand(SocketCommand):
    """
    Internal command asking to accept a new connection on a socket.

    Adds no fields beyond those inherited from :class:`SocketCommand`.
    """
[docs]
@dataclass
class QueueGetCommand(Generic[_T], Command):
    """
    Internal request to fetch an item from an asynchronous queue.

    :param queue: Asynchronous queue the item is taken from.
    :param peek: When True, the item is left in the queue instead of being
        removed. Defaults to False.

    .. note::
        The blocking get behavior is implemented with this command.
    """

    queue: "AsyncQueue[_T]"
    peek: bool = False
[docs]
@dataclass
class QueuePutCommand(Generic[_T], Command):
    """
    Internal request to insert an item into an asynchronous queue.

    :param queue: Asynchronous queue that receives the item.
    :param item: Item to insert.

    .. note::
        The blocking put behavior is implemented with this command.
    """

    queue: "AsyncQueue[_T]"
    item: _T
[docs]
@dataclass
class QueueCloseCommand(Generic[_T], Command):
    """
    Internal request to close an asynchronous queue.

    :param queue: Asynchronous queue that is to be closed.

    .. note::
        The event loop uses this command internally when closing queues.
    """

    queue: "AsyncQueue[_T]"
[docs]
@dataclass
class QueueNotifyGettersCommand(Generic[_T], Command):
    """
    Internal request to notify tasks waiting for items on an asynchronous queue.

    :param queue: Asynchronous queue whose waiting getters are to be notified.

    .. note::
        Used internally when an item is added to a queue, so that tasks
        blocked on a get operation are woken up.
    """

    queue: "AsyncQueue[_T]"