# # common.py # # Copyright (C) 2007, 2008 Andrew Resch # # Deluge is free software. # # You may redistribute it and/or modify it under the terms of the # GNU General Public License, as published by the Free Software # Foundation; either version 3 of the License, or (at your option) # any later version. # # deluge is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. # See the GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with deluge. If not, write to: # The Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor # Boston, MA 02110-1301, USA. # """Common functions for various parts of Deluge to use.""" import os import time import subprocess import platform import sys try: import json except ImportError: import simplejson as json import pkg_resources import xdg, xdg.BaseDirectory LT_TORRENT_STATE = { "Queued": 0, "Checking": 1, "Downloading Metadata": 2, "Downloading": 3, "Finished": 4, "Seeding": 5, "Allocating": 6, 0: "Queued", 1: "Checking", 2: "Downloading Metadata", 3: "Downloading", 4: "Finished", 5: "Seeding", 6: "Allocating", } TORRENT_STATE = [ "Allocating", "Checking", "Downloading", "Seeding", "Paused", "Error", "Queued" ] FILE_PRIORITY = { 0: "Do Not Download", 1: "Normal Priority", 2: "High Priority", 5: "Highest Priority", "Do Not Download": 0, "Normal Priority": 1, "High Priority": 2, "Highest Priority": 5 } def get_version(): """ Returns the program version from the egg metadata :returns: the version of Deluge :rtype: string """ return pkg_resources.require("Deluge")[0].version def get_revision(): """ The svn revision of the build if available :returns: the svn revision, or "" :rtype: string """ revision = "" try: f = open(pkg_resources.resource_filename("deluge", os.path.join("data", "revision"))) revision = f.read() f.close() except IOError, e: pass return revision def get_default_config_dir(filename=None): """ :param filename: if None, only the config path is returned, if provided, a path including the filename will be returned :returns: a file path to the config directory and optional filename :rtype: string """ if windows_check(): if filename: return os.path.join(os.environ.get("APPDATA"), "deluge", filename) else: return os.path.join(os.environ.get("APPDATA"), "deluge") else: if filename: return os.path.join(xdg.BaseDirectory.save_config_path("deluge"), filename) else: return xdg.BaseDirectory.save_config_path("deluge") def get_default_download_dir(): """ :returns: the default download directory :rtype: string """ if windows_check(): return os.path.expanduser("~") else: return os.environ.get("HOME") def windows_check(): """ Checks if the current platform is Windows :returns: True or False :rtype: bool """ return platform.system() in ('Windows', 'Microsoft') def vista_check(): """ Checks if the current platform is Windows Vista :returns: True or False :rtype: bool """ return platform.release() == "Vista" def osx_check(): """ Checks if the current platform is Mac OS X :returns: True or False :rtype: bool """ return platform.system() == "Darwin" def get_pixmap(fname): """ Provides easy access to files in the deluge/data/pixmaps folder within the Deluge egg :param fname: the filename to look for :returns: a path to a pixmap file included with Deluge :rtype: string """ return pkg_resources.resource_filename("deluge", os.path.join("data", \ "pixmaps", fname)) def open_file(path): """ Opens a file or folder using the system configured program :param path: the path to the file or folder to open """ if windows_check(): os.startfile("%s" % path) else: subprocess.Popen(["xdg-open", "%s" % path]) def open_url_in_browser(url): """ Opens a url in the desktop's default browser :param url: the url to open """ import threading import webbrowser class BrowserThread(threading.Thread): def __init__(self, url): threading.Thread.__init__(self) self.url = url def run(self): webbrowser.open(self.url) BrowserThread(url).start() ## Formatting text functions def fsize(fsize_b): """ Formats the bytes value into a string with KiB, MiB or GiB units :param fsize_b: int, the filesize in bytes :returns: formatted string in KiB, MiB or GiB units :rtype: string **Usage** >>> fsize(112245) '109.6 KiB' """ fsize_kb = fsize_b / 1024.0 if fsize_kb < 1024: return "%.1f KiB" % fsize_kb fsize_mb = fsize_kb / 1024.0 if fsize_mb < 1024: return "%.1f MiB" % fsize_mb fsize_gb = fsize_mb / 1024.0 return "%.1f GiB" % fsize_gb def fpcnt(dec): """ Formats a string to display a percentage with two decimal places :param dec: float, the ratio in the range [0.0, 1.0] :returns: a formatted string representing a percentage :rtype: string **Usage** >>> fpcnt(0.9311) '93.11%' """ return '%.2f%%' % (dec * 100) def fspeed(bps): """ Formats a string to display a transfer speed utilizing :func:`fsize` :param bps: int, bytes per second :returns: a formatted string representing transfer speed :rtype: string **Usage** >>> fspeed(43134) '42.1 KiB/s' """ return '%s/s' % (fsize(bps)) def fpeer(num_peers, total_peers): """ Formats a string to show 'num_peers' ('total_peers') :param num_peers: int, the number of connected peers :param total_peers: int, the total number of peers :returns: a formatted string: num_peers (total_peers), if total_peers < 0, then it will not be shown :rtype: string **Usage** >>> fpeer(10, 20) '10 (20)' >>> fpeer(10, -1) '10' """ if total_peers > -1: return "%d (%d)" % (num_peers, total_peers) else: return "%d" % num_peers def ftime(seconds): """ Formats a string to show time in a human readable form :param seconds: int, the number of seconds :returns: a formatted time string, will return '' if seconds == 0 :rtype: string **Usage** >>> ftime(23011) '6h 23m' """ if seconds == 0: return "" if seconds < 60: return '%ds' % (seconds) minutes = seconds / 60 if minutes < 60: seconds = seconds % 60 return '%dm %ds' % (minutes, seconds) hours = minutes / 60 if hours < 24: minutes = minutes % 60 return '%dh %dm' % (hours, minutes) days = hours / 24 if days < 7: hours = hours % 24 return '%dd %dh' % (days, hours) weeks = days / 7 if weeks < 52: days = days % 7 return '%dw %dd' % (weeks, days) years = weeks / 52 weeks = weeks % 52 return '%dy %dw' % (years, weeks) def fdate(seconds): """ Formats a date string in the locale's date representation based on the systems timezone :param seconds: float, time in seconds since the Epoch :returns: a string in the locale's date representation or "" if seconds < 0 :rtype: string """ if seconds < 0: return "" return time.strftime("%x", time.localtime(seconds)) def is_url(url): """ A simple regex test to check if the URL is valid :param url: string, the url to test :returns: True or False :rtype: bool **Usage** >>> is_url("http://deluge-torrent.org") True """ import re return bool(re.search('^(https?|ftp|udp)://', url)) def is_magnet(uri): """ A check to determine if a uri is a valid bittorrent magnet uri :param uri: string, the uri to check :returns: True or False :rtype: bool **Usage** >>> is_magnet("magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN") True """ if uri[:20] == "magnet:?xt=urn:btih:": return True return False def fetch_url(url): """ Downloads a torrent file from a given URL and checks the file's validity :param url: string, the url of the .torrent file to fetch :returns: the filepath to the downloaded file :rtype: string """ import urllib from deluge.log import LOG as log try: filename, headers = urllib.urlretrieve(url) except IOError: log.debug("Network error while trying to fetch torrent from %s", url) else: if filename.endswith(".torrent") or headers["content-type"] ==\ "application/x-bittorrent": return filename else: log.debug("URL doesn't appear to be a valid torrent file: %s", url) return None def create_magnet_uri(infohash, name=None, trackers=[]): """ Creates a magnet uri :param infohash: string, the info-hash of the torrent :param name: string, the name of the torrent (optional) :param trackers: list of strings, the trackers to announce to (optional) :returns: a magnet uri string :rtype: string """ from base64 import b32encode uri = "magnet:?xt=urn:btih:" + b32encode(infohash.decode("hex")) if name: uri = uri + "&dn=" + name if trackers: for t in trackers: uri = uri + "&tr=" + t return uri def get_path_size(path): """ Gets the size in bytes of 'path' :param path: string, the path to check for size :returns: the size in bytes of the path or -1 if the path does not exist :rtype: int """ if not os.path.exists(path): return -1 if os.path.isfile(path): return os.path.getsize(path) dir_size = 0 for (p, dirs, files) in os.walk(path): for file in files: filename = os.path.join(p, file) dir_size += os.path.getsize(filename) return dir_size def free_space(path): """ Gets the free space available at 'path' :param path: string, the path to check :returns: the free space at path in bytes :rtype: int """ if windows_check(): import win32file sectors, bytes, free, total = map(long, win32file.GetDiskFreeSpace(path)) return (free * sectors * bytes) else: disk_data = os.statvfs(path) block_size = disk_data.f_bsize return disk_data.f_bavail * block_size def is_ip(ip): """ A simple test to see if 'ip' is valid :param ip: string, the ip to check :returns: True or False :rtype: bool ** Usage ** >>> is_ip("127.0.0.1") True """ import socket #first we test ipv4 try: if socket.inet_pton(socket.AF_INET, "%s" % (ip)): return True except socket.error: if not socket.has_ipv6: return False #now test ipv6 try: if socket.inet_pton(socket.AF_INET6, "%s" % (ip)): return True except socket.error: return False