flowno.core.event_loop.queues
Asynchronous queue implementations for the Flowno event loop.
This module provides queue classes that integrate with Flowno’s custom event loop, allowing tasks to safely exchange data and coordinate their execution. These queues implement the AsyncIterator protocol, making them convenient for use in async for loops.
Examples
Basic queue operations:
>>> from flowno.core.event_loop.event_loop import EventLoop
>>> from flowno.core.event_loop.queues import AsyncQueue
>>>
>>> async def producer_consumer():
...     # Create a queue with maximum size 2
...     queue = AsyncQueue(maxsize=2)
...
...     # Put some items into the queue
...     await queue.put("task 1")
...     await queue.put("task 2")
...
...     # Peek at the first item without removing it
...     first = await queue.peek()
...
...     # Get and process items
...     item1 = await queue.get()
...     item2 = await queue.get()
...
...     # Close the queue when done
...     await queue.close()
...     return (first, item1, item2)
>>>
>>> loop = EventLoop()
>>> result = loop.run_until_complete(producer_consumer(), join=True)
>>> result
('task 1', 'task 1', 'task 2')
Using a queue as an async iterator:
>>> async def queue_iterator_example():
...     queue = AsyncQueue()
...
...     # Add some items
...     for i in range(3):
...         await queue.put(f"item {i}")
...
...     # Process all items using async for
...     results = []
...     async for item in queue.until_empty():
...         results.append(item)
...
...     return results
>>>
>>> loop = EventLoop()
>>> loop.run_until_complete(queue_iterator_example(), join=True)
['item 0', 'item 1', 'item 2']
- class flowno.core.event_loop.queues.AsyncQueue(maxsize: int | None = None)
An asynchronous queue for the Flowno event loop.
This queue allows tasks to exchange data safely, with synchronization handled by the event loop. When used as an async iterator, it yields items until the queue is closed and empty.
- Parameters:
maxsize – The maximum number of items allowed in the queue. When the queue reaches this size, put() operations will block until items are removed. If None, the queue size is unbounded.
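The blocking behaviour of a bounded queue can be illustrated with the standard library's asyncio.Queue, used here purely as an analogue. It is not Flowno's AsyncQueue and it runs on asyncio's event loop rather than Flowno's. In this sketch the producer's third put has to wait until the consumer has removed an item:

```python
import asyncio


async def demo():
    # asyncio.Queue stands in for a bounded queue; this is NOT Flowno's AsyncQueue.
    queue = asyncio.Queue(maxsize=2)
    events = []

    async def producer():
        for i in range(3):
            # The third put finds the queue full and waits for a free slot.
            await queue.put(i)
            events.append(f"put {i}")

    async def consumer():
        for _ in range(3):
            item = await queue.get()
            events.append(f"got {item}")

    await asyncio.gather(producer(), consumer())
    return events


events = asyncio.run(demo())
```

The recorded events show "put 2" only after "got 0", which is the back-pressure that a maxsize is meant to provide.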
- _close() → Generator[QueueCloseCommand[_T], None, None]
Close the queue, preventing further put operations.
- _get() → Generator[QueueGetCommand[_T], _T, _T]
Pop an item from the queue or block until an item is available.
- Raises:
QueueClosedError – If the queue is closed and empty.
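The return annotation Generator[QueueGetCommand[_T], _T, _T] indicates that _get() yields a command object, is sent a value of type _T, and returns a _T. How Flowno's event loop services that command is not described on this page. The following is only a toy illustration of the general yield-a-command pattern; ToyGetCommand, toy_get, consumer, and run are hypothetical names, and the toy driver never blocks:

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class ToyGetCommand:
    """Hypothetical stand-in for QueueGetCommand: names the queue to read from."""
    queue: deque


def toy_get(queue):
    """Yield a command to the driver and return whatever it sends back."""
    item = yield ToyGetCommand(queue)
    return item


def consumer(queue, received):
    """A task that performs two gets by delegating to the primitive."""
    received.append((yield from toy_get(queue)))
    received.append((yield from toy_get(queue)))


def run(task):
    """Toy driver (an assumption, not Flowno's loop): answer each command by popping."""
    reply = None  # the first send() into a fresh generator must be None
    try:
        while True:
            command = task.send(reply)
            reply = command.queue.popleft()
    except StopIteration:
        pass


received = []
run(consumer(deque(["task 1", "task 2"]), received))
```

The point of the pattern is that the task never touches the queue's storage directly; it describes what it wants and the driver decides when and how to answer.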
- _peek() → Generator[QueueGetCommand[_T], _T, _T]
Peek at the next item without removing it from the queue.
- Raises:
QueueClosedError – If the queue is closed and empty.
- _put(item: _T) → Generator[QueuePutCommand[_T] | QueueNotifyGettersCommand[_T], None, None]
Put an item into the queue and notify blocked tasks, or wait until there is room in the queue.
- Raises:
QueueClosedError – If the queue is closed.
- async close() → None
Close the queue, preventing further put operations.
- After closing:
put() will raise QueueClosedError
get() will succeed until the queue is empty, then raise QueueClosedError
AsyncIterator interface will stop iteration when the queue is empty
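The three rules above can be modelled with a small stand-in built on asyncio. This is a sketch of the documented behaviour only, not Flowno's implementation; ToyQueue and ToyQueueClosedError are hypothetical names, and the toy is unbounded:

```python
import asyncio
from collections import deque


class ToyQueueClosedError(Exception):
    """Hypothetical stand-in for QueueClosedError."""


class ToyQueue:
    """Toy model of the documented close semantics (not Flowno's code)."""

    def __init__(self):
        self._items = deque()
        self._closed = False
        self._changed = asyncio.Condition()

    async def put(self, item):
        async with self._changed:
            if self._closed:
                raise ToyQueueClosedError("put on a closed queue")
            self._items.append(item)
            self._changed.notify_all()

    async def get(self):
        async with self._changed:
            # Wait while the queue is open but has nothing to hand out.
            await self._changed.wait_for(lambda: self._items or self._closed)
            if self._items:
                return self._items.popleft()
            raise ToyQueueClosedError("queue is closed and empty")

    async def close(self):
        async with self._changed:
            self._closed = True
            self._changed.notify_all()

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self.get()
        except ToyQueueClosedError:
            # Closed and drained: end the async for loop.
            raise StopAsyncIteration from None


async def demo():
    queue = ToyQueue()
    await queue.put("a")
    await queue.put("b")
    await queue.close()

    try:
        await queue.put("c")
        put_rejected = False
    except ToyQueueClosedError:
        put_rejected = True

    drained = [item async for item in queue]
    return put_rejected, drained


result = asyncio.run(demo())
```

Here the put after close is rejected, while the two items queued before the close are still delivered before iteration stops.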
- async get() → _T
Get an item from the queue.
If the queue is empty, this will wait until an item is put into the queue.
- Returns:
The next item from the queue
- Raises:
QueueClosedError – If the queue is closed and empty
- is_closed() → bool
Check if the queue is closed.
- Returns:
True if the queue is closed, False otherwise
- async peek() → _T
Peek at the next item without removing it from the queue.
If the queue is empty, this will wait until an item is put into the queue.
- Returns:
The next item from the queue (without removing it)
- Raises:
QueueClosedError – If the queue is closed and empty
- async put(item: _T) → None
Put an item into the queue.
If the queue has a maxsize and is full, this will wait until space is available.
- Parameters:
item – The item to put into the queue
- Raises:
QueueClosedError – If the queue is closed
- until_empty() → AsyncIterator[_T]
Get an async iterator that consumes all items until the queue is empty.
By default, this iterator closes the queue automatically once all items have been consumed.
- Returns:
An async iterator that yields items until the queue is empty
- class flowno.core.event_loop.queues.AsyncSetQueue(maxsize: int | None = None)
A queue variant that ensures each item appears only once.
This queue behaves like a standard AsyncQueue, but automatically deduplicates items based on equality.
Example
>>> from flowno.core.event_loop.event_loop import EventLoop
>>> from flowno.core.event_loop.queues import AsyncSetQueue
>>>
>>> async def set_queue_example():
...     queue = AsyncSetQueue()
...
...     # Add some items with duplicates
...     await queue.put("apple")
...     await queue.put("banana")
...     await queue.put("apple")  # This won't be added again
...     await queue.put("cherry")
...
...     # Get all unique items
...     items = []
...     while len(queue) > 0:
...         items.append(await queue.get())
...
...     return items
>>>
>>> loop = EventLoop()
>>> loop.run_until_complete(set_queue_example(), join=True)
['apple', 'banana', 'cherry']
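Deduplication "based on equality" can be pictured as a membership check before each append. The sketch below is a toy model under that assumption, not AsyncSetQueue's actual code; put_unique and put_all_unique are hypothetical helpers, and they return booleans purely for illustration (the documented put() returns None). It assumes an item counts as a duplicate only while an equal item is still waiting in the queue:

```python
from collections import deque


def put_unique(pending, item):
    """Append item unless an equal item is already waiting; report whether it was added."""
    if item in pending:  # membership on a deque compares with ==
        return False
    pending.append(item)
    return True


def put_all_unique(pending, items):
    """Apply put_unique to each item in order."""
    return [put_unique(pending, item) for item in items]


pending = deque()
added = put_all_unique(pending, ["apple", "banana", "apple", "cherry"])
```

After this runs, pending holds each fruit once in first-seen order, and added records that the second "apple" was skipped.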
- async put(item: _T) → None
Put an item into the queue if it’s not already present.
- Parameters:
item – The item to put into the queue
- Raises:
QueueClosedError – If the queue is closed
- async putAll(items: list[_T]) → None
Put multiple unique items into the queue.
- Parameters:
items – A list of items to add to the queue
- Raises:
QueueClosedError – If the queue is closed
- exception flowno.core.event_loop.queues.QueueClosedError
Raised when attempting to put an item into a closed queue, or to get or peek at an item in a queue that is closed and empty.
- class flowno.core.event_loop.queues.TaskWaitingOnQueueGet(task: Coroutine[QueueGetCommand[_T], Any, Any], peek: bool)
Internal class for tracking tasks waiting to get an item.
- class flowno.core.event_loop.queues.TaskWaitingOnQueuePut(task: Coroutine[QueuePutCommand[_T], Any, None], item: _T)
Internal class for tracking tasks waiting to put an item.
- class flowno.core.event_loop.queues._UntilEmptyIterator(queue: AsyncQueue[_T], self_closing: bool = True)
Helper class for implementing the until_empty method.