Source code for stick_ray.blocking_dict

import asyncio
from typing import Optional, Any, List, Hashable, Awaitable, Union, Dict


[docs] async def parallel(*awaitables: Awaitable, return_exceptions: bool = False) -> List[Union[Any, BaseException]]: """ Run multiple requests in parallel. Args: *awaitables: awaitables to run in parallel return_exceptions: if True, return exceptions instead of raising them Returns: list of results """ refs = await asyncio.gather( *awaitables ) results = await asyncio.gather( *refs, return_exceptions=return_exceptions ) return list(results)
[docs] class BlockingDict(object): """ A blocking dictionary that can be used to store and retrieve items in a thread-safe manner. """ def __init__(self): self._conditions: Dict[Hashable, asyncio.Condition] = dict() # Used to store condition variables self._data: Dict[Hashable, Any] = dict() # Used to store items self._lock = asyncio.Lock() # Used to lock the dictionary
[docs] def keys(self) -> List[Hashable]: """ Returns a list of keys in the dictionary. Returns: list of keys """ return list(self._data.keys())
[docs] def size(self) -> int: """ Returns the size of the dictionary. Returns: size """ return len(self._data)
[docs] async def put(self, key: Hashable, value: Any): """ Put an item into the dictionary with the given key. Overwrites the existing value with same key, if it exists. Args: key: key value: value """ cv: asyncio.Condition async with self._lock: if key not in self._conditions: # New lock is created for new key which may be overhead (should test) self._conditions[key] = asyncio.Condition() cv = self._conditions[key] # Get this reference before releasing lock async with cv: self._data[key] = value cv.notify_all()
[docs] async def peek(self, key: Hashable, timeout: Optional[float] = None) -> Any: """ Get an item from the dictionary, leaving the item there, optionally blocking and with timeout. Args: key: key timeout: timeout in seconds to wait when blocking. Returns: item matching key Raises: asyncio.Timeout if timeout elapsed and item not found """ cv: asyncio.Condition async with self._lock: if key not in self._conditions: # New lock is created which may be overhead (should test) self._conditions[key] = asyncio.Condition() cv = self._conditions[key] # Get this reference before releasing lock async with cv: async def _peek(): await cv.wait_for(lambda: key in self._data) return self._data[key] if timeout is not None: return await asyncio.wait_for(_peek(), timeout=timeout) return await _peek()
[docs] def has(self, key: Hashable) -> bool: """ Check if the dictionary contains the given key. Args: key: key Returns: True if key is in dictionary, False otherwise """ return key in self._data
[docs] async def delete(self, key: Hashable): """ Delete the given key from the dictionary. Args: key: key """ cv: asyncio.Condition async with self._lock: if key not in self._conditions: # New lock is created which may be overhead (should test) self._conditions[key] = asyncio.Condition() cv = self._conditions[key] # Get this reference before releasing lock async with cv: # Get lock on this condition, prevents a delete from happening while another thread has this lock. # Delete the data if key in self._data: del self._data[key] cv.notify_all()