123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595 |
- "Fanout cache automatically shards keys and values."
-
- import itertools as it
- import os.path as op
- import sqlite3
- import time
-
- from .core import ENOVAL, DEFAULT_SETTINGS, Cache, Disk, Timeout
- from .memo import memoize
- from .persistent import Deque, Index
-
-
class FanoutCache(object):
    """Cache that shards keys and values across multiple SQLite stores."""
    def __init__(self, directory, shards=8, timeout=0.010, disk=Disk,
                 **settings):
        """Initialize cache instance.

        :param str directory: cache directory
        :param int shards: number of shards to distribute writes
        :param float timeout: SQLite connection timeout
        :param disk: `Disk` instance for serialization
        :param settings: any of `DEFAULT_SETTINGS`

        """
        self._directory = directory
        self._count = shards
        # Split the total size limit evenly across all shards.
        total_limit = settings.pop(
            'size_limit', DEFAULT_SETTINGS['size_limit'],
        )
        per_shard_limit = total_limit / shards
        shard_caches = []
        for num in range(shards):
            shard_directory = op.join(directory, '%03d' % num)
            shard_caches.append(
                Cache(
                    shard_directory,
                    timeout=timeout,
                    disk=disk,
                    size_limit=per_shard_limit,
                    **settings
                )
            )
        self._shards = tuple(shard_caches)
        # Every shard uses the same disk type, so borrow the key-hash
        # function from shard zero for routing keys to shards.
        self._hash = self._shards[0].disk.hash
        self._deques = {}
        self._indexes = {}
