Python module (submodule repository) that provides content (video streams) from various online stream sources to the corresponding Enigma2, Kodi, and Plex plugins.

core.py 61KB

  1. """Core disk and file backed cache API.
  2. """
  3. import codecs
  4. import contextlib as cl
  5. import errno
  6. import functools as ft
  7. import io
  8. import os
  9. import os.path as op
  10. import pickletools
  11. import sqlite3
  12. import struct
  13. import sys
  14. import threading
  15. import time
  16. import warnings
  17. import zlib
  18. if sys.hexversion < 0x03000000:
  19. import cPickle as pickle # pylint: disable=import-error
  20. # ISSUE #25 Fix for http://bugs.python.org/issue10211
  21. from cStringIO import StringIO as BytesIO # pylint: disable=import-error
  22. TextType = unicode # pylint: disable=invalid-name,undefined-variable
  23. BytesType = str
  24. INT_TYPES = int, long # pylint: disable=undefined-variable
  25. range = xrange # pylint: disable=redefined-builtin,invalid-name,undefined-variable
  26. io_open = io.open # pylint: disable=invalid-name
  27. else:
  28. import pickle
  29. from io import BytesIO # pylint: disable=ungrouped-imports
  30. TextType = str
  31. BytesType = bytes
  32. INT_TYPES = (int,)
  33. io_open = open # pylint: disable=invalid-name
  34. try:
  35. WindowsError
  36. except NameError:
  37. class WindowsError(Exception):
  38. "Windows error place-holder on platforms without support."
  39. pass
  40. class Constant(tuple):
  41. "Pretty display of immutable constant."
  42. def __new__(cls, name):
  43. return tuple.__new__(cls, (name,))
  44. def __repr__(self):
  45. return '%s' % self[0]
  46. DBNAME = 'cache.db'
  47. ENOVAL = Constant('ENOVAL')
  48. UNKNOWN = Constant('UNKNOWN')
  49. MODE_NONE = 0
  50. MODE_RAW = 1
  51. MODE_BINARY = 2
  52. MODE_TEXT = 3
  53. MODE_PICKLE = 4
  54. DEFAULT_SETTINGS = {
  55. u'statistics': 0, # False
  56. u'tag_index': 0, # False
  57. u'eviction_policy': u'least-recently-stored',
  58. u'size_limit': 2 ** 30, # 1gb
  59. u'cull_limit': 10,
  60. u'sqlite_auto_vacuum': 1, # FULL
  61. u'sqlite_cache_size': 2 ** 13, # 8,192 pages
  62. u'sqlite_journal_mode': u'wal',
  63. u'sqlite_mmap_size': 2 ** 26, # 64mb
  64. u'sqlite_synchronous': 1, # NORMAL
  65. u'disk_min_file_size': 2 ** 15, # 32kb
  66. u'disk_pickle_protocol': pickle.HIGHEST_PROTOCOL,
  67. }
  68. METADATA = {
  69. u'count': 0,
  70. u'size': 0,
  71. u'hits': 0,
  72. u'misses': 0,
  73. }
  74. EVICTION_POLICY = {
  75. 'none': {
  76. 'init': None,
  77. 'get': None,
  78. 'cull': None,
  79. },
  80. 'least-recently-stored': {
  81. 'init': (
  82. 'CREATE INDEX IF NOT EXISTS Cache_store_time ON'
  83. ' Cache (store_time)'
  84. ),
  85. 'get': None,
  86. 'cull': 'SELECT {fields} FROM Cache ORDER BY store_time LIMIT ?',
  87. },
  88. 'least-recently-used': {
  89. 'init': (
  90. 'CREATE INDEX IF NOT EXISTS Cache_access_time ON'
  91. ' Cache (access_time)'
  92. ),
  93. 'get': 'access_time = {now}',
  94. 'cull': 'SELECT {fields} FROM Cache ORDER BY access_time LIMIT ?',
  95. },
  96. 'least-frequently-used': {
  97. 'init': (
  98. 'CREATE INDEX IF NOT EXISTS Cache_access_count ON'
  99. ' Cache (access_count)'
  100. ),
  101. 'get': 'access_count = access_count + 1',
  102. 'cull': 'SELECT {fields} FROM Cache ORDER BY access_count LIMIT ?',
  103. },
  104. }
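
# Illustrative note (not part of the original module): each policy's 'cull'
# entry is a SQL template that Cache._cull() fills in before executing, e.g.:
#
#     select_policy = EVICTION_POLICY['least-recently-used']['cull']
#     select_filename = select_policy.format(fields='filename', now=time.time())
#     # -> 'SELECT filename FROM Cache ORDER BY access_time LIMIT ?'
#
# The 'get' entry, when present, is a SQL fragment appended to UPDATE
# statements on access so the policy's ordering column stays current.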
class Disk(object):
    "Cache key and value serialization for SQLite database and files."
    def __init__(self, directory, min_file_size=0, pickle_protocol=0):
        """Initialize disk instance.

        :param str directory: directory path
        :param int min_file_size: minimum size for file use
        :param int pickle_protocol: pickle protocol for serialization

        """
        self._directory = directory
        self.min_file_size = min_file_size
        self.pickle_protocol = pickle_protocol

    def hash(self, key):
        """Compute portable hash for `key`.

        :param key: key to hash
        :return: hash value

        """
        mask = 0xFFFFFFFF
        disk_key, _ = self.put(key)
        type_disk_key = type(disk_key)

        if type_disk_key is sqlite3.Binary:
            return zlib.adler32(disk_key) & mask
        elif type_disk_key is TextType:
            return zlib.adler32(disk_key.encode('utf-8')) & mask  # pylint: disable=no-member
        elif type_disk_key in INT_TYPES:
            return disk_key % mask
        else:
            assert type_disk_key is float
            return zlib.adler32(struct.pack('!d', disk_key)) & mask

    def put(self, key):
        """Convert `key` to fields key and raw for Cache table.

        :param key: key to convert
        :return: (database key, raw boolean) pair

        """
        # pylint: disable=bad-continuation,unidiomatic-typecheck
        type_key = type(key)

        if type_key is BytesType:
            return sqlite3.Binary(key), True
        elif ((type_key is TextType)
                or (type_key in INT_TYPES
                    and -9223372036854775808 <= key <= 9223372036854775807)
                or (type_key is float)):
            return key, True
        else:
            data = pickle.dumps(key, protocol=self.pickle_protocol)
            result = pickletools.optimize(data)
            return sqlite3.Binary(result), False

    def get(self, key, raw):
        """Convert fields `key` and `raw` from Cache table to key.

        :param key: database key to convert
        :param bool raw: flag indicating raw database storage
        :return: corresponding Python key

        """
        # pylint: disable=no-self-use,unidiomatic-typecheck
        if raw:
            return BytesType(key) if type(key) is sqlite3.Binary else key
        else:
            return pickle.load(BytesIO(key))

    def store(self, value, read, key=UNKNOWN):
        """Convert `value` to fields size, mode, filename, and value for Cache
        table.

        :param value: value to convert
        :param bool read: True when value is file-like object
        :param key: key for item (default UNKNOWN)
        :return: (size, mode, filename, value) tuple for Cache table

        """
        # pylint: disable=unidiomatic-typecheck
        type_value = type(value)
        min_file_size = self.min_file_size

        if ((type_value is TextType and len(value) < min_file_size)
                or (type_value in INT_TYPES
                    and -9223372036854775808 <= value <= 9223372036854775807)
                or (type_value is float)):
            return 0, MODE_RAW, None, value
        elif type_value is BytesType:
            if len(value) < min_file_size:
                return 0, MODE_RAW, None, sqlite3.Binary(value)
            else:
                filename, full_path = self.filename(key, value)

                with open(full_path, 'wb') as writer:
                    writer.write(value)

                return len(value), MODE_BINARY, filename, None
        elif type_value is TextType:
            filename, full_path = self.filename(key, value)

            with io_open(full_path, 'w', encoding='UTF-8') as writer:
                writer.write(value)

            size = op.getsize(full_path)
            return size, MODE_TEXT, filename, None
        elif read:
            size = 0
            reader = ft.partial(value.read, 2 ** 22)
            filename, full_path = self.filename(key, value)

            with open(full_path, 'wb') as writer:
                for chunk in iter(reader, b''):
                    size += len(chunk)
                    writer.write(chunk)

            return size, MODE_BINARY, filename, None
        else:
            result = pickle.dumps(value, protocol=self.pickle_protocol)

            if len(result) < min_file_size:
                return 0, MODE_PICKLE, None, sqlite3.Binary(result)
            else:
                filename, full_path = self.filename(key, value)

                with open(full_path, 'wb') as writer:
                    writer.write(result)

                return len(result), MODE_PICKLE, filename, None

    def fetch(self, mode, filename, value, read):
        """Convert fields `mode`, `filename`, and `value` from Cache table to
        value.

        :param int mode: value mode raw, binary, text, or pickle
        :param str filename: filename of corresponding value
        :param value: database value
        :param bool read: when True, return an open file handle
        :return: corresponding Python value

        """
        # pylint: disable=no-self-use,unidiomatic-typecheck
        if mode == MODE_RAW:
            return BytesType(value) if type(value) is sqlite3.Binary else value
        elif mode == MODE_BINARY:
            if read:
                return open(op.join(self._directory, filename), 'rb')
            else:
                with open(op.join(self._directory, filename), 'rb') as reader:
                    return reader.read()
        elif mode == MODE_TEXT:
            full_path = op.join(self._directory, filename)
            with io_open(full_path, 'r', encoding='UTF-8') as reader:
                return reader.read()
        elif mode == MODE_PICKLE:
            if value is None:
                with open(op.join(self._directory, filename), 'rb') as reader:
                    return pickle.load(reader)
            else:
                return pickle.load(BytesIO(value))

    def filename(self, key=UNKNOWN, value=UNKNOWN):
        """Return filename and full-path tuple for file storage.

        Filename will be a randomly generated 28 character hexadecimal string
        with ".val" suffixed. Two levels of sub-directories will be used to
        reduce the size of directories. On older filesystems, lookups in
        directories with many files may be slow.

        The default implementation ignores the `key` and `value` parameters.
        In some scenarios, for example :meth:`Cache.push
        <diskcache.Cache.push>`, the `key` or `value` may not be known when the
        item is stored in the cache.

        :param key: key for item (default UNKNOWN)
        :param value: value for item (default UNKNOWN)

        """
        # pylint: disable=unused-argument
        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
        sub_dir = op.join(hex_name[:2], hex_name[2:4])
        name = hex_name[4:] + '.val'
        directory = op.join(self._directory, sub_dir)

        try:
            os.makedirs(directory)
        except OSError as error:
            if error.errno != errno.EEXIST:
                raise

        filename = op.join(sub_dir, name)
        full_path = op.join(self._directory, filename)
        return filename, full_path

    def remove(self, filename):
        """Remove a file given by `filename`.

        This method is cross-thread and cross-process safe. If an "error no
        entry" occurs, it is suppressed.

        :param str filename: relative path to file

        """
        full_path = op.join(self._directory, filename)

        try:
            os.remove(full_path)
        except WindowsError:
            pass
        except OSError as error:
            if error.errno != errno.ENOENT:
                # ENOENT may occur if two caches attempt to delete the same
                # file at the same time.
                raise


class Timeout(Exception):
    "Database timeout expired."
    pass


class UnknownFileWarning(UserWarning):
    "Warning used by Cache.check for unknown files."
    pass


class EmptyDirWarning(UserWarning):
    "Warning used by Cache.check for empty directories."
    pass
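
# Illustrative sketch (not part of the original module): Disk can be
# subclassed to customize serialization. A minimal example that stores
# byte-string values zlib-compressed; the class name is hypothetical and the
# sketch ignores file-backed and read=True paths for brevity:
#
#     class CompressedDisk(Disk):
#         def store(self, value, read, key=UNKNOWN):
#             if not read and type(value) is BytesType:
#                 value = zlib.compress(value)
#             return super(CompressedDisk, self).store(value, read, key=key)
#
#         def fetch(self, mode, filename, value, read):
#             data = super(CompressedDisk, self).fetch(mode, filename, value, read)
#             if mode in (MODE_RAW, MODE_BINARY) and not read:
#                 data = zlib.decompress(data)
#             return data
#
#     cache = Cache('/tmp/demo', disk=CompressedDisk)  # path is hypothetical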
class Cache(object):
    "Disk and file backed cache."
    # pylint: disable=bad-continuation
    def __init__(self, directory, timeout=60, disk=Disk, **settings):
        """Initialize cache instance.

        :param str directory: cache directory
        :param float timeout: SQLite connection timeout
        :param disk: Disk type or subclass for serialization
        :param settings: any of DEFAULT_SETTINGS

        """
        try:
            assert issubclass(disk, Disk)
        except (TypeError, AssertionError):
            raise ValueError('disk must subclass diskcache.Disk')

        self._directory = directory
        self._timeout = 0  # Manually handle retries during initialization.
        self._local = threading.local()

        if not op.isdir(directory):
            try:
                os.makedirs(directory, 0o755)
            except OSError as error:
                if error.errno != errno.EEXIST:
                    raise EnvironmentError(
                        error.errno,
                        'Cache directory "%s" does not exist'
                        ' and could not be created' % self._directory
                    )

        sql = self._sql_retry

        # Setup Settings table.

        try:
            current_settings = dict(sql(
                'SELECT key, value FROM Settings'
            ).fetchall())
        except sqlite3.OperationalError:
            current_settings = {}

        sets = DEFAULT_SETTINGS.copy()
        sets.update(current_settings)
        sets.update(settings)

        for key in METADATA:
            sets.pop(key, None)

        # Chance to set pragmas before any tables are created.

        for key, value in sorted(sets.items()):
            if key.startswith('sqlite_'):
                self.reset(key, value, update=False)

        sql('CREATE TABLE IF NOT EXISTS Settings ('
            ' key TEXT NOT NULL UNIQUE,'
            ' value)'
        )

        # Setup Disk object (must happen after settings initialized).

        kwargs = {
            key[5:]: value for key, value in sets.items()
            if key.startswith('disk_')
        }
        self._disk = disk(directory, **kwargs)

        # Set cached attributes: updates settings and sets pragmas.

        for key, value in sets.items():
            query = 'INSERT OR REPLACE INTO Settings VALUES (?, ?)'
            sql(query, (key, value))
            self.reset(key, value)

        for key, value in METADATA.items():
            query = 'INSERT OR IGNORE INTO Settings VALUES (?, ?)'
            sql(query, (key, value))
            self.reset(key)

        (self._page_size,), = sql('PRAGMA page_size').fetchall()

        # Setup Cache table.

        sql('CREATE TABLE IF NOT EXISTS Cache ('
            ' rowid INTEGER PRIMARY KEY,'
            ' key BLOB,'
            ' raw INTEGER,'
            ' store_time REAL,'
            ' expire_time REAL,'
            ' access_time REAL,'
            ' access_count INTEGER DEFAULT 0,'
            ' tag BLOB,'
            ' size INTEGER DEFAULT 0,'
            ' mode INTEGER DEFAULT 0,'
            ' filename TEXT,'
            ' value BLOB)'
        )

        sql('CREATE UNIQUE INDEX IF NOT EXISTS Cache_key_raw ON'
            ' Cache(key, raw)'
        )

        sql('CREATE INDEX IF NOT EXISTS Cache_expire_time ON'
            ' Cache (expire_time)'
        )

        query = EVICTION_POLICY[self.eviction_policy]['init']

        if query is not None:
            sql(query)

        # Use triggers to keep Metadata updated.

        sql('CREATE TRIGGER IF NOT EXISTS Settings_count_insert'
            ' AFTER INSERT ON Cache FOR EACH ROW BEGIN'
            ' UPDATE Settings SET value = value + 1'
            ' WHERE key = "count"; END'
        )

        sql('CREATE TRIGGER IF NOT EXISTS Settings_count_delete'
            ' AFTER DELETE ON Cache FOR EACH ROW BEGIN'
            ' UPDATE Settings SET value = value - 1'
            ' WHERE key = "count"; END'
        )

        sql('CREATE TRIGGER IF NOT EXISTS Settings_size_insert'
            ' AFTER INSERT ON Cache FOR EACH ROW BEGIN'
            ' UPDATE Settings SET value = value + NEW.size'
            ' WHERE key = "size"; END'
        )

        sql('CREATE TRIGGER IF NOT EXISTS Settings_size_update'
            ' AFTER UPDATE ON Cache FOR EACH ROW BEGIN'
            ' UPDATE Settings'
            ' SET value = value + NEW.size - OLD.size'
            ' WHERE key = "size"; END'
        )

        sql('CREATE TRIGGER IF NOT EXISTS Settings_size_delete'
            ' AFTER DELETE ON Cache FOR EACH ROW BEGIN'
            ' UPDATE Settings SET value = value - OLD.size'
            ' WHERE key = "size"; END'
        )

        # Create tag index if requested.

        if self.tag_index:  # pylint: disable=no-member
            self.create_tag_index()
        else:
            self.drop_tag_index()

        # Close and re-open database connection with given timeout.

        self.close()
        self._timeout = timeout
        self._sql  # pylint: disable=pointless-statement

    @property
    def directory(self):
        """Cache directory."""
        return self._directory

    @property
    def timeout(self):
        """SQLite connection timeout value in seconds."""
        return self._timeout

    @property
    def disk(self):
        """Disk used for serialization."""
        return self._disk

    @property
    def _con(self):
        # Check process ID to support process forking. If the process
        # ID changes, close the connection and update the process ID.

        local_pid = getattr(self._local, 'pid', None)
        pid = os.getpid()

        if local_pid != pid:
            self.close()
            self._local.pid = pid

        con = getattr(self._local, 'con', None)

        if con is None:
            con = self._local.con = sqlite3.connect(
                op.join(self._directory, DBNAME),
                timeout=self._timeout,
                isolation_level=None,
            )

            # Some SQLite pragmas work on a per-connection basis so
            # query the Settings table and reset the pragmas. The
            # Settings table may not exist so catch and ignore the
            # OperationalError that may occur.

            try:
                select = 'SELECT key, value FROM Settings'
                settings = con.execute(select).fetchall()
            except sqlite3.OperationalError:
                pass
            else:
                for key, value in settings:
                    if key.startswith('sqlite_'):
                        self.reset(key, value, update=False)

        return con

    @property
    def _sql(self):
        return self._con.execute

    @property
    def _sql_retry(self):
        sql = self._sql

        # 2018-11-01 GrantJ - Some SQLite builds/versions handle
        # the SQLITE_BUSY return value and connection parameter
        # "timeout" differently. For a more reliable duration,
        # manually retry the statement for 60 seconds. Only used
        # by statements which modify the database and do not use
        # a transaction (like those in ``__init__`` or ``reset``).
        # See Issue #85 and tests/issue_85.py for more details.

        def _execute_with_retry(statement, *args, **kwargs):
            start = time.time()
            while True:
                try:
                    return sql(statement, *args, **kwargs)
                except sqlite3.OperationalError as exc:
                    if str(exc) != 'database is locked':
                        raise
                    diff = time.time() - start
                    if diff > 60:
                        raise
                    time.sleep(0.001)

        return _execute_with_retry

    @cl.contextmanager
    def _transact(self, filename=None):
        sql = self._sql
        filenames = []
        _disk_remove = self._disk.remove

        try:
            sql('BEGIN IMMEDIATE')
        except sqlite3.OperationalError:
            if filename is not None:
                _disk_remove(filename)
            raise Timeout

        try:
            yield sql, filenames.append
        except BaseException:
            sql('ROLLBACK')
            raise
        else:
            sql('COMMIT')
            for name in filenames:
                if name is not None:
                    _disk_remove(name)
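
    # Illustrative note (not part of the original module): _transact is the
    # pattern used by the mutating methods below. Files queued via cleanup()
    # are removed only after COMMIT, keeping rows and value files consistent:
    #
    #     with self._transact(filename) as (sql, cleanup):
    #         sql('DELETE FROM Cache WHERE rowid = ?', (rowid,))
    #         cleanup(old_filename)  # deleted only if the transaction commits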
    def set(self, key, value, expire=None, read=False, tag=None):
        """Set `key` and `value` item in cache.

        When `read` is `True`, `value` should be a file-like object opened
        for reading in binary mode.

        :param key: key for item
        :param value: value for item
        :param float expire: seconds until item expires
            (default None, no expiry)
        :param bool read: read value as bytes from file (default False)
        :param str tag: text to associate with key (default None)
        :return: True if item was set
        :raises Timeout: if database timeout expires

        """
        now = time.time()
        db_key, raw = self._disk.put(key)
        expire_time = None if expire is None else now + expire
        size, mode, filename, db_value = self._disk.store(value, read, key=key)
        columns = (expire_time, tag, size, mode, filename, db_value)

        # The order of SELECT, UPDATE, and INSERT is important below.
        #
        # Typical cache usage pattern is:
        #
        # value = cache.get(key)
        # if value is None:
        #     value = expensive_calculation()
        #     cache.set(key, value)
        #
        # Cache.get does not evict expired keys to avoid writes during lookups.
        # Commonly used/expired keys will therefore remain in the cache making
        # an UPDATE the preferred path.
        #
        # The alternative is to assume the key is not present by first trying
        # to INSERT and then handling the IntegrityError that occurs from
        # violating the UNIQUE constraint. This optimistic approach was
        # rejected based on the common cache usage pattern.
        #
        # INSERT OR REPLACE aka UPSERT is not used because the old filename may
        # need cleanup.

        with self._transact(filename) as (sql, cleanup):
            rows = sql(
                'SELECT rowid, filename FROM Cache'
                ' WHERE key = ? AND raw = ?',
                (db_key, raw),
            ).fetchall()

            if rows:
                (rowid, old_filename), = rows
                cleanup(old_filename)
                self._row_update(rowid, now, columns)
            else:
                self._row_insert(db_key, raw, now, columns)

            self._cull(now, sql, cleanup)

            return True

    __setitem__ = set

    def _row_update(self, rowid, now, columns):
        sql = self._sql
        expire_time, tag, size, mode, filename, value = columns
        sql('UPDATE Cache SET'
            ' store_time = ?,'
            ' expire_time = ?,'
            ' access_time = ?,'
            ' access_count = ?,'
            ' tag = ?,'
            ' size = ?,'
            ' mode = ?,'
            ' filename = ?,'
            ' value = ?'
            ' WHERE rowid = ?', (
                now,          # store_time
                expire_time,
                now,          # access_time
                0,            # access_count
                tag,
                size,
                mode,
                filename,
                value,
                rowid,
            ),
        )

    def _row_insert(self, key, raw, now, columns):
        sql = self._sql
        expire_time, tag, size, mode, filename, value = columns
        sql('INSERT INTO Cache('
            ' key, raw, store_time, expire_time, access_time,'
            ' access_count, tag, size, mode, filename, value'
            ') VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (
                key,
                raw,
                now,          # store_time
                expire_time,
                now,          # access_time
                0,            # access_count
                tag,
                size,
                mode,
                filename,
                value,
            ),
        )

    def _cull(self, now, sql, cleanup, limit=None):
        cull_limit = self.cull_limit if limit is None else limit

        if cull_limit == 0:
            return

        # Evict expired keys.

        select_expired_template = (
            'SELECT %s FROM Cache'
            ' WHERE expire_time IS NOT NULL AND expire_time < ?'
            ' ORDER BY expire_time LIMIT ?'
        )

        select_expired = select_expired_template % 'filename'
        rows = sql(select_expired, (now, cull_limit)).fetchall()

        if rows:
            delete_expired = (
                'DELETE FROM Cache WHERE rowid IN (%s)'
                % (select_expired_template % 'rowid')
            )
            sql(delete_expired, (now, cull_limit))

            for filename, in rows:
                cleanup(filename)

            cull_limit -= len(rows)

            if cull_limit == 0:
                return

        # Evict keys by policy.

        select_policy = EVICTION_POLICY[self.eviction_policy]['cull']

        if select_policy is None or self.volume() < self.size_limit:
            return

        select_filename = select_policy.format(fields='filename', now=now)
        rows = sql(select_filename, (cull_limit,)).fetchall()

        if rows:
            delete = (
                'DELETE FROM Cache WHERE rowid IN (%s)'
                % (select_policy.format(fields='rowid', now=now))
            )
            sql(delete, (cull_limit,))

            for filename, in rows:
                cleanup(filename)
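
    # Usage sketch (illustrative; the directory path is hypothetical):
    #
    #     cache = Cache('/tmp/demo')
    #     cache.set(b'key', b'value', expire=60, tag='demo')
    #     cache['other'] = 'mapping-style set via __setitem__'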
    def add(self, key, value, expire=None, read=False, tag=None):
        """Add `key` and `value` item to cache.

        Similar to `set`, but only add to cache if key not present.

        Operation is atomic. Only one concurrent add operation for a given key
        will succeed.

        When `read` is `True`, `value` should be a file-like object opened
        for reading in binary mode.

        :param key: key for item
        :param value: value for item
        :param float expire: seconds until the key expires
            (default None, no expiry)
        :param bool read: read value as bytes from file (default False)
        :param str tag: text to associate with key (default None)
        :return: True if item was added
        :raises Timeout: if database timeout expires

        """
        now = time.time()
        db_key, raw = self._disk.put(key)
        expire_time = None if expire is None else now + expire
        size, mode, filename, db_value = self._disk.store(value, read, key=key)
        columns = (expire_time, tag, size, mode, filename, db_value)

        with self._transact(filename) as (sql, cleanup):
            rows = sql(
                'SELECT rowid, filename, expire_time FROM Cache'
                ' WHERE key = ? AND raw = ?',
                (db_key, raw),
            ).fetchall()

            if rows:
                (rowid, old_filename, old_expire_time), = rows

                if old_expire_time is None or old_expire_time > now:
                    cleanup(filename)
                    return False

                cleanup(old_filename)
                self._row_update(rowid, now, columns)
            else:
                self._row_insert(db_key, raw, now, columns)

            self._cull(now, sql, cleanup)

            return True

    def incr(self, key, delta=1, default=0):
        """Increment value by delta for item with key.

        If key is missing and default is None then raise KeyError. Else if key
        is missing and default is not None then use default for value.

        Operation is atomic. All concurrent increment operations will be
        counted individually.

        Assumes value may be stored in a SQLite column. Most builds that target
        machines with 64-bit pointer widths will support 64-bit signed
        integers.

        :param key: key for item
        :param int delta: amount to increment (default 1)
        :param int default: value if key is missing (default 0)
        :return: new value for item
        :raises KeyError: if key is not found and default is None
        :raises Timeout: if database timeout expires

        """
        now = time.time()
        db_key, raw = self._disk.put(key)
        select = (
            'SELECT rowid, expire_time, filename, value FROM Cache'
            ' WHERE key = ? AND raw = ?'
        )

        with self._transact() as (sql, cleanup):
            rows = sql(select, (db_key, raw)).fetchall()

            if not rows:
                if default is None:
                    raise KeyError(key)

                value = default + delta
                columns = (None, None) + self._disk.store(value, False, key=key)
                self._row_insert(db_key, raw, now, columns)
                self._cull(now, sql, cleanup)
                return value

            (rowid, expire_time, filename, value), = rows

            if expire_time is not None and expire_time < now:
                if default is None:
                    raise KeyError(key)

                value = default + delta
                columns = (None, None) + self._disk.store(value, False, key=key)
                self._row_update(rowid, now, columns)
                self._cull(now, sql, cleanup)
                cleanup(filename)
                return value

            value += delta

            columns = 'store_time = ?, value = ?'
            update_column = EVICTION_POLICY[self.eviction_policy]['get']

            if update_column is not None:
                columns += ', ' + update_column.format(now=now)

            update = 'UPDATE Cache SET %s WHERE rowid = ?' % columns
            sql(update, (now, value, rowid))

            return value

    def decr(self, key, delta=1, default=0):
        """Decrement value by delta for item with key.

        If key is missing and default is None then raise KeyError. Else if key
        is missing and default is not None then use default for value.

        Operation is atomic. All concurrent decrement operations will be
        counted individually.

        Unlike Memcached, negative values are supported. Value may be
        decremented below zero.

        Assumes value may be stored in a SQLite column. Most builds that target
        machines with 64-bit pointer widths will support 64-bit signed
        integers.

        :param key: key for item
        :param int delta: amount to decrement (default 1)
        :param int default: value if key is missing (default 0)
        :return: new value for item
        :raises KeyError: if key is not found and default is None
        :raises Timeout: if database timeout expires

        """
        return self.incr(key, -delta, default)
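
    # Usage sketch (illustrative): counters start from `default` when missing.
    #
    #     cache.incr('hits')       # -> 1 (missing key, default 0, delta 1)
    #     cache.incr('hits', 5)    # -> 6
    #     cache.decr('hits', 2)    # -> 4 (may go below zero)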
    def get(self, key, default=None, read=False, expire_time=False, tag=False):
        """Retrieve value from cache. If `key` is missing, return `default`.

        :param key: key for item
        :param default: value to return if key is missing (default None)
        :param bool read: if True, return file handle to value
            (default False)
        :param bool expire_time: if True, return expire_time in tuple
            (default False)
        :param bool tag: if True, return tag in tuple (default False)
        :return: value for item or default if key not found
        :raises Timeout: if database timeout expires

        """
        db_key, raw = self._disk.put(key)
        update_column = EVICTION_POLICY[self.eviction_policy]['get']
        select = (
            'SELECT rowid, expire_time, tag, mode, filename, value'
            ' FROM Cache WHERE key = ? AND raw = ?'
            ' AND (expire_time IS NULL OR expire_time > ?)'
        )

        if expire_time and tag:
            default = (default, None, None)
        elif expire_time or tag:
            default = (default, None)

        if not self.statistics and update_column is None:
            # Fast path, no transaction necessary.

            rows = self._sql(select, (db_key, raw, time.time())).fetchall()

            if not rows:
                return default

            (rowid, db_expire_time, db_tag, mode, filename, db_value), = rows

            try:
                value = self._disk.fetch(mode, filename, db_value, read)
            except IOError:
                # Key was deleted before we could retrieve result.
                return default

        else:  # Slow path, transaction required.
            cache_hit = (
                'UPDATE Settings SET value = value + 1 WHERE key = "hits"'
            )
            cache_miss = (
                'UPDATE Settings SET value = value + 1 WHERE key = "misses"'
            )

            with self._transact() as (sql, _):
                rows = sql(select, (db_key, raw, time.time())).fetchall()

                if not rows:
                    if self.statistics:
                        sql(cache_miss)
                    return default

                (rowid, db_expire_time, db_tag,
                 mode, filename, db_value), = rows

                try:
                    value = self._disk.fetch(mode, filename, db_value, read)
                except IOError as error:
                    if error.errno == errno.ENOENT:
                        # Key was deleted before we could retrieve result.
                        if self.statistics:
                            sql(cache_miss)
                        return default
                    else:
                        raise

                if self.statistics:
                    sql(cache_hit)

                now = time.time()
                update = 'UPDATE Cache SET %s WHERE rowid = ?'

                if update_column is not None:
                    sql(update % update_column.format(now=now), (rowid,))

        if expire_time and tag:
            return (value, db_expire_time, db_tag)
        elif expire_time:
            return (value, db_expire_time)
        elif tag:
            return (value, db_tag)
        else:
            return value

    def __getitem__(self, key):
        """Return corresponding value for `key` from cache.

        :param key: key matching item
        :return: corresponding value
        :raises KeyError: if key is not found
        :raises Timeout: if database timeout expires

        """
        value = self.get(key, default=ENOVAL)

        if value is ENOVAL:
            raise KeyError(key)

        return value

    def read(self, key):
        """Return file handle value corresponding to `key` from cache.

        :param key: key matching item
        :return: file open for reading in binary mode
        :raises KeyError: if key is not found
        :raises Timeout: if database timeout expires

        """
        handle = self.get(key, default=ENOVAL, read=True)

        if handle is ENOVAL:
            raise KeyError(key)

        return handle
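
    # Usage sketch (illustrative): expire_time/tag extend the return tuple.
    #
    #     value = cache.get('key')
    #     value, expires, tag = cache.get('key', expire_time=True, tag=True)
    #     handle = cache.read('key')  # open file handle for file-backed values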
    def __contains__(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key matching item
        :return: True if key matching item

        """
        sql = self._sql
        db_key, raw = self._disk.put(key)
        select = (
            'SELECT rowid FROM Cache'
            ' WHERE key = ? AND raw = ?'
            ' AND (expire_time IS NULL OR expire_time > ?)'
        )

        rows = sql(select, (db_key, raw, time.time())).fetchall()

        return bool(rows)

    def pop(self, key, default=None, expire_time=False, tag=False):
        """Remove corresponding item for `key` from cache and return value.

        If `key` is missing, return `default`.

        Operation is atomic. Concurrent operations will be serialized.

        :param key: key for item
        :param default: value to return if key is missing (default None)
        :param bool expire_time: if True, return expire_time in tuple
            (default False)
        :param bool tag: if True, return tag in tuple (default False)
        :return: value for item or default if key not found
        :raises Timeout: if database timeout expires

        """
        db_key, raw = self._disk.put(key)
        select = (
            'SELECT rowid, expire_time, tag, mode, filename, value'
            ' FROM Cache WHERE key = ? AND raw = ?'
            ' AND (expire_time IS NULL OR expire_time > ?)'
        )

        if expire_time and tag:
            default = default, None, None
        elif expire_time or tag:
            default = default, None

        with self._transact() as (sql, _):
            rows = sql(select, (db_key, raw, time.time())).fetchall()

            if not rows:
                return default

            (rowid, db_expire_time, db_tag, mode, filename, db_value), = rows

            sql('DELETE FROM Cache WHERE rowid = ?', (rowid,))

        try:
            value = self._disk.fetch(mode, filename, db_value, False)
        except IOError as error:
            if error.errno == errno.ENOENT:
                # Key was deleted before we could retrieve result.
                return default
            else:
                raise
        finally:
            if filename is not None:
                self._disk.remove(filename)

        if expire_time and tag:
            return value, db_expire_time, db_tag
        elif expire_time:
            return value, db_expire_time
        elif tag:
            return value, db_tag
        else:
            return value

    def __delitem__(self, key):
        """Delete corresponding item for `key` from cache.

        :param key: key matching item
        :raises KeyError: if key is not found
        :raises Timeout: if database timeout expires

        """
        db_key, raw = self._disk.put(key)

        with self._transact() as (sql, cleanup):
            rows = sql(
                'SELECT rowid, filename FROM Cache'
                ' WHERE key = ? AND raw = ?'
                ' AND (expire_time IS NULL OR expire_time > ?)',
                (db_key, raw, time.time()),
            ).fetchall()

            if not rows:
                raise KeyError(key)

            (rowid, filename), = rows
            sql('DELETE FROM Cache WHERE rowid = ?', (rowid,))
            cleanup(filename)

            return True

    def delete(self, key):
        """Delete corresponding item for `key` from cache.

        Missing keys are ignored.

        :param key: key matching item
        :return: True if item was deleted
        :raises Timeout: if database timeout expires

        """
        try:
            return self.__delitem__(key)
        except KeyError:
            return False
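
    # Usage sketch (illustrative):
    #
    #     value = cache.pop('key')   # remove and return; default None if missing
    #     cache.delete('key')        # True if deleted, False if missing
    #     del cache['key']           # raises KeyError if missing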
    def push(self, value, prefix=None, side='back', expire=None, read=False,
             tag=None):
        """Push `value` onto `side` of queue identified by `prefix` in cache.

        When prefix is None, integer keys are used. Otherwise, string keys are
        used in the format "prefix-integer". Integer starts at 500 trillion.

        Defaults to pushing value on back of queue. Set side to 'front' to push
        value on front of queue. Side must be one of 'back' or 'front'.

        Operation is atomic. Concurrent operations will be serialized.

        When `read` is `True`, `value` should be a file-like object opened
        for reading in binary mode.

        See also `Cache.pull`.

        >>> cache = Cache('/tmp/test')
        >>> _ = cache.clear()
        >>> print(cache.push('first value'))
        500000000000000
        >>> cache.get(500000000000000)
        'first value'
        >>> print(cache.push('second value'))
        500000000000001
        >>> print(cache.push('third value', side='front'))
        499999999999999
        >>> cache.push(1234, prefix='userids')
        'userids-500000000000000'

        :param value: value for item
        :param str prefix: key prefix (default None, key is integer)
        :param str side: either 'back' or 'front' (default 'back')
        :param float expire: seconds until the key expires
            (default None, no expiry)
        :param bool read: read value as bytes from file (default False)
        :param str tag: text to associate with key (default None)
        :return: key for item in cache
        :raises Timeout: if database timeout expires

        """
        if prefix is None:
            min_key = 0
            max_key = 999999999999999
        else:
            min_key = prefix + '-000000000000000'
            max_key = prefix + '-999999999999999'

        now = time.time()
        raw = True
        expire_time = None if expire is None else now + expire
        size, mode, filename, db_value = self._disk.store(value, read)
        columns = (expire_time, tag, size, mode, filename, db_value)
        order = {'back': 'DESC', 'front': 'ASC'}
        select = (
            'SELECT key FROM Cache'
            ' WHERE ? < key AND key < ? AND raw = ?'
            ' ORDER BY key %s LIMIT 1'
        ) % order[side]

        with self._transact(filename) as (sql, cleanup):
            rows = sql(select, (min_key, max_key, raw)).fetchall()

            if rows:
                (key,), = rows

                if prefix is not None:
                    num = int(key[(key.rfind('-') + 1):])
                else:
                    num = key

                if side == 'back':
                    num += 1
                else:
                    assert side == 'front'
                    num -= 1
            else:
                num = 500000000000000

            if prefix is not None:
                db_key = '{0}-{1:015d}'.format(prefix, num)
            else:
                db_key = num

            self._row_insert(db_key, raw, now, columns)
            self._cull(now, sql, cleanup)

            return db_key

    def pull(self, prefix=None, default=(None, None), side='front',
             expire_time=False, tag=False):
        """Pull key and value item pair from `side` of queue in cache.

        When prefix is None, integer keys are used. Otherwise, string keys are
        used in the format "prefix-integer". Integer starts at 500 trillion.

        If queue is empty, return default.

        Defaults to pulling key and value item pairs from front of queue. Set
        side to 'back' to pull from back of queue. Side must be one of 'front'
        or 'back'.

        Operation is atomic. Concurrent operations will be serialized.

        See also `Cache.push` and `Cache.get`.

        >>> cache = Cache('/tmp/test')
        >>> _ = cache.clear()
        >>> cache.pull()
        (None, None)
        >>> for letter in 'abc':
        ...     print(cache.push(letter))
        500000000000000
        500000000000001
        500000000000002
        >>> key, value = cache.pull()
        >>> print(key)
        500000000000000
        >>> value
        'a'
        >>> _, value = cache.pull(side='back')
        >>> value
        'c'
        >>> cache.push(1234, 'userids')
        'userids-500000000000000'
        >>> _, value = cache.pull('userids')
        >>> value
        1234

        :param str prefix: key prefix (default None, key is integer)
        :param default: value to return if key is missing
            (default (None, None))
        :param str side: either 'front' or 'back' (default 'front')
        :param bool expire_time: if True, return expire_time in tuple
            (default False)
        :param bool tag: if True, return tag in tuple (default False)
        :return: key and value item pair or default if queue is empty
        :raises Timeout: if database timeout expires

        """
        if prefix is None:
            min_key = 0
            max_key = 999999999999999
        else:
            min_key = prefix + '-000000000000000'
            max_key = prefix + '-999999999999999'

        order = {'front': 'ASC', 'back': 'DESC'}
        select = (
            'SELECT rowid, key, expire_time, tag, mode, filename, value'
            ' FROM Cache WHERE ? < key AND key < ? AND raw = 1'
            ' ORDER BY key %s LIMIT 1'
        ) % order[side]

        if expire_time and tag:
            default = default, None, None
        elif expire_time or tag:
            default = default, None

        while True:
            with self._transact() as (sql, cleanup):
                rows = sql(select, (min_key, max_key)).fetchall()

                if not rows:
                    return default

                (rowid, key, db_expire, db_tag, mode, name, db_value), = rows

                sql('DELETE FROM Cache WHERE rowid = ?', (rowid,))

                if db_expire is not None and db_expire < time.time():
                    cleanup(name)
                else:
                    break

        try:
            value = self._disk.fetch(mode, name, db_value, False)
        except IOError as error:
            if error.errno == errno.ENOENT:
                # Key was deleted before we could retrieve result.
                return default
            else:
                raise
        finally:
            if name is not None:
                self._disk.remove(name)

        if expire_time and tag:
            return (key, value), db_expire, db_tag
        elif expire_time:
            return (key, value), db_expire
        elif tag:
            return (key, value), db_tag
        else:
            return key, value
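
    # Illustrative note: push/pull together form a persistent FIFO queue when
    # the defaults (push side='back', pull side='front') are used. A
    # hypothetical consumer loop, where process() stands in for real work:
    #
    #     while True:
    #         key, value = cache.pull()
    #         if key is None:
    #             break
    #         process(value)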
    def check(self, fix=False):
        """Check database and file system consistency.

        Intended for use in testing and post-mortem error analysis.

        While checking the Cache table for consistency, a writer lock is held
        on the database. The lock blocks other cache clients from writing to
        the database. For caches with many file references, the lock may be
        held for a long time. For example, local benchmarking shows that a
        cache with 1,000 file references takes ~60ms to check.

        :param bool fix: correct inconsistencies
        :return: list of warnings
        :raises Timeout: if database timeout expires

        """
        # pylint: disable=access-member-before-definition,W0201
        with warnings.catch_warnings(record=True) as warns:
            sql = self._sql

            # Check integrity of database.

            rows = sql('PRAGMA integrity_check').fetchall()

            if len(rows) != 1 or rows[0][0] != u'ok':
                for message, in rows:
                    warnings.warn(message)

            if fix:
                sql('VACUUM')

            with self._transact() as (sql, _):
                # Check Cache.filename against file system.

                filenames = set()
                select = (
                    'SELECT rowid, size, filename FROM Cache'
                    ' WHERE filename IS NOT NULL'
                )

                rows = sql(select).fetchall()

                for rowid, size, filename in rows:
                    full_path = op.join(self._directory, filename)
                    filenames.add(full_path)

                    if op.exists(full_path):
                        real_size = op.getsize(full_path)

                        if size != real_size:
                            message = 'wrong file size: %s, %d != %d'
                            args = full_path, real_size, size
                            warnings.warn(message % args)

                            if fix:
                                sql('UPDATE Cache SET size = ?'
                                    ' WHERE rowid = ?',
                                    (real_size, rowid),
                                )

                        continue

                    warnings.warn('file not found: %s' % full_path)

                    if fix:
                        sql('DELETE FROM Cache WHERE rowid = ?', (rowid,))

                # Check file system against Cache.filename.

                for dirpath, _, files in os.walk(self._directory):
                    paths = [op.join(dirpath, filename) for filename in files]
                    error = set(paths) - filenames

                    for full_path in error:
                        if DBNAME in full_path:
                            continue

                        message = 'unknown file: %s' % full_path
                        warnings.warn(message, UnknownFileWarning)

                        if fix:
                            os.remove(full_path)

                # Check for empty directories.

                for dirpath, dirs, files in os.walk(self._directory):
                    if not (dirs or files):
                        message = 'empty directory: %s' % dirpath
                        warnings.warn(message, EmptyDirWarning)

                        if fix:
                            os.rmdir(dirpath)

                # Check Settings.count against count of Cache rows.

                self.reset('count')
                (count,), = sql('SELECT COUNT(key) FROM Cache').fetchall()

                if self.count != count:
                    message = 'Settings.count != COUNT(Cache.key); %d != %d'
                    warnings.warn(message % (self.count, count))

                    if fix:
                        sql('UPDATE Settings SET value = ? WHERE key = ?',
                            (count, 'count'),
                        )

                # Check Settings.size against sum of Cache.size column.

                self.reset('size')
                select_size = 'SELECT COALESCE(SUM(size), 0) FROM Cache'
                (size,), = sql(select_size).fetchall()

                if self.size != size:
                    message = 'Settings.size != SUM(Cache.size); %d != %d'
                    warnings.warn(message % (self.size, size))

                    if fix:
                        sql('UPDATE Settings SET value = ? WHERE key = ?',
                            (size, 'size'),
                        )

            return warns
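
    # Usage sketch (illustrative):
    #
    #     warns = cache.check()          # report inconsistencies as warnings
    #     warns = cache.check(fix=True)  # also repair what it finds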
    def create_tag_index(self):
        """Create tag index on cache database.

        It is better to initialize cache with `tag_index=True` than use this.

        :raises Timeout: if database timeout expires

        """
        sql = self._sql
        sql('CREATE INDEX IF NOT EXISTS Cache_tag_rowid ON Cache(tag, rowid)')
        self.reset('tag_index', 1)

    def drop_tag_index(self):
        """Drop tag index on cache database.

        :raises Timeout: if database timeout expires

        """
        sql = self._sql
        sql('DROP INDEX IF EXISTS Cache_tag_rowid')
        self.reset('tag_index', 0)

    def evict(self, tag):
        """Remove items with matching `tag` from cache.

        Removing items is an iterative process. In each iteration, a subset of
        items is removed. Concurrent writes may occur between iterations.

        If a :exc:`Timeout` occurs, the first element of the exception's
        `args` attribute will be the number of items removed before the
        exception occurred.

        :param str tag: tag identifying items
        :return: count of rows removed
        :raises Timeout: if database timeout expires

        """
        select = (
            'SELECT rowid, filename FROM Cache'
            ' WHERE tag = ? AND rowid > ?'
            ' ORDER BY rowid LIMIT ?'
        )
        args = [tag, 0, 100]
        return self._select_delete(select, args, arg_index=1)

    def expire(self, now=None):
        """Remove expired items from cache.

        Removing items is an iterative process. In each iteration, a subset of
        items is removed. Concurrent writes may occur between iterations.

        If a :exc:`Timeout` occurs, the first element of the exception's
        `args` attribute will be the number of items removed before the
        exception occurred.

        :param float now: current time (default None, ``time.time()`` used)
        :return: count of items removed
        :raises Timeout: if database timeout expires

        """
        select = (
            'SELECT rowid, expire_time, filename FROM Cache'
            ' WHERE ? < expire_time AND expire_time < ?'
            ' ORDER BY expire_time LIMIT ?'
        )
        args = [0, now or time.time(), 100]
        return self._select_delete(select, args, row_index=1)

    def cull(self):
        """Cull items from cache until volume is less than size limit.

        Removing items is an iterative process. In each iteration, a subset of
        items is removed. Concurrent writes may occur between iterations.

        If a :exc:`Timeout` occurs, the first element of the exception's
        `args` attribute will be the number of items removed before the
        exception occurred.

        :return: count of items removed
        :raises Timeout: if database timeout expires

        """
        now = time.time()

        # Remove expired items.

        count = self.expire(now)

        # Remove items by policy.

        select_policy = EVICTION_POLICY[self.eviction_policy]['cull']

        if select_policy is None:
            return count

        select_filename = select_policy.format(fields='filename', now=now)

        try:
            while self.volume() > self.size_limit:
                with self._transact() as (sql, cleanup):
                    rows = sql(select_filename, (10,)).fetchall()

                    if not rows:
                        break

                    count += len(rows)
                    delete = (
                        'DELETE FROM Cache WHERE rowid IN (%s)'
                        % select_policy.format(fields='rowid', now=now)
                    )
                    sql(delete, (10,))

                    for filename, in rows:
                        cleanup(filename)
        except Timeout:
            raise Timeout(count)

        return count

    def clear(self):
        """Remove all items from cache.

        Removing items is an iterative process. In each iteration, a subset of
        items is removed. Concurrent writes may occur between iterations.

        If a :exc:`Timeout` occurs, the first element of the exception's
        `args` attribute will be the number of items removed before the
        exception occurred.

        :return: count of rows removed
        :raises Timeout: if database timeout expires

        """
        select = (
            'SELECT rowid, filename FROM Cache'
            ' WHERE rowid > ?'
            ' ORDER BY rowid LIMIT ?'
        )
        args = [0, 100]
        return self._select_delete(select, args)

    def _select_delete(self, select, args, row_index=0, arg_index=0):
        count = 0
        delete = 'DELETE FROM Cache WHERE rowid IN (%s)'

        try:
            while True:
                with self._transact() as (sql, cleanup):
                    rows = sql(select, args).fetchall()

                    if not rows:
                        break

                    count += len(rows)
                    sql(delete % ','.join(str(row[0]) for row in rows))

                    for row in rows:
                        args[arg_index] = row[row_index]
                        cleanup(row[-1])

        except Timeout:
            raise Timeout(count)

        return count
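
    # Usage sketch (illustrative):
    #
    #     removed = cache.evict('demo')  # remove items tagged 'demo'
    #     removed = cache.expire()       # remove expired items
    #     removed = cache.clear()        # remove everything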
    def iterkeys(self, reverse=False):
        """Iterate Cache keys in database sort order.

        >>> cache = Cache('/tmp/diskcache')
        >>> _ = cache.clear()
        >>> for key in [4, 1, 3, 0, 2]:
        ...     cache[key] = key
        >>> list(cache.iterkeys())
        [0, 1, 2, 3, 4]
        >>> list(cache.iterkeys(reverse=True))
        [4, 3, 2, 1, 0]

        :param bool reverse: reverse sort order (default False)
        :return: iterator of Cache keys

        """
        sql = self._sql
        limit = 100
        _disk_get = self._disk.get

        if reverse:
            select = (
                'SELECT key, raw FROM Cache'
                ' ORDER BY key DESC, raw DESC LIMIT 1'
            )
            iterate = (
                'SELECT key, raw FROM Cache'
                ' WHERE key = ? AND raw < ? OR key < ?'
                ' ORDER BY key DESC, raw DESC LIMIT ?'
            )
        else:
            select = (
                'SELECT key, raw FROM Cache'
                ' ORDER BY key ASC, raw ASC LIMIT 1'
            )
            iterate = (
                'SELECT key, raw FROM Cache'
                ' WHERE key = ? AND raw > ? OR key > ?'
                ' ORDER BY key ASC, raw ASC LIMIT ?'
            )

        row = sql(select).fetchall()

        if row:
            (key, raw), = row
        else:
            return

        yield _disk_get(key, raw)

        while True:
            rows = sql(iterate, (key, raw, key, limit)).fetchall()

            if not rows:
                break

            for key, raw in rows:
                yield _disk_get(key, raw)

    def _iter(self, ascending=True):
        sql = self._sql
        rows = sql('SELECT MAX(rowid) FROM Cache').fetchall()
        (max_rowid,), = rows
        yield  # Signal ready.

        if max_rowid is None:
            return

        bound = max_rowid + 1
        limit = 100
        _disk_get = self._disk.get
        rowid = 0 if ascending else bound
        select = (
            'SELECT rowid, key, raw FROM Cache'
            ' WHERE ? < rowid AND rowid < ?'
            ' ORDER BY rowid %s LIMIT ?'
        ) % ('ASC' if ascending else 'DESC')

        while True:
            if ascending:
                args = (rowid, bound, limit)
            else:
                args = (0, rowid, limit)

            rows = sql(select, args).fetchall()

            if not rows:
                break

            for rowid, key, raw in rows:
                yield _disk_get(key, raw)

    def __iter__(self):
        "Iterate keys in cache including expired items."
        iterator = self._iter()
        next(iterator)
        return iterator

    def __reversed__(self):
        "Reverse iterate keys in cache including expired items."
        iterator = self._iter(ascending=False)
        next(iterator)
        return iterator
    def stats(self, enable=True, reset=False):
        """Return cache statistics hits and misses.

        :param bool enable: enable collecting statistics (default True)
        :param bool reset: reset hits and misses to 0 (default False)
        :return: (hits, misses)

        """
        # pylint: disable=E0203,W0201
        result = (self.reset('hits'), self.reset('misses'))

        if reset:
            self.reset('hits', 0)
            self.reset('misses', 0)

        self.reset('statistics', enable)

        return result

    def volume(self):
        """Return estimated total size of cache on disk.

        :return: size in bytes

        """
        (page_count,), = self._sql('PRAGMA page_count').fetchall()
        total_size = self._page_size * page_count + self.reset('size')
        return total_size

    def close(self):
        """Close database connection."""
        con = getattr(self._local, 'con', None)

        if con is None:
            return

        con.close()

        try:
            delattr(self._local, 'con')
        except AttributeError:
            pass

    def __enter__(self):
        return self

    def __exit__(self, *exception):
        self.close()

    def __len__(self):
        "Count of items in cache including expired items."
        return self.reset('count')

    def __getstate__(self):
        return (self.directory, self.timeout, type(self.disk))

    def __setstate__(self, state):
        self.__init__(*state)
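
    # Illustrative note: __getstate__/__setstate__ make Cache instances
    # picklable by re-opening from (directory, timeout, disk type), and the
    # context-manager protocol closes the connection on exit:
    #
    #     with Cache('/tmp/demo') as cache:  # path is hypothetical
    #         cache['key'] = 'value'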
    def reset(self, key, value=ENOVAL, update=True):
        """Reset `key` and `value` item from Settings table.

        Use `reset` to update the value of Cache settings correctly. Cache
        settings are stored in the Settings table of the SQLite database. If
        `update` is ``False`` then no attempt is made to update the database.

        If `value` is not given, it is reloaded from the Settings
        table. Otherwise, the Settings table is updated.

        Settings with the ``disk_`` prefix correspond to Disk
        attributes. Updating the value will change the unprefixed attribute on
        the associated Disk instance.

        Settings with the ``sqlite_`` prefix correspond to SQLite
        pragmas. Updating the value will execute the corresponding PRAGMA
        statement.

        SQLite PRAGMA statements may be executed before the Settings table
        exists in the database by setting `update` to ``False``.

        :param str key: Settings key for item
        :param value: value for item (optional)
        :param bool update: update database Settings table (default True)
        :return: updated value for item
        :raises Timeout: if database timeout expires

        """
        sql = self._sql
        sql_retry = self._sql_retry

        if value is ENOVAL:
            select = 'SELECT value FROM Settings WHERE key = ?'
            (value,), = sql_retry(select, (key,)).fetchall()
            setattr(self, key, value)
            return value

        if update:
            statement = 'UPDATE Settings SET value = ? WHERE key = ?'
            sql_retry(statement, (value, key))

        if key.startswith('sqlite_'):
            pragma = key[7:]

            # 2016-02-17 GrantJ - PRAGMA and isolation_level=None
            # don't always play nicely together. Retry setting the
            # PRAGMA. I think some PRAGMA statements expect to
            # immediately take an EXCLUSIVE lock on the database. I
            # can't find any documentation for this but without the
            # retry, stress will intermittently fail with multiple
            # processes.

            # 2018-11-05 GrantJ - Avoid setting pragma values that
            # are already set. Pragma settings like auto_vacuum and
            # journal_mode can take a long time or may not work after
            # tables have been created.

            start = time.time()

            while True:
                try:
                    try:
                        (old_value,), = sql('PRAGMA %s' % (pragma)).fetchall()
                        update = old_value != value
                    except ValueError:
                        update = True

                    if update:
                        sql('PRAGMA %s = %s' % (pragma, value)).fetchall()

                    break
                except sqlite3.OperationalError as exc:
                    if str(exc) != 'database is locked':
                        raise

                    diff = time.time() - start

                    if diff > 60:
                        raise

                    time.sleep(0.001)
        elif key.startswith('disk_'):
            attr = key[5:]
            setattr(self._disk, attr, value)

        setattr(self, key, value)

        return value
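
# Usage sketch (illustrative): reset() reads or writes Settings rows, applies
# 'sqlite_' keys as PRAGMAs and 'disk_' keys as Disk attributes.
#
#     size_limit = cache.reset('size_limit')    # read current setting
#     cache.reset('sqlite_mmap_size', 2 ** 27)  # set PRAGMA mmap_size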