syncplay/syncplay/utils.py
2017-09-07 19:37:03 +02:00

407 lines
14 KiB
Python

import time
import re
import datetime
from syncplay import constants
from syncplay.messages import getMessage
import sys
import os
import itertools
import hashlib
import random
import string
import urllib
import ast
import unicodedata
import platform
import subprocess
folderSearchEnabled = True
def retry(ExceptionToCheck, tries=4, delay=3, backoff=2, logger=None):
"""Retry calling the decorated function using an exponential backoff.
http://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/
original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry
:param ExceptionToCheck: the exception to check. may be a tuple of
excpetions to check
:type ExceptionToCheck: Exception or tuple
:param tries: number of times to try (not retry) before giving up
:type tries: int
:param delay: initial delay between retries in seconds
:type delay: int
:param backoff: backoff multiplier e.g. value of 2 will double the delay
each retry
:type backoff: int
:param logger: logger to use. If None, print
:type logger: logging.Logger instance
"""
def deco_retry(f):
def f_retry(*args, **kwargs):
mtries, mdelay = tries, delay
try_one_last_time = True
while mtries > 1:
try:
return f(*args, **kwargs)
try_one_last_time = False
break
except ExceptionToCheck, e:
if logger:
msg = getMessage("retrying-notification").format(str(e), mdelay)
logger.warning(msg)
time.sleep(mdelay)
mtries -= 1
mdelay *= backoff
if try_one_last_time:
return f(*args, **kwargs)
return
return f_retry # true decorator
return deco_retry
def parseTime(timeStr):
regex = re.compile(constants.PARSE_TIME_REGEX)
parts = regex.match(timeStr)
if not parts:
return
parts = parts.groupdict()
time_params = {}
for (name, param) in parts.iteritems():
if param:
if name == "miliseconds":
time_params["microseconds"] = int(param) * 1000
else:
time_params[name] = int(param)
return datetime.timedelta(**time_params).total_seconds()
def formatTime(timeInSeconds, weeksAsTitles=True):
if timeInSeconds < 0:
timeInSeconds = -timeInSeconds
sign = '-'
else:
sign = ''
timeInSeconds = round(timeInSeconds)
weeks = timeInSeconds // 604800
if weeksAsTitles and weeks > 0:
title = weeks
weeks = 0
else:
title = 0
days = (timeInSeconds % 604800) // 86400
hours = (timeInSeconds % 86400) // 3600
minutes = (timeInSeconds % 3600) // 60
seconds = timeInSeconds % 60
if weeks > 0:
formattedTime = '{0:}{1:.0f}w, {2:.0f}d, {3:02.0f}:{4:02.0f}:{5:02.0f}'.format(sign, weeks, days, hours, minutes, seconds)
elif days > 0:
formattedTime = '{0:}{1:.0f}d, {2:02.0f}:{3:02.0f}:{4:02.0f}'.format(sign, days, hours, minutes, seconds)
elif hours > 0:
formattedTime = '{0:}{1:02.0f}:{2:02.0f}:{3:02.0f}'.format(sign, hours, minutes, seconds)
else:
formattedTime = '{0:}{1:02.0f}:{2:02.0f}'.format(sign, minutes, seconds)
if title > 0:
formattedTime = "{0:} (Title {1:.0f})".format(formattedTime, title)
return formattedTime
def formatSize (bytes, precise=False):
if bytes == 0: # E.g. when file size privacy is enabled
return "???"
try:
megabytes = int(bytes) / 1048576.0 # Technically this is a mebibyte, but whatever
if precise:
megabytes = round(megabytes, 1)
else:
megabytes = int(megabytes)
return str(megabytes) + getMessage("megabyte-suffix")
except: # E.g. when filesize is hashed
return "???"
def isASCII(s):
return all(ord(c) < 128 for c in s)
def findWorkingDir():
frozen = getattr(sys, 'frozen', '')
if not frozen:
path = os.path.dirname(os.path.dirname(__file__))
elif frozen in ('dll', 'console_exe', 'windows_exe'):
path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
elif frozen in ('macosx_app'):
path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))))
else:
path = ""
return path
def limitedPowerset(s, minLength):
return itertools.chain.from_iterable(itertools.combinations(s, r) for r in xrange(len(s), minLength, -1))
def blackholeStdoutForFrozenWindow():
if getattr(sys, 'frozen', '') == "windows_exe":
class Stderr(object):
softspace = 0
_file = None
_error = None
def write(self, text, fname='.syncplay.log'):
if self._file is None and self._error is None:
if os.name <> 'nt':
path = os.path.join(os.getenv('HOME', '.'), fname)
else:
path = os.path.join(os.getenv('APPDATA', '.'), fname)
self._file = open(path, 'a')
#TODO: Handle errors.
if self._file is not None:
self._file.write(text)
self._file.flush()
def flush(self):
if self._file is not None:
self._file.flush()
sys.stderr = Stderr()
del Stderr
class Blackhole(object):
softspace = 0
def write(self, text):
pass
def flush(self):
pass
sys.stdout = Blackhole()
del Blackhole
def truncateText(unicodeText, maxLength):
try:
unicodeText = unicodedata.normalize('NFC', unicodeText)
except:
pass
try:
maxSaneLength= maxLength*5
if len(unicodeText) > maxSaneLength:
unicodeText = unicode(unicodeText.encode("utf-8")[:maxSaneLength], "utf-8", errors="ignore")
while len(unicodeText) > maxLength:
unicodeText = unicode(unicodeText.encode("utf-8")[:-1], "utf-8", errors="ignore")
return unicodeText
except:
pass
return ""
# Relate to file hashing / difference checking:
def stripfilename(filename, stripURL):
if filename:
try:
filename = filename.encode('utf-8')
except UnicodeDecodeError:
pass
filename = urllib.unquote(filename)
if stripURL:
try:
filename = urllib.unquote(filename.split(u"/")[-1])
except UnicodeDecodeError:
filename = urllib.unquote(filename.split("/")[-1])
return re.sub(constants.FILENAME_STRIP_REGEX, "", filename)
else:
return ""
def stripRoomName(RoomName):
if RoomName:
try:
return re.sub(constants.ROOM_NAME_STRIP_REGEX, "\g<roomnamebase>", RoomName)
except IndexError:
return RoomName
else:
return ""
def hashFilename(filename, stripURL = False):
if isURL(filename):
stripURL = True
strippedFilename = stripfilename(filename, stripURL)
try:
strippedFilename = strippedFilename.encode('utf-8')
except UnicodeDecodeError:
pass
filenameHash = hashlib.sha256(strippedFilename).hexdigest()[:12]
return filenameHash
def hashFilesize(size):
return hashlib.sha256(str(size)).hexdigest()[:12]
def sameHashed(string1raw, string1hashed, string2raw, string2hashed):
try:
if string1raw.lower() == string2raw.lower():
return True
except AttributeError:
pass
if string1raw == string2raw:
return True
elif string1raw == string2hashed:
return True
elif string1hashed == string2raw:
return True
elif string1hashed == string2hashed:
return True
def sameFilename (filename1, filename2):
try:
filename1 = filename1.encode('utf-8')
except UnicodeDecodeError:
pass
try:
filename2 = filename2.encode('utf-8')
except UnicodeDecodeError:
pass
stripURL = True if isURL(filename1) ^ isURL(filename2) else False
if filename1 == constants.PRIVACY_HIDDENFILENAME or filename2 == constants.PRIVACY_HIDDENFILENAME:
return True
elif sameHashed(stripfilename(filename1, stripURL), hashFilename(filename1, stripURL), stripfilename(filename2, stripURL), hashFilename(filename2, stripURL)):
return True
else:
return False
def sameFilesize (filesize1, filesize2):
if filesize1 == 0 or filesize2 == 0:
return True
elif sameHashed(filesize1, hashFilesize(filesize1), filesize2, hashFilesize(filesize2)):
return True
else:
return False
def sameFileduration (duration1, duration2):
if not constants.SHOW_DURATION_NOTIFICATION:
return True
elif abs(round(duration1) - round(duration2)) < constants.DIFFERENT_DURATION_THRESHOLD:
return True
else:
return False
def meetsMinVersion(version, minVersion):
def versiontotuple(ver):
return tuple(map(int, ver.split(".")))
return versiontotuple(version) >= versiontotuple(minVersion)
def isURL(path):
if path is None:
return False
elif "://" in path:
return True
else:
return False
def getPlayerArgumentsByPathAsArray(arguments, path):
if arguments and not isinstance(arguments, (str, unicode)) and arguments.has_key(path):
return arguments[path]
else:
return None
def getPlayerArgumentsByPathAsText(arguments, path):
argsToReturn = getPlayerArgumentsByPathAsArray(arguments, path)
return " ".join(argsToReturn) if argsToReturn else ""
def getListAsMultilineString(pathArray):
return u"\n".join(pathArray) if pathArray else ""
def convertMultilineStringToList(multilineString):
return unicode.split(multilineString,u"\n") if multilineString else ""
def playlistIsValid(files):
if len(files) > constants.PLAYLIST_MAX_ITEMS:
return False
elif sum(map(len, files)) > constants.PLAYLIST_MAX_CHARACTERS:
return False
return True
def getDomainFromURL(URL):
try:
URL = URL.split("//")[-1].split("/")[0]
if URL.startswith("www."):
URL = URL[4:]
return URL
except:
return None
def open_system_file_browser(path):
if isURL(path):
return
path = os.path.dirname(path)
if platform.system() == "Windows":
os.startfile(path)
elif platform.system() == "Darwin":
subprocess.Popen(["open", path])
else:
subprocess.Popen(["xdg-open", path])
def getListOfPublicServers():
try:
import urllib, syncplay, sys, messages, json
params = urllib.urlencode({'version': syncplay.version, 'milestone': syncplay.milestone, 'release_number': syncplay.release_number,
'language': messages.messages["CURRENT"]})
f = urllib.urlopen(constants.SYNCPLAY_PUBLIC_SERVER_LIST_URL.format(params))
response = f.read()
response = response.replace("<p>","").replace("</p>","").replace("<br />","").replace("&#8220;","'").replace("&#8221;","'").replace(":&#8217;","'").replace("&#8217;","'").replace("&#8242;","'").replace("\n","").replace("\r","") # Fix Wordpress
response = ast.literal_eval(response)
if response:
return response
else:
raise IOError
except:
raise IOError(getMessage("failed-to-load-server-list-error"))
class RoomPasswordProvider(object):
CONTROLLED_ROOM_REGEX = re.compile("^\+(.*):(\w{12})$")
PASSWORD_REGEX = re.compile("[A-Z]{2}-\d{3}-\d{3}")
@staticmethod
def isControlledRoom(roomName):
return bool(re.match(RoomPasswordProvider.CONTROLLED_ROOM_REGEX, roomName))
@staticmethod
def check(roomName, password, salt):
if not password or not re.match(RoomPasswordProvider.PASSWORD_REGEX, password):
raise ValueError()
if not roomName:
raise NotControlledRoom()
match = re.match(RoomPasswordProvider.CONTROLLED_ROOM_REGEX, roomName)
if not match:
raise NotControlledRoom()
roomHash = match.group(2)
computedHash = RoomPasswordProvider._computeRoomHash(match.group(1), password, salt)
return roomHash == computedHash
@staticmethod
def getControlledRoomName(roomName, password, salt):
return "+" + roomName + ":" + RoomPasswordProvider._computeRoomHash(roomName, password, salt)
@staticmethod
def _computeRoomHash(roomName, password, salt):
roomName = roomName.encode('utf8')
salt = hashlib.sha256(salt).hexdigest()
provisionalHash = hashlib.sha256(roomName + salt).hexdigest()
return hashlib.sha1(provisionalHash + salt + password).hexdigest()[:12].upper()
class RandomStringGenerator(object):
@staticmethod
def generate_room_password():
parts = (
RandomStringGenerator._get_random_letters(2),
RandomStringGenerator._get_random_numbers(3),
RandomStringGenerator._get_random_numbers(3)
)
return "{}-{}-{}".format(*parts)
@staticmethod
def generate_server_salt():
parts = (
RandomStringGenerator._get_random_letters(10),
)
return "{}".format(*parts)
@staticmethod
def _get_random_letters(quantity):
return ''.join(random.choice(string.ascii_uppercase) for _ in xrange(quantity))
@staticmethod
def _get_random_numbers(quantity):
return ''.join(random.choice(string.digits) for _ in xrange(quantity))
class NotControlledRoom(Exception):
pass