flowno.core.event_loop.primitives

Low-level concurrency primitives for the Flowno event loop.

This module provides the core asynchronous primitives that enable cooperative multitasking in Flowno. They resemble the primitives provided by asyncio but are tailored to Flowno’s event loop.

Examples

Sleep can be used directly with the event loop:

>>> from flowno.core.event_loop.event_loop import EventLoop
>>> from flowno.core.event_loop.primitives import sleep
>>>
>>> async def delayed_hello():
...     print("Hello")
...     duration = await sleep(0.5)  # pause for 0.5 seconds
...     print(f"World! (slept for {duration:.1f}s)")
...     return "Done"
>>>
>>> loop = EventLoop()
>>> result = loop.run_until_complete(delayed_hello(), join=True)
Hello
World! (slept for 0.5s)
>>> print(result)
Done

The azip function combines multiple asynchronous streams:

>>> from flowno import node, FlowHDL, Stream
>>> from flowno.core.event_loop.primitives import azip
>>>
>>> @node
... async def Numbers(count: int):
...     for i in range(count):
...         yield i
>>>
>>> @node(stream_in=["a", "b"])
... async def Pairs(a: Stream[int], b: Stream[str]):
...     async for num, letter in azip(a, b):
...         yield f"{num}:{letter}"
>>>
>>> @node
... async def Letters(chars: str):
...     for c in chars:
...         yield c
>>>
>>> with FlowHDL() as f:
...     f.nums = Numbers(3)
...     f.chars = Letters("ABC")
...     f.pairs = Pairs(f.nums, f.chars)
...
>>> f.run_until_complete()
>>> f.pairs.get_data()
('0:A', '1:B', '2:C')

async flowno.core.event_loop.primitives.azip(iterable: AsyncIterator[_T_co], /) → AsyncGenerator[tuple[_T_co], None]
async flowno.core.event_loop.primitives.azip(iterable1: AsyncIterator[_T1], iterable2: AsyncIterator[_T2], /) → AsyncGenerator[tuple[_T1, _T2], None]
async flowno.core.event_loop.primitives.azip(iterable1: AsyncIterator[_T1], iterable2: AsyncIterator[_T2], iterable3: AsyncIterator[_T3], /) → AsyncGenerator[tuple[_T1, _T2, _T3], None]
async flowno.core.event_loop.primitives.azip(iterable1: AsyncIterator[_T1], iterable2: AsyncIterator[_T2], iterable3: AsyncIterator[_T3], iterable4: AsyncIterator[_T4], /) → AsyncGenerator[tuple[_T1, _T2, _T3, _T4], None]

Combine multiple async iterators, similar to the built-in zip() function.

This function takes multiple async iterators and yields tuples containing items from each iterator, advancing all iterators in lockstep. It stops when the shortest iterator is exhausted.

Parameters:

*args – The async iterators to combine, passed positionally. The typed overloads above cover one to four iterators.

Yields:

Tuples containing one item from each iterator

Example

>>> async def gen1():
...     for i in range(3):
...         yield i
>>>
>>> async def gen2():
...     yield "a"
...     yield "b"
>>>
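>>> # "async for" is only valid inside a coroutine, so wrap the loop
>>> # and run main() on the event loop.
>>> async def main():
...     # Yields (0, "a") and (1, "b"), then stops because gen2 is exhausted
...     async for pair in azip(gen1(), gen2()):
...         print(pair)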
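
The lockstep behaviour described above can be illustrated without Flowno. The following is a minimal sketch, not Flowno's implementation: lockstep_zip is a hypothetical stand-in for azip, and asyncio is used only as a convenient way to drive plain async generators.

```python
import asyncio
from typing import Any, AsyncIterator


async def lockstep_zip(*iterators: AsyncIterator[Any]):
    """Hypothetical stand-in for azip; illustrates the semantics only."""
    if not iterators:
        return  # like zip(), yield nothing when given no inputs
    while True:
        items = []
        for it in iterators:
            try:
                # Advance each iterator by exactly one item per round.
                items.append(await it.__anext__())
            except StopAsyncIteration:
                return  # the shortest iterator is exhausted: stop
        yield tuple(items)


async def gen1():
    for i in range(3):
        yield i


async def gen2():
    yield "a"
    yield "b"


async def collect():
    return [pair async for pair in lockstep_zip(gen1(), gen2())]


pairs = asyncio.run(collect())
print(pairs)  # [(0, 'a'), (1, 'b')]
```

As with the built-in zip(), the third item of gen1 is fetched and then discarded once gen2 turns out to be exhausted.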

flowno.core.event_loop.primitives.sleep(duration: float) → Generator[SleepCommand, None, float]

Suspend the current task for the specified duration.

Parameters:

duration – The number of seconds to sleep.

Returns:

The actual time slept (which may be slightly longer than requested).
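
The return type Generator[SleepCommand, None, float] suggests a command protocol: the primitive yields a command to the loop and later returns the measured duration. The sketch below shows that pattern in isolation. ToySleepCommand, toy_sleep, and run are hypothetical names invented for illustration, and the real event loop may work quite differently.

```python
import time
from dataclasses import dataclass
from typing import Generator


@dataclass
class ToySleepCommand:
    """Hypothetical command object; Flowno's SleepCommand may differ."""
    wake_at: float


def toy_sleep(duration: float) -> Generator[ToySleepCommand, None, float]:
    start = time.monotonic()
    # Hand control to the loop, asking to be resumed at wake_at.
    yield ToySleepCommand(wake_at=start + duration)
    # Report the time that actually passed, which can exceed the request.
    return time.monotonic() - start


def run(task):
    """Drive one generator-based task to completion and return its result."""
    try:
        command = next(task)
        while True:
            if isinstance(command, ToySleepCommand):
                # A real loop would run other tasks here instead of blocking.
                while time.monotonic() < command.wake_at:
                    time.sleep(command.wake_at - time.monotonic())
            command = task.send(None)
    except StopIteration as stop:
        return stop.value


def delayed_hello():
    slept = yield from toy_sleep(0.05)
    return slept


elapsed = run(delayed_hello())
print(f"slept for {elapsed:.3f}s")
```

The toy task uses yield from to stay independent of any particular loop; in Flowno the primitive is awaited, as in the module-level example.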

flowno.core.event_loop.primitives.socket(family: AddressFamily | int = -1, type: SocketKind | int = -1, proto: int = -1, fileno: int | None = None, use_tls: bool = False, ssl_context: SSLContext | None = None, server_hostname: str | None = None) → SocketHandle

Create a new socket compatible with Flowno’s event loop.

Parameters:
  • family – The address family (default: AF_INET)

  • type – The socket type (default: SOCK_STREAM)

  • proto – The protocol number (default: 0)

  • fileno – If specified, the socket is created from an existing file descriptor

  • use_tls – When True, creates a TLS-wrapped socket

  • ssl_context – The SSL context to use (if use_tls=True)

  • server_hostname – The server hostname for TLS certificate validation

Returns:

A SocketHandle that can be used with the Flowno event loop.
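
The signature defaults of -1 and the documented defaults (AF_INET, SOCK_STREAM, 0) match the sentinel convention of the standard library's socket.socket. The snippet below demonstrates that convention using only the standard library. It assumes Flowno passes these sentinels through unchanged, which the signature suggests but this page does not state.

```python
import socket

# In the standard library, -1 means "use the default" for these arguments.
sock = socket.socket(family=-1, type=-1, proto=-1)
try:
    resolved = (sock.family, sock.type, sock.proto)
finally:
    sock.close()  # always release the file descriptor

print(resolved)
```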

flowno.core.event_loop.primitives.spawn(raw_task: Coroutine[Any, Any, _T_co]) → Generator[SpawnCommand[_T_co], Any, TaskHandle[_T_co]]

Spawn a new task to run concurrently with the current task.

Parameters:

raw_task – The coroutine to run as a new task.

Returns:

A TaskHandle that can be used to wait for the task to complete.
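
The return type indicates that spawn yields a SpawnCommand to the loop and receives a TaskHandle back. The toy round-robin scheduler below sketches that exchange. ToySpawnCommand, ToyTaskHandle, toy_spawn, and run are hypothetical names, and none of this is Flowno's actual scheduler.

```python
from collections import deque
from dataclasses import dataclass
from typing import Any, Generator


@dataclass
class ToySpawnCommand:
    """Hypothetical command asking the loop to schedule a new task."""
    task: Generator


@dataclass
class ToyTaskHandle:
    """Hypothetical handle; records completion and the task's result."""
    done: bool = False
    result: Any = None


def toy_spawn(task):
    # Yield the command; the loop sends back a handle for the new task.
    handle = yield ToySpawnCommand(task)
    return handle


def run(main):
    main_handle = ToyTaskHandle()
    ready = deque([(main, main_handle, None)])  # (task, handle, value to send)
    while ready:
        task, handle, value = ready.popleft()
        try:
            command = task.send(value)
        except StopIteration as stop:
            handle.done, handle.result = True, stop.value
            continue
        if isinstance(command, ToySpawnCommand):
            child_handle = ToyTaskHandle()
            ready.append((command.task, child_handle, None))
            ready.append((task, handle, child_handle))  # resume with the handle
        else:
            ready.append((task, handle, None))  # bare yield: just reschedule
    return main_handle.result


log = []


def worker(name):
    for step in range(2):
        log.append(f"{name}:{step}")
        yield  # give other tasks a turn
    return name.upper()


def main():
    handle = yield from toy_spawn(worker("child"))
    for step in range(2):
        log.append(f"main:{step}")
        yield
    while not handle.done:  # crude wait; a real handle would offer a join
        yield
    return handle.result


result = run(main())
print(result, log)
```

The interleaved entries in log show that the spawned task and its parent take turns, which is the concurrency the description above refers to.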