Python module (submodule repository) that provides content (video streams) from various online stream sources to the corresponding Enigma2, Kodi, and Plex plugins

fanout.py 17KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595
"""Fanout cache automatically shards keys and values."""
import itertools as it
import os.path as op
import sqlite3
import time

from .core import ENOVAL, DEFAULT_SETTINGS, Cache, Disk, Timeout
from .memo import memoize
from .persistent import Deque, Index
  9. class FanoutCache(object):
  10. "Cache that shards keys and values."
  11. def __init__(self, directory, shards=8, timeout=0.010, disk=Disk,
  12. **settings):
  13. """Initialize cache instance.
  14. :param str directory: cache directory
  15. :param int shards: number of shards to distribute writes
  16. :param float timeout: SQLite connection timeout
  17. :param disk: `Disk` instance for serialization
  18. :param settings: any of `DEFAULT_SETTINGS`
  19. """
  20. self._directory = directory
  21. self._count = shards
  22. default_size_limit = DEFAULT_SETTINGS['size_limit']
  23. size_limit = settings.pop('size_limit', default_size_limit) / shards
  24. self._shards = tuple(
  25. Cache(
  26. op.join(directory, '%03d' % num),
  27. timeout=timeout,
  28. disk=disk,
  29. size_limit=size_limit,
  30. **settings
  31. )
  32. for num in range(shards)
  33. )
  34. self._hash = self._shards[0].disk.hash
  35. self._deques = {}
  36. self._indexes = {}
  37. @property
  38. def directory(self):
  39. """Cache directory."""
  40. return self._directory
  41. def __getattr__(self, name):
  42. return getattr(self._shards[0], name)
  43. def set(self, key, value, expire=None, read=False, tag=None, retry=False):
  44. """Set `key` and `value` item in cache.
  45. When `read` is `True`, `value` should be a file-like object opened
  46. for reading in binary mode.
  47. If database timeout occurs then fails silently unless `retry` is set to
  48. `True` (default `False`).
  49. :param key: key for item
  50. :param value: value for item
  51. :param float expire: seconds until the key expires
  52. (default None, no expiry)
  53. :param bool read: read value as raw bytes from file (default False)
  54. :param str tag: text to associate with key (default None)
  55. :param bool retry: retry if database timeout expires (default False)
  56. :return: True if item was set
  57. """
  58. index = self._hash(key) % self._count
  59. set_func = self._shards[index].set
  60. while True:
  61. try:
  62. return set_func(key, value, expire, read, tag)
  63. except Timeout:
  64. if retry:
  65. continue
  66. else:
  67. return False
  68. def __setitem__(self, key, value):
  69. """Set `key` and `value` item in cache.
  70. Calls :func:`FanoutCache.set` internally with `retry` set to `True`.
  71. :param key: key for item
  72. :param value: value for item
  73. """
  74. self.set(key, value, retry=True)
  75. def add(self, key, value, expire=None, read=False, tag=None, retry=False):
  76. """Add `key` and `value` item to cache.
  77. Similar to `set`, but only add to cache if key not present.
  78. This operation is atomic. Only one concurrent add operation for given
  79. key from separate threads or processes will succeed.
  80. When `read` is `True`, `value` should be a file-like object opened
  81. for reading in binary mode.
  82. :param key: key for item
  83. :param value: value for item
  84. :param float expire: seconds until the key expires
  85. (default None, no expiry)
  86. :param bool read: read value as bytes from file (default False)
  87. :param str tag: text to associate with key (default None)
  88. :param bool retry: retry if database timeout expires (default False)
  89. :return: True if item was added
  90. """
  91. index = self._hash(key) % self._count
  92. add_func = self._shards[index].add
  93. while True:
  94. try:
  95. return add_func(key, value, expire, read, tag)
  96. except Timeout:
  97. if retry:
  98. continue
  99. else:
  100. return False
  101. def incr(self, key, delta=1, default=0, retry=False):
  102. """Increment value by delta for item with key.
  103. If key is missing and default is None then raise KeyError. Else if key
  104. is missing and default is not None then use default for value.
  105. Operation is atomic. All concurrent increment operations will be
  106. counted individually.
  107. Assumes value may be stored in a SQLite column. Most builds that target
  108. machines with 64-bit pointer widths will support 64-bit signed
  109. integers.
  110. :param key: key for item
  111. :param int delta: amount to increment (default 1)
  112. :param int default: value if key is missing (default 0)
  113. :param bool retry: retry if database timeout expires (default False)
  114. :return: new value for item on success else None
  115. :raises KeyError: if key is not found and default is None
  116. """
  117. index = self._hash(key) % self._count
  118. incr_func = self._shards[index].incr
  119. while True:
  120. try:
  121. return incr_func(key, delta, default)
  122. except Timeout:
  123. if retry:
  124. continue
  125. else:
  126. return None
  127. def decr(self, key, delta=1, default=0, retry=False):
  128. """Decrement value by delta for item with key.
  129. If key is missing and default is None then raise KeyError. Else if key
  130. is missing and default is not None then use default for value.
  131. Operation is atomic. All concurrent decrement operations will be
  132. counted individually.
  133. Unlike Memcached, negative values are supported. Value may be
  134. decremented below zero.
  135. Assumes value may be stored in a SQLite column. Most builds that target
  136. machines with 64-bit pointer widths will support 64-bit signed
  137. integers.
  138. :param key: key for item
  139. :param int delta: amount to decrement (default 1)
  140. :param int default: value if key is missing (default 0)
  141. :param bool retry: retry if database timeout expires (default False)
  142. :return: new value for item on success else None
  143. :raises KeyError: if key is not found and default is None
  144. """
  145. return self.incr(key, -delta, default, retry)
  146. def get(self, key, default=None, read=False, expire_time=False, tag=False,
  147. retry=False):
  148. """Retrieve value from cache. If `key` is missing, return `default`.
  149. If database timeout occurs then returns `default` unless `retry` is set
  150. to `True` (default `False`).
  151. :param key: key for item
  152. :param default: return value if key is missing (default None)
  153. :param bool read: if True, return file handle to value
  154. (default False)
  155. :param float expire_time: if True, return expire_time in tuple
  156. (default False)
  157. :param tag: if True, return tag in tuple (default False)
  158. :param bool retry: retry if database timeout expires (default False)
  159. :return: value for item if key is found else default
  160. """
  161. index = self._hash(key) % self._count
  162. get_func = self._shards[index].get
  163. while True:
  164. try:
  165. return get_func(
  166. key, default=default, read=read, expire_time=expire_time,
  167. tag=tag,
  168. )
  169. except (Timeout, sqlite3.OperationalError):
  170. if retry:
  171. continue
  172. else:
  173. return default
  174. def __getitem__(self, key):
  175. """Return corresponding value for `key` from cache.
  176. Calls :func:`FanoutCache.get` internally with `retry` set to `True`.
  177. :param key: key for item
  178. :return: value for item
  179. :raises KeyError: if key is not found
  180. """
  181. value = self.get(key, default=ENOVAL, retry=True)
  182. if value is ENOVAL:
  183. raise KeyError(key)
  184. return value
  185. def read(self, key):
  186. """Return file handle corresponding to `key` from cache.
  187. :param key: key for item
  188. :return: file open for reading in binary mode
  189. :raises KeyError: if key is not found
  190. """
  191. handle = self.get(key, default=ENOVAL, read=True, retry=True)
  192. if handle is ENOVAL:
  193. raise KeyError(key)
  194. return handle
  195. def __contains__(self, key):
  196. """Return `True` if `key` matching item is found in cache.
  197. :param key: key for item
  198. :return: True if key is found
  199. """
  200. index = self._hash(key) % self._count
  201. return key in self._shards[index]
  202. def pop(self, key, default=None, expire_time=False, tag=False,
  203. retry=False):
  204. """Remove corresponding item for `key` from cache and return value.
  205. If `key` is missing, return `default`.
  206. Operation is atomic. Concurrent operations will be serialized.
  207. :param key: key for item
  208. :param default: return value if key is missing (default None)
  209. :param float expire_time: if True, return expire_time in tuple
  210. (default False)
  211. :param tag: if True, return tag in tuple (default False)
  212. :param bool retry: retry if database timeout expires (default False)
  213. :return: value for item if key is found else default
  214. """
  215. index = self._hash(key) % self._count
  216. pop_func = self._shards[index].pop
  217. while True:
  218. try:
  219. return pop_func(
  220. key, default=default, expire_time=expire_time, tag=tag,
  221. )
  222. except Timeout:
  223. if retry:
  224. continue
  225. else:
  226. return default
  227. def delete(self, key, retry=False):
  228. """Delete corresponding item for `key` from cache.
  229. Missing keys are ignored.
  230. If database timeout occurs then fails silently unless `retry` is set to
  231. `True` (default `False`).
  232. :param key: key for item
  233. :param bool retry: retry if database timeout expires (default False)
  234. :return: True if item was deleted
  235. """
  236. index = self._hash(key) % self._count
  237. del_func = self._shards[index].__delitem__
  238. while True:
  239. try:
  240. return del_func(key)
  241. except Timeout:
  242. if retry:
  243. continue
  244. else:
  245. return False
  246. except KeyError:
  247. return False
  248. def __delitem__(self, key):
  249. """Delete corresponding item for `key` from cache.
  250. Calls :func:`FanoutCache.delete` internally with `retry` set to `True`.
  251. :param key: key for item
  252. :raises KeyError: if key is not found
  253. """
  254. deleted = self.delete(key, retry=True)
  255. if not deleted:
  256. raise KeyError(key)
  257. memoize = memoize
  258. def check(self, fix=False):
  259. """Check database and file system consistency.
  260. Intended for use in testing and post-mortem error analysis.
  261. While checking the cache table for consistency, a writer lock is held
  262. on the database. The lock blocks other cache clients from writing to
  263. the database. For caches with many file references, the lock may be
  264. held for a long time. For example, local benchmarking shows that a
  265. cache with 1,000 file references takes ~60ms to check.
  266. :param bool fix: correct inconsistencies
  267. :return: list of warnings
  268. :raises Timeout: if database timeout expires
  269. """
  270. return sum((shard.check(fix=fix) for shard in self._shards), [])
  271. def expire(self):
  272. """Remove expired items from cache.
  273. :return: count of items removed
  274. """
  275. return self._remove('expire', args=(time.time(),))
  276. def create_tag_index(self):
  277. """Create tag index on cache database.
  278. It is better to initialize cache with `tag_index=True` than use this.
  279. :raises Timeout: if database timeout expires
  280. """
  281. for shard in self._shards:
  282. shard.create_tag_index()
  283. def drop_tag_index(self):
  284. """Drop tag index on cache database.
  285. :raises Timeout: if database timeout expires
  286. """
  287. for shard in self._shards:
  288. shard.drop_tag_index()
  289. def evict(self, tag):
  290. """Remove items with matching `tag` from cache.
  291. :param str tag: tag identifying items
  292. :return: count of items removed
  293. """
  294. return self._remove('evict', args=(tag,))
  295. def cull(self):
  296. """Cull items from cache until volume is less than size limit.
  297. :return: count of items removed
  298. """
  299. return self._remove('cull')
  300. def clear(self):
  301. """Remove all items from cache.
  302. :return: count of items removed
  303. """
  304. return self._remove('clear')
  305. def _remove(self, name, args=()):
  306. total = 0
  307. for shard in self._shards:
  308. method = getattr(shard, name)
  309. while True:
  310. try:
  311. count = method(*args)
  312. total += count
  313. except Timeout as timeout:
  314. total += timeout.args[0]
  315. else:
  316. break
  317. return total
  318. def stats(self, enable=True, reset=False):
  319. """Return cache statistics hits and misses.
  320. :param bool enable: enable collecting statistics (default True)
  321. :param bool reset: reset hits and misses to 0 (default False)
  322. :return: (hits, misses)
  323. """
  324. results = [shard.stats(enable, reset) for shard in self._shards]
  325. return (sum(result[0] for result in results),
  326. sum(result[1] for result in results))
  327. def volume(self):
  328. """Return estimated total size of cache on disk.
  329. :return: size in bytes
  330. """
  331. return sum(shard.volume() for shard in self._shards)
  332. def close(self):
  333. "Close database connection."
  334. for shard in self._shards:
  335. shard.close()
  336. self._deques.clear()
  337. self._indexes.clear()
  338. def __enter__(self):
  339. return self
  340. def __exit__(self, *exception):
  341. self.close()
  342. def __getstate__(self):
  343. return (self._directory, self._count, self.timeout, type(self.disk))
  344. def __setstate__(self, state):
  345. self.__init__(*state)
  346. def __iter__(self):
  347. "Iterate keys in cache including expired items."
  348. iterators = [iter(shard) for shard in self._shards]
  349. return it.chain.from_iterable(iterators)
  350. def __reversed__(self):
  351. "Reverse iterate keys in cache including expired items."
  352. iterators = [reversed(shard) for shard in self._shards]
  353. return it.chain.from_iterable(reversed(iterators))
  354. def __len__(self):
  355. "Count of items in cache including expired items."
  356. return sum(len(shard) for shard in self._shards)
  357. def reset(self, key, value=ENOVAL):
  358. """Reset `key` and `value` item from Settings table.
  359. If `value` is not given, it is reloaded from the Settings
  360. table. Otherwise, the Settings table is updated.
  361. Settings attributes on cache objects are lazy-loaded and
  362. read-only. Use `reset` to update the value.
  363. Settings with the ``sqlite_`` prefix correspond to SQLite
  364. pragmas. Updating the value will execute the corresponding PRAGMA
  365. statement.
  366. :param str key: Settings key for item
  367. :param value: value for item (optional)
  368. :return: updated value for item
  369. :raises Timeout: if database timeout expires
  370. """
  371. for shard in self._shards:
  372. while True:
  373. try:
  374. result = shard.reset(key, value)
  375. except Timeout:
  376. pass
  377. else:
  378. break
  379. return result
  380. def deque(self, name):
  381. """Return Deque with given `name` in subdirectory.
  382. >>> cache = FanoutCache('/tmp/diskcache/fanoutcache')
  383. >>> deque = cache.deque('test')
  384. >>> deque.clear()
  385. >>> deque.extend('abc')
  386. >>> deque.popleft()
  387. 'a'
  388. >>> deque.pop()
  389. 'c'
  390. >>> len(deque)
  391. 1
  392. :param str name: subdirectory name for Deque
  393. :return: Deque with given name
  394. """
  395. _deques = self._deques
  396. try:
  397. return _deques[name]
  398. except KeyError:
  399. parts = name.split('/')
  400. directory = op.join(self._directory, 'deque', *parts)
  401. temp = Deque(directory=directory)
  402. _deques[name] = temp
  403. return temp
  404. def index(self, name):
  405. """Return Index with given `name` in subdirectory.
  406. >>> cache = FanoutCache('/tmp/diskcache/fanoutcache')
  407. >>> index = cache.index('test')
  408. >>> index.clear()
  409. >>> index['abc'] = 123
  410. >>> index['def'] = 456
  411. >>> index['ghi'] = 789
  412. >>> index.popitem()
  413. ('ghi', 789)
  414. >>> del index['abc']
  415. >>> len(index)
  416. 1
  417. >>> index['def']
  418. 456
  419. :param str name: subdirectory name for Index
  420. :return: Index with given name
  421. """
  422. _indexes = self._indexes
  423. try:
  424. return _indexes[name]
  425. except KeyError:
  426. parts = name.split('/')
  427. directory = op.join(self._directory, 'index', *parts)
  428. temp = Index(directory)
  429. _indexes[name] = temp
  430. return temp