Module hub.core.storage.lru_cache

Expand source code
from collections import OrderedDict
from hub.core.storage.provider import StorageProvider
from typing import Set

# TODO use lock for multiprocessing
class LRUCache(StorageProvider):
    """LRU Cache that uses StorageProvider for caching"""

    def __init__(
        self,
        cache_storage: StorageProvider,
        next_storage: StorageProvider,
        cache_size: int,
    ):
        """Initializes the LRUCache. It can be chained with other LRUCache objects to create multilayer caches.

        Args:
            cache_storage (StorageProvider): The storage being used as the caching layer of the cache.
                This should be a base provider such as MemoryProvider, LocalProvider or S3Provider but not another LRUCache.
            next_storage (StorageProvider): The next storage layer of the cache.
                This can either be a base provider (i.e. it is the final storage) or another LRUCache (i.e. in case of chained cache).
                While reading data, all misses from cache would be retrieved from here.
                While writing data, the data will be written to the next_storage when cache_storage is full or flush is called.
            cache_size (int): The total space that can be used from the cache_storage in bytes.
                This number may be less than the actual space available on the cache_storage.
                Setting it to a higher value than actually available space may lead to unexpected behaviors.
        """
        self.next_storage = next_storage
        self.cache_storage = cache_storage
        self.cache_size = cache_size

        # tracks keys in lru order, stores size of value, only keys present in this exist in cache
        self.lru_sizes: OrderedDict[str, int] = OrderedDict()
        self.dirty_keys: Set[str] = set()  # keys present in cache but not next_storage
        self.cache_used = 0

    def flush(self):
        """Writes data from cache_storage to next_storage. Only the dirty keys are written.
        This is a cascading function and leads to data being written to the final storage in case of a chained cache.
        """
        for key in self.dirty_keys:
            self.next_storage[key] = self.cache_storage[key]
        self.dirty_keys.clear()

        self.next_storage.flush()

    def __getitem__(self, path: str):
        """If item is in cache_storage, retrieves from there and returns.
        If item isn't in cache_storage, retrieves from next storage, stores in cache_storage (if possible) and returns.

        Args:
            path (str): the path relative to the root of the underlying storage.

        Raises:
            KeyError: if an object is not found at the path.

        Returns:
            bytes: The bytes of the object present at the path.
        """
        if path in self.lru_sizes:
            self.lru_sizes.move_to_end(path)  # refresh position for LRU
            return self.cache_storage[path]
        else:
            result = self.next_storage[path]  # fetch from storage, may throw KeyError
            if len(result) <= self.cache_size:  # insert in cache if it fits
                self._insert_in_cache(path, result)
            return result

    def __setitem__(self, path: str, value: bytes):
        """Puts the item in the cache_storage (if possible), else writes to next_storage.

        Args:
            path (str): the path relative to the root of the underlying storage.
            value (bytes): the value to be assigned at the path.
        """
        if path in self.lru_sizes:
            size = self.lru_sizes.pop(path)
            self.cache_used -= size

        if len(value) <= self.cache_size:
            self._insert_in_cache(path, value)
            self.dirty_keys.add(path)
        else:  # larger than cache, directly send to next layer
            self.dirty_keys.discard(path)
            self.next_storage[path] = value

    def __delitem__(self, path: str):
        """Deletes the object present at the path from the cache and the underlying storage.

        Args:
            path (str): the path to the object relative to the root of the provider.

        Raises:
            KeyError: If an object is not found at the path.
        """
        deleted_from_cache = False
        if path in self.lru_sizes:
            size = self.lru_sizes.pop(path)
            self.cache_used -= size
            del self.cache_storage[path]
            self.dirty_keys.discard(path)
            deleted_from_cache = True

        try:
            del self.next_storage[path]
        except KeyError:
            if not deleted_from_cache:
                raise

    def __len__(self):
        """Returns the number of files present in the cache and the underlying storage.

        Returns:
            int: the number of files present inside the root.
        """
        return len(self._list_keys())

    def __iter__(self):
        """Generator function that iterates over the keys of the cache and the underlying storage.

        Yields:
            str: the path of the object that it is iterating over, relative to the root of the provider.
        """
        yield from self._list_keys()

    def _free_up_space(self, extra_size: int):
        """Helper function that frees up space the requred space in cache.
            No action is taken if there is sufficient space in the cache.

        Args:
            extra_size (int): the space that needs is required in bytes.
        """
        while self.cache_used > 0 and extra_size + self.cache_used > self.cache_size:
            self._pop_from_cache()

    def _pop_from_cache(self):
        """Helper function that pops the least recently used key, value pair from the cache"""
        key, itemsize = self.lru_sizes.popitem(last=False)
        if key in self.dirty_keys:
            self.next_storage[key] = self.cache_storage[key]
            self.dirty_keys.discard(key)
        del self.cache_storage[key]
        self.cache_used -= itemsize

    def _insert_in_cache(self, path: str, value: bytes):
        """Helper function that adds a key value pair to the cache.

        Args:
            path (str): the path relative to the root of the underlying storage.
            value (bytes): the value to be assigned at the path.
        """
        self._free_up_space(len(value))
        self.cache_storage[path] = value
        self.cache_used += len(value)
        self.lru_sizes[path] = len(value)

    def _list_keys(self):
        """Helper function that lists all the objects present in the cache and the underlying storage.

        Returns:
            list: list of all the objects found in the cache and the underlying storage.
        """
        all_keys = {key for key in self.next_storage}
        for key in self.cache_storage:
            all_keys.add(key)
        return list(all_keys)

