flowno.core.event_loop.synchronization
Synchronization primitives for the Flowno event loop.
This module provides synchronization tools like CountdownLatch that help coordinate concurrent tasks in Flowno’s dataflow execution model. These primitives are particularly useful for ensuring proper data flow between nodes.
Examples
>>> from flowno.core.event_loop.event_loop import EventLoop
>>> from flowno.core.event_loop.primitives import spawn
>>> from flowno.core.event_loop.synchronization import CountdownLatch
>>>
>>> async def consumer(name: str, latch: CountdownLatch):
...     print(f"{name}: Waiting for data...")
...     await latch.wait()
...     print(f"{name}: Data received, processing...")
...     return f"{name} processed"
>>>
>>> async def producer(latch: CountdownLatch):
...     print("Producer: Preparing data...")
...     # Simulate data preparation
...     print("Producer: Data ready, notifying consumers...")
...     await latch.count_down()
...     await latch.count_down()  # Count reaches zero: both consumers are released
...     print("Producer: All consumers notified")
>>>
>>> async def main():
...     # Create a latch that will block until counted down twice
...     latch = CountdownLatch(count=2)
...
...     # Start two consumers that wait on the latch
...     consumer1 = await spawn(consumer("Consumer1", latch))
...     consumer2 = await spawn(consumer("Consumer2", latch))
...
...     # Start the producer that will count down the latch
...     producer_task = await spawn(producer(latch))
...
...     # Wait for all tasks to complete
...     await producer_task.join()
...     result1 = await consumer1.join()
...     result2 = await consumer2.join()
...
...     return [result1, result2]
>>>
>>> event_loop = EventLoop()
>>> results = event_loop.run_until_complete(main(), join=True)
Producer: Preparing data...
Consumer1: Waiting for data...
Consumer2: Waiting for data...
Producer: Data ready, notifying consumers...
Producer: All consumers notified
Consumer1: Data received, processing...
Consumer2: Data received, processing...
>>> print(results)
['Consumer1 processed', 'Consumer2 processed']
- class flowno.core.event_loop.synchronization.Barrier(parties: int)
A synchronization primitive that allows multiple tasks to wait for each other.
A barrier is initialized with a participant count. Each task calls wait() on the barrier, and all tasks are blocked until the specified number of tasks have called wait().
Note
This is a basic implementation. For production use with many participants, consider implementing a more efficient version.
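The barrier behavior described above can be illustrated with a small stand-in. This is a minimal sketch built on asyncio, not Flowno's implementation: the `SketchBarrier` class, the `worker` helper, and the single-use design are all assumptions made for illustration. It shows only that no task gets past `wait()` until the configured number of tasks has arrived.

```python
import asyncio


class SketchBarrier:
    """Hypothetical asyncio stand-in; not Flowno's Barrier implementation."""

    def __init__(self, parties: int) -> None:
        if parties < 1:
            raise ValueError("parties must be at least 1")
        self._parties = parties
        self._arrived = 0
        self._all_arrived = asyncio.Event()

    async def wait(self) -> None:
        # Record this task's arrival.
        self._arrived += 1
        if self._arrived >= self._parties:
            # The last arrival releases every blocked task.
            self._all_arrived.set()
        # Earlier arrivals block here until the event is set.
        await self._all_arrived.wait()


async def worker(name: str, barrier: SketchBarrier, log: list[str]) -> None:
    log.append(f"{name} arrived")
    await barrier.wait()
    log.append(f"{name} released")


async def run_barrier_demo() -> list[str]:
    log: list[str] = []
    barrier = SketchBarrier(parties=3)
    await asyncio.gather(*(worker(f"task{i}", barrier, log) for i in range(3)))
    return log


barrier_log = asyncio.run(run_barrier_demo())
```

In this sketch every "arrived" entry is logged before any "released" entry, which is the ordering guarantee a barrier provides. The stand-in is single-use; whether Flowno's Barrier can be reused is not stated here.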
- class flowno.core.event_loop.synchronization.CountdownLatch(count: int)
A synchronization primitive that allows one or more tasks to wait until a set of operations in other tasks completes.
The latch is initialized with a given count. Tasks can then wait on the latch, and each count_down() call decreases the counter. When the counter reaches zero, all waiting tasks are released.
This is currently used in node_base, where a node needs to ensure all downstream nodes have consumed its previous output before generating new data.
- count
The initial count that must be counted down to zero.
- async count_down() → None
Decrement the latch count by one.
If the count reaches zero as a result of this call, all waiting tasks will be unblocked.
If the latch is already at zero, this method logs a warning.
- set_count(count: int) → None
Set a new count for the latch.
This method should only be called when no tasks are waiting on the latch.
- Parameters:
count – The new count value. Must be non-negative.
- Raises:
ValueError – If count is negative.
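The documented contract of count_down() and set_count() can be sketched with a stand-in class. This is a minimal illustration built on asyncio, not Flowno's implementation: `SketchLatch`, its internal event, and the logger name are hypothetical. Two behaviors are assumptions rather than documented facts: that a count_down() at zero leaves the count unchanged, and that set_count() re-arms the latch so later waiters block again.

```python
import asyncio
import logging

logger = logging.getLogger("latch_sketch")


class SketchLatch:
    """Hypothetical asyncio stand-in; not Flowno's CountdownLatch."""

    def __init__(self, count: int) -> None:
        self._zero = asyncio.Event()
        self.set_count(count)

    def set_count(self, count: int) -> None:
        # Reject negative counts, mirroring the documented ValueError.
        if count < 0:
            raise ValueError(f"count must be non-negative, got {count}")
        self.count = count
        # Assumption: re-arm the latch unless the new count is already zero.
        if count == 0:
            self._zero.set()
        else:
            self._zero.clear()

    async def wait(self) -> None:
        await self._zero.wait()

    async def count_down(self) -> None:
        if self.count == 0:
            # Assumption: the count stays at zero and only a warning is logged.
            logger.warning("count_down() called on a latch already at zero")
            return
        self.count -= 1
        if self.count == 0:
            self._zero.set()  # release every waiting task


async def run_latch_demo() -> list[str]:
    events: list[str] = []
    latch = SketchLatch(count=2)

    async def waiter() -> None:
        await latch.wait()
        events.append("waiter released")

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # let the waiter start and block on the latch
    await latch.count_down()
    events.append(f"count={latch.count}")
    await latch.count_down()  # count reaches zero here
    await task
    events.append(f"count={latch.count}")
    await latch.count_down()  # already at zero: only logs a warning

    latch.set_count(1)  # no task is waiting now, so re-arming is safe
    events.append(f"count={latch.count}")
    try:
        latch.set_count(-1)
    except ValueError:
        events.append("negative count rejected")
    return events


latch_events = asyncio.run(run_latch_demo())
```

The demo calls set_count() only after the waiter has finished, in line with the guidance that it should not be called while tasks are waiting on the latch.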