-
- @property
- def directory(self):
- """Cache directory."""
- return self._directory
-
-
- def __getattr__(self, name):
- return getattr(self._shards[0], name)
-
-
- def set(self, key, value, expire=None, read=False, tag=None, retry=False):
- """Set `key` and `value` item in cache.
-
- When `read` is `True`, `value` should be a file-like object opened
- for reading in binary mode.
-
- If database timeout occurs then fails silently unless `retry` is set to
- `True` (default `False`).
-
- :param key: key for item
- :param value: value for item
- :param float expire: seconds until the key expires
- (default None, no expiry)
- :param bool read: read value as raw bytes from file (default False)
- :param str tag: text to associate with key (default None)
- :param bool retry: retry if database timeout expires (default False)
- :return: True if item was set
-
- """
- index = self._hash(key) % self._count
- set_func = self._shards[index].set
-
- while True:
- try:
- return set_func(key, value, expire, read, tag)
- except Timeout:
- if retry:
- continue
- else:
- return False
-
-
- def __setitem__(self, key, value):
- """Set `key` and `value` item in cache.
-
- Calls :func:`FanoutCache.set` internally with `retry` set to `True`.
-
- :param key: key for item
- :param value: value for item
-
- """
- self.set(key, value, retry=True)
-
-
- def add(self, key, value, expire=None, read=False, tag=None, retry=False):
- """Add `key` and `value` item to cache.
-
- Similar to `set`, but only add to cache if key not present.
-
- This operation is atomic. Only one concurrent add operation for given
- key from separate threads or processes will succeed.
-
- When `read` is `True`, `value` should be a file-like object opened
- for reading in binary mode.
-
- :param key: key for item
- :param value: value for item
- :param float expire: seconds until the key expires
- (default None, no expiry)
- :param bool read: read value as bytes from file (default False)
- :param str tag: text to associate with key (default None)
- :param bool retry: retry if database timeout expires (default False)
- :return: True if item was added
-
- """
- index = self._hash(key) % self._count
- add_func = self._shards[index].add
-
- while True:
- try:
- return add_func(key, value, expire, read, tag)
- except Timeout:
- if retry:
- continue
- else:
- return False
-
-
- def incr(self, key, delta=1, default=0, retry=False):
- """Increment value by delta for item with key.
-
- If key is missing and default is None then raise KeyError. Else if key
- is missing and default is not None then use default for value.
-
- Operation is atomic. All concurrent increment operations will be
- counted individually.
-
- Assumes value may be stored in a SQLite column. Most builds that target
- machines with 64-bit pointer widths will support 64-bit signed
- integers.
-
- :param key: key for item
- :param int delta: amount to increment (default 1)
- :param int default: value if key is missing (default 0)
- :param bool retry: retry if database timeout expires (default False)
- :return: new value for item on success else None
- :raises KeyError: if key is not found and default is None
-
- """
- index = self._hash(key) % self._count
- incr_func = self._shards[index].incr
-
- while True:
- try:
- return incr_func(key, delta, default)
- except Timeout:
- if retry:
- continue
- else:
- return None
-
-
- def decr(self, key, delta=1, default=0, retry=False):
- """Decrement value by delta for item with key.
-
- If key is missing and default is None then raise KeyError. Else if key
- is missing and default is not None then use default for value.
-
- Operation is atomic. All concurrent decrement operations will be
- counted individually.
-
- Unlike Memcached, negative values are supported. Value may be
- decremented below zero.
-
- Assumes value may be stored in a SQLite column. Most builds that target
- machines with 64-bit pointer widths will support 64-bit signed
- integers.
-
- :param key: key for item
- :param int delta: amount to decrement (default 1)
- :param int default: value if key is missing (default 0)
- :param bool retry: retry if database timeout expires (default False)
- :return: new value for item on success else None
- :raises KeyError: if key is not found and default is None
-
- """
- return self.incr(key, -delta, default, retry)
-
-
- def get(self, key, default=None, read=False, expire_time=False, tag=False,
- retry=False):
- """Retrieve value from cache. If `key` is missing, return `default`.
-
- If database timeout occurs then returns `default` unless `retry` is set
- to `True` (default `False`).
-
- :param key: key for item
- :param default: return value if key is missing (default None)
- :param bool read: if True, return file handle to value
- (default False)
- :param float expire_time: if True, return expire_time in tuple
- (default False)
- :param tag: if True, return tag in tuple (default False)
- :param bool retry: retry if database timeout expires (default False)
- :return: value for item if key is found else default
-
- """
- index = self._hash(key) % self._count
- get_func = self._shards[index].get
-
- while True:
- try:
- return get_func(
- key, default=default, read=read, expire_time=expire_time,
- tag=tag,
- )
- except (Timeout, sqlite3.OperationalError):
- if retry:
- continue
- else:
- return default
-
-
- def __getitem__(self, key):
- """Return corresponding value for `key` from cache.
-
- Calls :func:`FanoutCache.get` internally with `retry` set to `True`.
-
- :param key: key for item
- :return: value for item
- :raises KeyError: if key is not found
-
- """
- value = self.get(key, default=ENOVAL, retry=True)
-
- if value is ENOVAL:
- raise KeyError(key)
-
- return value
-
-
- def read(self, key):
- """Return file handle corresponding to `key` from cache.
-
- :param key: key for item
- :return: file open for reading in binary mode
- :raises KeyError: if key is not found
-
- """
- handle = self.get(key, default=ENOVAL, read=True, retry=True)
- if handle is ENOVAL:
- raise KeyError(key)
- return handle
-
-
- def __contains__(self, key):
- """Return `True` if `key` matching item is found in cache.
-
- :param key: key for item
- :return: True if key is found
-
- """
- index = self._hash(key) % self._count
- return key in self._shards[index]
-
-
- def pop(self, key, default=None, expire_time=False, tag=False,
- retry=False):
- """Remove corresponding item for `key` from cache and return value.
-
- If `key` is missing, return `default`.
-
- Operation is atomic. Concurrent operations will be serialized.
-
- :param key: key for item
- :param default: return value if key is missing (default None)
- :param float expire_time: if True, return expire_time in tuple
- (default False)
- :param tag: if True, return tag in tuple (default False)
- :param bool retry: retry if database timeout expires (default False)
- :return: value for item if key is found else default
-
- """
- index = self._hash(key) % self._count
- pop_func = self._shards[index].pop
-
- while True:
- try:
- return pop_func(
- key, default=default, expire_time=expire_time, tag=tag,
- )
- except Timeout:
- if retry:
- continue
- else:
- return default
-
-
- def delete(self, key, retry=False):
- """Delete corresponding item for `key` from cache.
-
- Missing keys are ignored.
-
- If database timeout occurs then fails silently unless `retry` is set to
- `True` (default `False`).
-
- :param key: key for item
- :param bool retry: retry if database timeout expires (default False)
- :return: True if item was deleted
-
- """
- index = self._hash(key) % self._count
- del_func = self._shards[index].__delitem__
-
- while True:
- try:
- return del_func(key)
- except Timeout:
- if retry:
- continue
- else:
- return False
- except KeyError:
- return False
-
-
- def __delitem__(self, key):
- """Delete corresponding item for `key` from cache.
-
- Calls :func:`FanoutCache.delete` internally with `retry` set to `True`.
-
- :param key: key for item
- :raises KeyError: if key is not found
-
- """
- deleted = self.delete(key, retry=True)
-
- if not deleted:
- raise KeyError(key)
-
-
- memoize = memoize
-
-
- def check(self, fix=False):
- """Check database and file system consistency.
-
- Intended for use in testing and post-mortem error analysis.
-
- While checking the cache table for consistency, a writer lock is held
- on the database. The lock blocks other cache clients from writing to
- the database. For caches with many file references, the lock may be
- held for a long time. For example, local benchmarking shows that a
- cache with 1,000 file references takes ~60ms to check.
-
- :param bool fix: correct inconsistencies
- :return: list of warnings
- :raises Timeout: if database timeout expires
-
- """
- return sum((shard.check(fix=fix) for shard in self._shards), [])
-
-
- def expire(self):
- """Remove expired items from cache.
-
- :return: count of items removed
-
- """
- return self._remove('expire', args=(time.time(),))
-
-
- def create_tag_index(self):
- """Create tag index on cache database.
-
- It is better to initialize cache with `tag_index=True` than use this.
-
- :raises Timeout: if database timeout expires
-
- """
- for shard in self._shards:
- shard.create_tag_index()
-
-
- def drop_tag_index(self):
- """Drop tag index on cache database.
-
- :raises Timeout: if database timeout expires
-
- """
- for shard in self._shards:
- shard.drop_tag_index()
-
-
- def evict(self, tag):
- """Remove items with matching `tag` from cache.
-
- :param str tag: tag identifying items
- :return: count of items removed
-
- """
- return self._remove('evict', args=(tag,))
-
-
- def cull(self):
- """Cull items from cache until volume is less than size limit.
-
- :return: count of items removed
-
- """
- return self._remove('cull')
-
-
- def clear(self):
- """Remove all items from cache.
-
- :return: count of items removed
-
- """
- return self._remove('clear')
-
-
- def _remove(self, name, args=()):
- total = 0
- for shard in self._shards:
- method = getattr(shard, name)
- while True:
- try:
- count = method(*args)
- total += count
- except Timeout as timeout:
- total += timeout.args[0]
- else:
- break
- return total
-
-
- def stats(self, enable=True, reset=False):
- """Return cache statistics hits and misses.
-
- :param bool enable: enable collecting statistics (default True)
- :param bool reset: reset hits and misses to 0 (default False)
- :return: (hits, misses)
-
- """
- results = [shard.stats(enable, reset) for shard in self._shards]
- return (sum(result[0] for result in results),
- sum(result[1] for result in results))
-
-
- def volume(self):
- """Return estimated total size of cache on disk.
-
- :return: size in bytes
-
- """
- return sum(shard.volume() for shard in self._shards)
-
-
- def close(self):
- "Close database connection."
- for shard in self._shards:
- shard.close()
- self._deques.clear()
- self._indexes.clear()
-
-
- def __enter__(self):
- return self
-
-
- def __exit__(self, *exception):
- self.close()
-
-
- def __getstate__(self):
- return (self._directory, self._count, self.timeout, type(self.disk))
-
-
- def __setstate__(self, state):
- self.__init__(*state)
-
-
- def __iter__(self):
- "Iterate keys in cache including expired items."
- iterators = [iter(shard) for shard in self._shards]
- return it.chain.from_iterable(iterators)
-
-
- def __reversed__(self):
- "Reverse iterate keys in cache including expired items."
- iterators = [reversed(shard) for shard in self._shards]
- return it.chain.from_iterable(reversed(iterators))
-
-
- def __len__(self):
- "Count of items in cache including expired items."
- return sum(len(shard) for shard in self._shards)
-
-
- def reset(self, key, value=ENOVAL):
- """Reset `key` and `value` item from Settings table.
-
- If `value` is not given, it is reloaded from the Settings
- table. Otherwise, the Settings table is updated.
-
- Settings attributes on cache objects are lazy-loaded and
- read-only. Use `reset` to update the value.
-
- Settings with the ``sqlite_`` prefix correspond to SQLite
- pragmas. Updating the value will execute the corresponding PRAGMA
- statement.
-
- :param str key: Settings key for item
- :param value: value for item (optional)
- :return: updated value for item
- :raises Timeout: if database timeout expires
-
- """
- for shard in self._shards:
- while True:
- try:
- result = shard.reset(key, value)
- except Timeout:
- pass
- else:
- break
- return result
-
-
- def deque(self, name):
- """Return Deque with given `name` in subdirectory.
-
- >>> cache = FanoutCache('/tmp/diskcache/fanoutcache')
- >>> deque = cache.deque('test')
- >>> deque.clear()
- >>> deque.extend('abc')
- >>> deque.popleft()
- 'a'
- >>> deque.pop()
- 'c'
- >>> len(deque)
- 1
-
- :param str name: subdirectory name for Deque
- :return: Deque with given name
-
- """
- _deques = self._deques
-
- try:
- return _deques[name]
- except KeyError:
- parts = name.split('/')
- directory = op.join(self._directory, 'deque', *parts)
- temp = Deque(directory=directory)
- _deques[name] = temp
- return temp
-
-
- def index(self, name):
- """Return Index with given `name` in subdirectory.
-
- >>> cache = FanoutCache('/tmp/diskcache/fanoutcache')
- >>> index = cache.index('test')
- >>> index.clear()
- >>> index['abc'] = 123
- >>> index['def'] = 456
- >>> index['ghi'] = 789
- >>> index.popitem()
- ('ghi', 789)
- >>> del index['abc']
- >>> len(index)
- 1
- >>> index['def']
- 456
-
- :param str name: subdirectory name for Index
- :return: Index with given name
-
- """
- _indexes = self._indexes
-
- try:
- return _indexes[name]
- except KeyError:
- parts = name.split('/')
- directory = op.join(self._directory, 'index', *parts)
- temp = Index(directory)
- _indexes[name] = temp
- return temp
|