Classes

class LRUCache (cache_storage: StorageProvider, next_storage: StorageProvider, cache_size: int)

LRU Cache that uses StorageProvider for caching

Initializes the LRUCache. It can be chained with other LRUCache objects to create multilayer caches.

Args

cache_storage : StorageProvider
The storage being used as the caching layer of the cache. This should be a base provider such as MemoryProvider, LocalProvider or S3Provider but not another LRUCache.
next_storage : StorageProvider
The next storage layer of the cache. This can either be a base provider (i.e. it is the final storage) or another LRUCache (i.e. in case of chained cache). While reading data, all misses from cache would be retrieved from here. While writing data, the data will be written to the next_storage when cache_storage is full or flush is called.
cache_size : int
The total space that can be used from the cache_storage in bytes. This number may be less than the actual space available on the cache_storage. Setting it to a higher value than actually available space may lead to unexpected behaviors.
Expand source code
class LRUCache(StorageProvider):
    """LRU Cache that uses StorageProvider for caching"""

    def __init__(
        self,
        cache_storage: StorageProvider,
        next_storage: StorageProvider,
        cache_size: int,
    ):
        """Initializes the LRUCache. It can be chained with other LRUCache objects to create multilayer caches.

        Args:
            cache_storage (StorageProvider): The storage being used as the caching layer of the cache.
                This should be a base provider such as MemoryProvider, LocalProvider or S3Provider but not another LRUCache.
            next_storage (StorageProvider): The next storage layer of the cache.
                This can either be a base provider (i.e. it is the final storage) or another LRUCache (i.e. in case of chained cache).
                While reading data, all misses from cache would be retrieved from here.
                While writing data, the data will be written to the next_storage when cache_storage is full or flush is called.
            cache_size (int): The total space that can be used from the cache_storage in bytes.
                This number may be less than the actual space available on the cache_storage.
                Setting it to a higher value than actually available space may lead to unexpected behaviors.
        """
        self.next_storage = next_storage
        self.cache_storage = cache_storage
        self.cache_size = cache_size

        # tracks keys in lru order, stores size of value, only keys present in this exist in cache
        self.lru_sizes: OrderedDict[str, int] = OrderedDict()
        self.dirty_keys: Set[str] = set()  # keys present in cache but not next_storage
        self.cache_used = 0

    def flush(self):
        """Writes data from cache_storage to next_storage. Only the dirty keys are written.
        This is a cascading function and leads to data being written to the final storage in case of a chained cache.
        """
        for key in self.dirty_keys:
            self.next_storage[key] = self.cache_storage[key]
        self.dirty_keys.clear()

        self.next_storage.flush()

    def __getitem__(self, path: str):
        """If item is in cache_storage, retrieves from there and returns.
        If item isn't in cache_storage, retrieves from next storage, stores in cache_storage (if possible) and returns.

        Args:
            path (str): the path relative to the root of the underlying storage.

        Raises:
            KeyError: if an object is not found at the path.

        Returns:
            bytes: The bytes of the object present at the path.
        """
        if path in self.lru_sizes:
            self.lru_sizes.move_to_end(path)  # refresh position for LRU
            return self.cache_storage[path]
        else:
            result = self.next_storage[path]  # fetch from storage, may throw KeyError
            if len(result) <= self.cache_size:  # insert in cache if it fits
                self._insert_in_cache(path, result)
            return result

    def __setitem__(self, path: str, value: bytes):
        """Puts the item in the cache_storage (if possible), else writes to next_storage.

        Args:
            path (str): the path relative to the root of the underlying storage.
            value (bytes): the value to be assigned at the path.
        """
        if path in self.lru_sizes:
            size = self.lru_sizes.pop(path)
            self.cache_used -= size

        if len(value) <= self.cache_size:
            self._insert_in_cache(path, value)
            self.dirty_keys.add(path)
        else:  # larger than cache, directly send to next layer
            self.dirty_keys.discard(path)
            self.next_storage[path] = value

    def __delitem__(self, path: str):
        """Deletes the object present at the path from the cache and the underlying storage.

        Args:
            path (str): the path to the object relative to the root of the provider.

        Raises:
            KeyError: If an object is not found at the path.
        """
        deleted_from_cache = False
        if path in self.lru_sizes:
            size = self.lru_sizes.pop(path)
            self.cache_used -= size
            del self.cache_storage[path]
            self.dirty_keys.discard(path)
            deleted_from_cache = True

        try:
            del self.next_storage[path]
        except KeyError:
            if not deleted_from_cache:
                raise

    def __len__(self):
        """Returns the number of files present in the cache and the underlying storage.

        Returns:
            int: the number of files present inside the root.
        """
        return len(self._list_keys())

    def __iter__(self):
        """Generator function that iterates over the keys of the cache and the underlying storage.

        Yields:
            str: the path of the object that it is iterating over, relative to the root of the provider.
        """
        yield from self._list_keys()

    def _free_up_space(self, extra_size: int):
        """Helper function that frees up space the requred space in cache.
            No action is taken if there is sufficient space in the cache.

        Args:
            extra_size (int): the space that needs is required in bytes.
        """
        while self.cache_used > 0 and extra_size + self.cache_used > self.cache_size:
            self._pop_from_cache()

    def _pop_from_cache(self):
        """Helper function that pops the least recently used key, value pair from the cache"""
        key, itemsize = self.lru_sizes.popitem(last=False)
        if key in self.dirty_keys:
            self.next_storage[key] = self.cache_storage[key]
            self.dirty_keys.discard(key)
        del self.cache_storage[key]
        self.cache_used -= itemsize

    def _insert_in_cache(self, path: str, value: bytes):
        """Helper function that adds a key value pair to the cache.

        Args:
            path (str): the path relative to the root of the underlying storage.
            value (bytes): the value to be assigned at the path.
        """
        self._free_up_space(len(value))
        self.cache_storage[path] = value
        self.cache_used += len(value)
        self.lru_sizes[path] = len(value)

    def _list_keys(self):
        """Helper function that lists all the objects present in the cache and the underlying storage.

        Returns:
            list: list of all the objects found in the cache and the underlying storage.
        """
        all_keys = {key for key in self.next_storage}
        for key in self.cache_storage:
            all_keys.add(key)
        return list(all_keys)

Ancestors

  • StorageProvider
  • abc.ABC
  • collections.abc.MutableMapping
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Methods

def flush(self)

Writes data from cache_storage to next_storage. Only the dirty keys are written. This is a cascading function and leads to data being written to the final storage in case of a chained cache.

Expand source code
def flush(self):
    """Writes data from cache_storage to next_storage. Only the dirty keys are written.
    This is a cascading function and leads to data being written to the final storage in case of a chained cache.
    """
    for key in self.dirty_keys:
        self.next_storage[key] = self.cache_storage[key]
    self.dirty_keys.clear()

    self.next_storage.flush()

Inherited members