Plex plugin to to play various online streams (mostly Latvian).

universaldetector.py 6.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. ######################## BEGIN LICENSE BLOCK ########################
  2. # The Original Code is Mozilla Universal charset detector code.
  3. #
  4. # The Initial Developer of the Original Code is
  5. # Netscape Communications Corporation.
  6. # Portions created by the Initial Developer are Copyright (C) 2001
  7. # the Initial Developer. All Rights Reserved.
  8. #
  9. # Contributor(s):
  10. # Mark Pilgrim - port to Python
  11. # Shy Shalom - original C code
  12. #
  13. # This library is free software; you can redistribute it and/or
  14. # modify it under the terms of the GNU Lesser General Public
  15. # License as published by the Free Software Foundation; either
  16. # version 2.1 of the License, or (at your option) any later version.
  17. #
  18. # This library is distributed in the hope that it will be useful,
  19. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  20. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  21. # Lesser General Public License for more details.
  22. #
  23. # You should have received a copy of the GNU Lesser General Public
  24. # License along with this library; if not, write to the Free Software
  25. # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  26. # 02110-1301 USA
  27. ######################### END LICENSE BLOCK #########################
  28. from . import constants
  29. import sys
  30. import codecs
  31. from .latin1prober import Latin1Prober # windows-1252
  32. from .mbcsgroupprober import MBCSGroupProber # multi-byte character sets
  33. from .sbcsgroupprober import SBCSGroupProber # single-byte character sets
  34. from .escprober import EscCharSetProber # ISO-2122, etc.
  35. import re
  36. MINIMUM_THRESHOLD = 0.20
  37. ePureAscii = 0
  38. eEscAscii = 1
  39. eHighbyte = 2
  40. class UniversalDetector:
  41. def __init__(self):
  42. self._highBitDetector = re.compile(b'[\x80-\xFF]')
  43. self._escDetector = re.compile(b'(\033|~{)')
  44. self._mEscCharSetProber = None
  45. self._mCharSetProbers = []
  46. self.reset()
  47. def reset(self):
  48. self.result = {'encoding': None, 'confidence': 0.0}
  49. self.done = False
  50. self._mStart = True
  51. self._mGotData = False
  52. self._mInputState = ePureAscii
  53. self._mLastChar = b''
  54. if self._mEscCharSetProber:
  55. self._mEscCharSetProber.reset()
  56. for prober in self._mCharSetProbers:
  57. prober.reset()
  58. def feed(self, aBuf):
  59. if self.done:
  60. return
  61. aLen = len(aBuf)
  62. if not aLen:
  63. return
  64. if not self._mGotData:
  65. # If the data starts with BOM, we know it is UTF
  66. if aBuf[:3] == codecs.BOM_UTF8:
  67. # EF BB BF UTF-8 with BOM
  68. self.result = {'encoding': "UTF-8-SIG", 'confidence': 1.0}
  69. elif aBuf[:4] == codecs.BOM_UTF32_LE:
  70. # FF FE 00 00 UTF-32, little-endian BOM
  71. self.result = {'encoding': "UTF-32LE", 'confidence': 1.0}
  72. elif aBuf[:4] == codecs.BOM_UTF32_BE:
  73. # 00 00 FE FF UTF-32, big-endian BOM
  74. self.result = {'encoding': "UTF-32BE", 'confidence': 1.0}
  75. elif aBuf[:4] == b'\xFE\xFF\x00\x00':
  76. # FE FF 00 00 UCS-4, unusual octet order BOM (3412)
  77. self.result = {
  78. 'encoding': "X-ISO-10646-UCS-4-3412",
  79. 'confidence': 1.0
  80. }
  81. elif aBuf[:4] == b'\x00\x00\xFF\xFE':
  82. # 00 00 FF FE UCS-4, unusual octet order BOM (2143)
  83. self.result = {
  84. 'encoding': "X-ISO-10646-UCS-4-2143",
  85. 'confidence': 1.0
  86. }
  87. elif aBuf[:2] == codecs.BOM_LE:
  88. # FF FE UTF-16, little endian BOM
  89. self.result = {'encoding': "UTF-16LE", 'confidence': 1.0}
  90. elif aBuf[:2] == codecs.BOM_BE:
  91. # FE FF UTF-16, big endian BOM
  92. self.result = {'encoding': "UTF-16BE", 'confidence': 1.0}
  93. self._mGotData = True
  94. if self.result['encoding'] and (self.result['confidence'] > 0.0):
  95. self.done = True
  96. return
  97. if self._mInputState == ePureAscii:
  98. if self._highBitDetector.search(aBuf):
  99. self._mInputState = eHighbyte
  100. elif ((self._mInputState == ePureAscii) and
  101. self._escDetector.search(self._mLastChar + aBuf)):
  102. self._mInputState = eEscAscii
  103. self._mLastChar = aBuf[-1:]
  104. if self._mInputState == eEscAscii:
  105. if not self._mEscCharSetProber:
  106. self._mEscCharSetProber = EscCharSetProber()
  107. if self._mEscCharSetProber.feed(aBuf) == constants.eFoundIt:
  108. self.result = {'encoding': self._mEscCharSetProber.get_charset_name(),
  109. 'confidence': self._mEscCharSetProber.get_confidence()}
  110. self.done = True
  111. elif self._mInputState == eHighbyte:
  112. if not self._mCharSetProbers:
  113. self._mCharSetProbers = [MBCSGroupProber(), SBCSGroupProber(),
  114. Latin1Prober()]
  115. for prober in self._mCharSetProbers:
  116. if prober.feed(aBuf) == constants.eFoundIt:
  117. self.result = {'encoding': prober.get_charset_name(),
  118. 'confidence': prober.get_confidence()}
  119. self.done = True
  120. break
  121. def close(self):
  122. if self.done:
  123. return
  124. if not self._mGotData:
  125. if constants._debug:
  126. sys.stderr.write('no data received!\n')
  127. return
  128. self.done = True
  129. if self._mInputState == ePureAscii:
  130. self.result = {'encoding': 'ascii', 'confidence': 1.0}
  131. return self.result
  132. if self._mInputState == eHighbyte:
  133. proberConfidence = None
  134. maxProberConfidence = 0.0
  135. maxProber = None
  136. for prober in self._mCharSetProbers:
  137. if not prober:
  138. continue
  139. proberConfidence = prober.get_confidence()
  140. if proberConfidence > maxProberConfidence:
  141. maxProberConfidence = proberConfidence
  142. maxProber = prober
  143. if maxProber and (maxProberConfidence > MINIMUM_THRESHOLD):
  144. self.result = {'encoding': maxProber.get_charset_name(),
  145. 'confidence': maxProber.get_confidence()}
  146. return self.result
  147. if constants._debug:
  148. sys.stderr.write('no probers hit minimum threshhold\n')
  149. for prober in self._mCharSetProbers[0].mProbers:
  150. if not prober:
  151. continue
  152. sys.stderr.write('%s confidence = %s\n' %
  153. (prober.get_charset_name(),
  154. prober.get_confidence()))