flowno.core.event_loop.synchronization

Synchronization primitives for the Flowno event loop.

This module provides synchronization tools like CountdownLatch that help coordinate concurrent tasks in Flowno’s dataflow execution model. These primitives are particularly useful for ensuring proper data flow between nodes.

Examples

>>> from flowno.core.event_loop.event_loop import EventLoop
>>> from flowno.core.event_loop.primitives import spawn
>>> from flowno.core.event_loop.synchronization import CountdownLatch
>>>
>>> async def consumer(name: str, latch: CountdownLatch):
...     print(f"{name}: Waiting for data...")
...     await latch.wait()
...     print(f"{name}: Data received, processing...")
...     return f"{name} processed"
>>>
>>> async def producer(latch: CountdownLatch):
...     print("Producer: Preparing data...")
...     # Simulate data preparation
...     print("Producer: Data ready, notifying consumers...")
...     await latch.count_down()
...     await latch.count_down()  # Notify both consumers
...     print("Producer: All consumers notified")
>>>
>>> async def main():
...     # Create a latch that will block until counted down twice
...     latch = CountdownLatch(count=2)
...
...     # Start two consumers that wait on the latch
...     consumer1 = await spawn(consumer("Consumer1", latch))
...     consumer2 = await spawn(consumer("Consumer2", latch))
...
...     # Start the producer that will count down the latch
...     producer_task = await spawn(producer(latch))
...
...     # Wait for all tasks to complete
...     await producer_task.join()
...     result1 = await consumer1.join()
...     result2 = await consumer2.join()
...
...     return [result1, result2]
>>>
>>> event_loop = EventLoop()
>>> results = event_loop.run_until_complete(main(), join=True)
Producer: Preparing data...
Consumer1: Waiting for data...
Consumer2: Waiting for data...
Producer: Data ready, notifying consumers...
Producer: All consumers notified
Consumer1: Data received, processing...
Consumer2: Data received, processing...
>>> print(results)
['Consumer1 processed', 'Consumer2 processed']
class flowno.core.event_loop.synchronization.Barrier(parties: int)[source]

A synchronization primitive that allows multiple tasks to wait for each other.

A barrier is initialized with a participant count. Each task calls wait() on the barrier, and all tasks are blocked until the specified number of tasks have called wait().

Note

This is a basic implementation. For production use with many participants, consider implementing a more efficient version.

async wait() int[source]

Wait for all parties to reach the barrier.

This method blocks until all parties have called wait() on this barrier. When the final party arrives, all waiting parties are released.

Returns:

The arrival index (0 through parties-1) for this task

class flowno.core.event_loop.synchronization.CountdownLatch(count: int)[source]

A synchronization primitive that allows one or more tasks to wait until a set of operations in other tasks completes.

The latch is initialized with a given count. Tasks can then wait on the latch, and each count_down() call decreases the counter. When the counter reaches zero, all waiting tasks are released.

This is currently used in node_base where a node needs to ensure all downstream nodes have consumed its previous output before generating new data.

count

The initial count that must be counted down to zero

property count: int

Get the current count of the latch.

Returns:

The current count value.

async count_down() None[source]

Decrement the latch count by one.

If the count reaches zero as a result of this call, all waiting tasks will be unblocked.

If the latch is already at zero, this method logs a warning.

set_count(count: int) None[source]

Set a new count for the latch.

This method should only be called when no tasks are waiting on the latch.

Parameters:

count – The new count value. Must be non-negative.

Raises:

ValueError – If count is negative.

async wait() None[source]

Block until the latch has counted down to zero.

This coroutine will not complete until count_down() has been called the specified number of times.