Convert all the twisted.trial tests to pytest_twisted. Also move off of unittest.TestCase as well. Seems there were several tests which weren't actually testing what they should, and also some code that wasn't doing what the broken test said it should.
Goals:
Remove twisted.trial tests
Move to pytest fixtures, rather than many classess and subclasses with setup and teardown functions
Move away from self.assertX to assert style tests
FIx broken tests
Going forward I think these should be the goals when adding/modifying tests:
* Don't use BaseTest or set_up tear_down methods any more. Fixtures should be used either in the test module/class, or make/improve the ones available in conftest.py
* For sure don't use unittest or twisted.trial, they mess up the pytest stuff.
* Prefer pytest_twisted.ensureDeferred with an async function over inlineCallbacks.
- I think the async function syntax is nicer, and it helps catch silly mistakes, e.g. await None is invalid, but yield None isn't, so if some function returns an unexpected thing we try to await on, it will be caught earlier. (I struggled debugging a test for quite a while, then caught it immediately when switching to the new syntax)
- Once the maybe_coroutine PR goes in, using the async syntax can also improve tracebacks when debugging tests.
Things that should probably be cleaned up going forward:
* Remove BaseTestCase
* Remove the subclasses like DaemonBase in favor of new fixtures.
* I think there are some other utility subclasses that could be removed too
* Perhaps use parameterization in the ui_entry tests, rather that the weird combination of subclasses and the set_var fixture I mixed in.
* Convert some of the callback stuff to pytest_twisted.ensureDeferred tests, just for nicer readability
Details relating to pytest fixtures conftest.py in root dir:
* https://github.com/pytest-dev/pytest/issues/5822#issuecomment-697331920
* https://docs.pytest.org/en/latest/deprecations.html#pytest-plugins-in-non-top-level-conftest-files
Closes: https://github.com/deluge-torrent/deluge/pull/354
484 lines
18 KiB
Python
484 lines
18 KiB
Python
#
|
|
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
|
|
# the additional special exception to link portions of this program with the OpenSSL library.
|
|
# See LICENSE for more details.
|
|
#
|
|
|
|
from base64 import b64encode
|
|
from hashlib import sha1 as sha
|
|
|
|
import pytest
|
|
import pytest_twisted
|
|
from twisted.internet import defer, reactor, task
|
|
from twisted.internet.error import CannotListenError
|
|
from twisted.web.http import FORBIDDEN
|
|
from twisted.web.resource import EncodingResourceWrapper, Resource
|
|
from twisted.web.server import GzipEncoderFactory, Site
|
|
from twisted.web.static import File
|
|
|
|
import deluge.common
|
|
import deluge.component as component
|
|
import deluge.core.torrent
|
|
from deluge._libtorrent import lt
|
|
from deluge.conftest import BaseTestCase
|
|
from deluge.core.core import Core
|
|
from deluge.core.rpcserver import RPCServer
|
|
from deluge.error import AddTorrentError, InvalidTorrentError
|
|
|
|
from . import common
|
|
|
|
common.disable_new_release_check()
|
|
|
|
|
|
class CookieResource(Resource):
|
|
def render(self, request):
|
|
if request.getCookie(b'password') != b'deluge':
|
|
request.setResponseCode(FORBIDDEN)
|
|
return
|
|
|
|
request.setHeader(b'Content-Type', b'application/x-bittorrent')
|
|
with open(
|
|
common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent'), 'rb'
|
|
) as _file:
|
|
data = _file.read()
|
|
return data
|
|
|
|
|
|
class PartialDownload(Resource):
|
|
def getChild(self, path, request): # NOQA: N802
|
|
return EncodingResourceWrapper(self, [GzipEncoderFactory()])
|
|
|
|
def render(self, request):
|
|
with open(
|
|
common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent'), 'rb'
|
|
) as _file:
|
|
data = _file.read()
|
|
request.setHeader(b'Content-Length', str(len(data)))
|
|
request.setHeader(b'Content-Type', b'application/x-bittorrent')
|
|
return data
|
|
|
|
|
|
class RedirectResource(Resource):
|
|
def render(self, request):
|
|
request.redirect(b'/ubuntu-9.04-desktop-i386.iso.torrent')
|
|
return b''
|
|
|
|
|
|
class TopLevelResource(Resource):
|
|
def __init__(self):
|
|
Resource.__init__(self)
|
|
self.putChild(b'cookie', CookieResource())
|
|
self.putChild(b'partial', PartialDownload())
|
|
self.putChild(b'redirect', RedirectResource())
|
|
self.putChild(
|
|
b'ubuntu-9.04-desktop-i386.iso.torrent',
|
|
File(common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent')),
|
|
)
|
|
|
|
|
|
class TestCore(BaseTestCase):
|
|
def set_up(self):
|
|
self.rpcserver = RPCServer(listen=False)
|
|
self.core = Core()
|
|
self.core.config.config['lsd'] = False
|
|
self.clock = task.Clock()
|
|
self.core.torrentmanager.callLater = self.clock.callLater
|
|
self.listen_port = 51242
|
|
return component.start().addCallback(self.start_web_server)
|
|
|
|
def start_web_server(self, result):
|
|
website = Site(TopLevelResource())
|
|
for dummy in range(10):
|
|
try:
|
|
self.webserver = reactor.listenTCP(self.listen_port, website)
|
|
except CannotListenError as ex:
|
|
error = ex
|
|
self.listen_port += 1
|
|
else:
|
|
break
|
|
else:
|
|
raise error
|
|
|
|
return result
|
|
|
|
def tear_down(self):
|
|
def on_shutdown(result):
|
|
del self.rpcserver
|
|
del self.core
|
|
return self.webserver.stopListening()
|
|
|
|
return component.shutdown().addCallback(on_shutdown)
|
|
|
|
def add_torrent(self, filename, paused=False):
|
|
if not paused:
|
|
# Patch libtorrent flags starting torrents paused
|
|
self.patch(
|
|
deluge.core.torrentmanager,
|
|
'LT_DEFAULT_ADD_TORRENT_FLAGS',
|
|
lt.add_torrent_params_flags_t.flag_auto_managed
|
|
| lt.add_torrent_params_flags_t.flag_update_subscribe
|
|
| lt.add_torrent_params_flags_t.flag_apply_ip_filter,
|
|
)
|
|
options = {'add_paused': paused, 'auto_managed': False}
|
|
filepath = common.get_test_data_file(filename)
|
|
with open(filepath, 'rb') as _file:
|
|
filedump = b64encode(_file.read())
|
|
torrent_id = self.core.add_torrent_file(filename, filedump, options)
|
|
return torrent_id
|
|
|
|
@pytest_twisted.inlineCallbacks
|
|
def test_add_torrent_files(self):
|
|
options = {}
|
|
filenames = ['test.torrent', 'test_torrent.file.torrent']
|
|
files_to_add = []
|
|
for f in filenames:
|
|
filename = common.get_test_data_file(f)
|
|
with open(filename, 'rb') as _file:
|
|
filedump = b64encode(_file.read())
|
|
files_to_add.append((filename, filedump, options))
|
|
errors = yield self.core.add_torrent_files(files_to_add)
|
|
assert len(errors) == 0
|
|
|
|
@pytest_twisted.inlineCallbacks
|
|
def test_add_torrent_files_error_duplicate(self):
|
|
options = {}
|
|
filenames = ['test.torrent', 'test.torrent']
|
|
files_to_add = []
|
|
for f in filenames:
|
|
filename = common.get_test_data_file(f)
|
|
with open(filename, 'rb') as _file:
|
|
filedump = b64encode(_file.read())
|
|
files_to_add.append((filename, filedump, options))
|
|
errors = yield self.core.add_torrent_files(files_to_add)
|
|
assert len(errors) == 1
|
|
assert str(errors[0]).startswith('Torrent already in session')
|
|
|
|
@pytest_twisted.inlineCallbacks
|
|
def test_add_torrent_file(self):
|
|
options = {}
|
|
filename = common.get_test_data_file('test.torrent')
|
|
with open(filename, 'rb') as _file:
|
|
filedump = b64encode(_file.read())
|
|
torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options)
|
|
|
|
# Get the info hash from the test.torrent
|
|
from deluge.bencode import bdecode, bencode
|
|
|
|
with open(filename, 'rb') as _file:
|
|
info_hash = sha(bencode(bdecode(_file.read())[b'info'])).hexdigest()
|
|
assert torrent_id == info_hash
|
|
|
|
def test_add_torrent_file_invalid_filedump(self):
|
|
options = {}
|
|
filename = common.get_test_data_file('test.torrent')
|
|
with pytest.raises(AddTorrentError):
|
|
self.core.add_torrent_file(filename, False, options)
|
|
|
|
@pytest_twisted.inlineCallbacks
|
|
def test_add_torrent_url(self):
|
|
url = (
|
|
'http://localhost:%d/ubuntu-9.04-desktop-i386.iso.torrent'
|
|
% self.listen_port
|
|
)
|
|
options = {}
|
|
info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
|
|
|
|
torrent_id = yield self.core.add_torrent_url(url, options)
|
|
assert torrent_id == info_hash
|
|
|
|
@pytest_twisted.ensureDeferred
|
|
async def test_add_torrent_url_with_cookie(self):
|
|
url = 'http://localhost:%d/cookie' % self.listen_port
|
|
options = {}
|
|
headers = {'Cookie': 'password=deluge'}
|
|
info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
|
|
|
|
with pytest.raises(Exception):
|
|
await self.core.add_torrent_url(url, options)
|
|
|
|
result = await self.core.add_torrent_url(url, options, headers)
|
|
assert result == info_hash
|
|
|
|
@pytest_twisted.ensureDeferred
|
|
async def test_add_torrent_url_with_redirect(self):
|
|
url = 'http://localhost:%d/redirect' % self.listen_port
|
|
options = {}
|
|
info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
|
|
|
|
result = await self.core.add_torrent_url(url, options)
|
|
assert result == info_hash
|
|
|
|
@pytest_twisted.ensureDeferred
|
|
async def test_add_torrent_url_with_partial_download(self):
|
|
url = 'http://localhost:%d/partial' % self.listen_port
|
|
options = {}
|
|
info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
|
|
|
|
result = await self.core.add_torrent_url(url, options)
|
|
assert result == info_hash
|
|
|
|
@pytest_twisted.inlineCallbacks
|
|
def test_add_torrent_magnet(self):
|
|
info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
|
|
uri = deluge.common.create_magnet_uri(info_hash)
|
|
options = {}
|
|
torrent_id = yield self.core.add_torrent_magnet(uri, options)
|
|
assert torrent_id == info_hash
|
|
|
|
def test_resume_torrent(self):
|
|
tid1 = self.add_torrent('test.torrent', paused=True)
|
|
tid2 = self.add_torrent('test_torrent.file.torrent', paused=True)
|
|
# Assert paused
|
|
r1 = self.core.get_torrent_status(tid1, ['paused'])
|
|
assert r1['paused']
|
|
r2 = self.core.get_torrent_status(tid2, ['paused'])
|
|
assert r2['paused']
|
|
|
|
self.core.resume_torrent(tid2)
|
|
r1 = self.core.get_torrent_status(tid1, ['paused'])
|
|
assert r1['paused']
|
|
r2 = self.core.get_torrent_status(tid2, ['paused'])
|
|
assert not r2['paused']
|
|
|
|
def test_resume_torrent_list(self):
|
|
"""Backward compatibility for list of torrent_ids."""
|
|
torrent_id = self.add_torrent('test.torrent', paused=True)
|
|
self.core.resume_torrent([torrent_id])
|
|
result = self.core.get_torrent_status(torrent_id, ['paused'])
|
|
assert not result['paused']
|
|
|
|
def test_resume_torrents(self):
|
|
tid1 = self.add_torrent('test.torrent', paused=True)
|
|
tid2 = self.add_torrent('test_torrent.file.torrent', paused=True)
|
|
self.core.resume_torrents([tid1, tid2])
|
|
r1 = self.core.get_torrent_status(tid1, ['paused'])
|
|
assert not r1['paused']
|
|
r2 = self.core.get_torrent_status(tid2, ['paused'])
|
|
assert not r2['paused']
|
|
|
|
def test_resume_torrents_all(self):
|
|
"""With no torrent_ids param, resume all torrents"""
|
|
tid1 = self.add_torrent('test.torrent', paused=True)
|
|
tid2 = self.add_torrent('test_torrent.file.torrent', paused=True)
|
|
self.core.resume_torrents()
|
|
r1 = self.core.get_torrent_status(tid1, ['paused'])
|
|
assert not r1['paused']
|
|
r2 = self.core.get_torrent_status(tid2, ['paused'])
|
|
assert not r2['paused']
|
|
|
|
def test_pause_torrent(self):
|
|
tid1 = self.add_torrent('test.torrent')
|
|
tid2 = self.add_torrent('test_torrent.file.torrent')
|
|
# Assert not paused
|
|
r1 = self.core.get_torrent_status(tid1, ['paused'])
|
|
assert not r1['paused']
|
|
r2 = self.core.get_torrent_status(tid2, ['paused'])
|
|
assert not r2['paused']
|
|
|
|
self.core.pause_torrent(tid2)
|
|
r1 = self.core.get_torrent_status(tid1, ['paused'])
|
|
assert not r1['paused']
|
|
r2 = self.core.get_torrent_status(tid2, ['paused'])
|
|
assert r2['paused']
|
|
|
|
def test_pause_torrent_list(self):
|
|
"""Backward compatibility for list of torrent_ids."""
|
|
torrent_id = self.add_torrent('test.torrent')
|
|
result = self.core.get_torrent_status(torrent_id, ['paused'])
|
|
assert not result['paused']
|
|
self.core.pause_torrent([torrent_id])
|
|
result = self.core.get_torrent_status(torrent_id, ['paused'])
|
|
assert result['paused']
|
|
|
|
def test_pause_torrents(self):
|
|
tid1 = self.add_torrent('test.torrent')
|
|
tid2 = self.add_torrent('test_torrent.file.torrent')
|
|
|
|
self.core.pause_torrents([tid1, tid2])
|
|
r1 = self.core.get_torrent_status(tid1, ['paused'])
|
|
assert r1['paused']
|
|
r2 = self.core.get_torrent_status(tid2, ['paused'])
|
|
assert r2['paused']
|
|
|
|
def test_pause_torrents_all(self):
|
|
"""With no torrent_ids param, pause all torrents"""
|
|
tid1 = self.add_torrent('test.torrent')
|
|
tid2 = self.add_torrent('test_torrent.file.torrent')
|
|
|
|
self.core.pause_torrents()
|
|
r1 = self.core.get_torrent_status(tid1, ['paused'])
|
|
assert r1['paused']
|
|
r2 = self.core.get_torrent_status(tid2, ['paused'])
|
|
assert r2['paused']
|
|
|
|
def test_prefetch_metadata_existing(self):
|
|
"""Check another call with same magnet returns existing deferred."""
|
|
magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
|
|
expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', b'')
|
|
|
|
def on_result(result):
|
|
assert result == expected
|
|
|
|
d = self.core.prefetch_magnet_metadata(magnet)
|
|
d.addCallback(on_result)
|
|
d2 = self.core.prefetch_magnet_metadata(magnet)
|
|
d2.addCallback(on_result)
|
|
self.clock.advance(30)
|
|
return defer.DeferredList([d, d2])
|
|
|
|
@pytest_twisted.inlineCallbacks
|
|
def test_remove_torrent(self):
|
|
options = {}
|
|
filename = common.get_test_data_file('test.torrent')
|
|
with open(filename, 'rb') as _file:
|
|
filedump = b64encode(_file.read())
|
|
torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options)
|
|
removed = self.core.remove_torrent(torrent_id, True)
|
|
assert removed
|
|
assert len(self.core.get_session_state()) == 0
|
|
|
|
def test_remove_torrent_invalid(self):
|
|
with pytest.raises(InvalidTorrentError):
|
|
self.core.remove_torrent(
|
|
'torrentidthatdoesntexist',
|
|
True,
|
|
)
|
|
|
|
@pytest_twisted.inlineCallbacks
|
|
def test_remove_torrents(self):
|
|
options = {}
|
|
filename = common.get_test_data_file('test.torrent')
|
|
with open(filename, 'rb') as _file:
|
|
filedump = b64encode(_file.read())
|
|
torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options)
|
|
|
|
filename2 = common.get_test_data_file('unicode_filenames.torrent')
|
|
with open(filename2, 'rb') as _file:
|
|
filedump = b64encode(_file.read())
|
|
torrent_id2 = yield self.core.add_torrent_file_async(
|
|
filename2, filedump, options
|
|
)
|
|
d = self.core.remove_torrents([torrent_id, torrent_id2], True)
|
|
|
|
def test_ret(val):
|
|
assert val == []
|
|
|
|
d.addCallback(test_ret)
|
|
|
|
def test_session_state(val):
|
|
assert len(self.core.get_session_state()) == 0
|
|
|
|
d.addCallback(test_session_state)
|
|
yield d
|
|
|
|
@pytest_twisted.inlineCallbacks
|
|
def test_remove_torrents_invalid(self):
|
|
options = {}
|
|
filename = common.get_test_data_file('test.torrent')
|
|
with open(filename, 'rb') as _file:
|
|
filedump = b64encode(_file.read())
|
|
torrent_id = yield self.core.add_torrent_file_async(
|
|
filename, filedump, options
|
|
)
|
|
val = yield self.core.remove_torrents(
|
|
['invalidid1', 'invalidid2', torrent_id], False
|
|
)
|
|
assert len(val) == 2
|
|
assert val[0] == ('invalidid1', 'torrent_id invalidid1 not in session.')
|
|
assert val[1] == ('invalidid2', 'torrent_id invalidid2 not in session.')
|
|
|
|
def test_get_session_status(self):
|
|
status = self.core.get_session_status(
|
|
['net.recv_tracker_bytes', 'net.sent_tracker_bytes']
|
|
)
|
|
assert isinstance(status, dict)
|
|
assert status['net.recv_tracker_bytes'] == 0
|
|
assert status['net.sent_tracker_bytes'] == 0
|
|
|
|
def test_get_session_status_all(self):
|
|
status = self.core.get_session_status([])
|
|
assert isinstance(status, dict)
|
|
assert 'upload_rate' in status
|
|
assert 'net.recv_bytes' in status
|
|
|
|
def test_get_session_status_depr(self):
|
|
status = self.core.get_session_status(['num_peers', 'num_unchoked'])
|
|
assert isinstance(status, dict)
|
|
assert status['num_peers'] == 0
|
|
assert status['num_unchoked'] == 0
|
|
|
|
def test_get_session_status_rates(self):
|
|
status = self.core.get_session_status(['upload_rate', 'download_rate'])
|
|
assert isinstance(status, dict)
|
|
assert status['upload_rate'] == 0
|
|
|
|
def test_get_session_status_ratio(self):
|
|
status = self.core.get_session_status(['write_hit_ratio', 'read_hit_ratio'])
|
|
assert isinstance(status, dict)
|
|
assert status['write_hit_ratio'] == 0.0
|
|
assert status['read_hit_ratio'] == 0.0
|
|
|
|
def test_get_free_space(self):
|
|
space = self.core.get_free_space('.')
|
|
assert isinstance(space, int)
|
|
assert space >= 0
|
|
assert self.core.get_free_space('/someinvalidpath') == -1
|
|
|
|
@pytest.mark.slow
|
|
def test_test_listen_port(self):
|
|
d = self.core.test_listen_port()
|
|
|
|
def result(r):
|
|
assert r in (True, False)
|
|
|
|
d.addCallback(result)
|
|
return d
|
|
|
|
def test_sanitize_filepath(self):
|
|
pathlist = {
|
|
'\\backslash\\path\\': 'backslash/path',
|
|
' single_file ': 'single_file',
|
|
'..': '',
|
|
'/../..../': '',
|
|
' Def ////ad./ / . . /b d /file': 'Def/ad./. ./b d/file',
|
|
'/ test /\\.. /.file/': 'test/.file',
|
|
'mytorrent/subfold/file1': 'mytorrent/subfold/file1',
|
|
'Torrent/folder/': 'Torrent/folder',
|
|
}
|
|
|
|
for key in pathlist:
|
|
assert (
|
|
deluge.core.torrent.sanitize_filepath(key, folder=False)
|
|
== pathlist[key]
|
|
)
|
|
|
|
assert (
|
|
deluge.core.torrent.sanitize_filepath(key, folder=True)
|
|
== pathlist[key] + '/'
|
|
)
|
|
|
|
def test_get_set_config_values(self):
|
|
assert self.core.get_config_values(['abc', 'foo']) == {'foo': None, 'abc': None}
|
|
assert self.core.get_config_value('foobar') is None
|
|
self.core.set_config({'abc': 'def', 'foo': 10, 'foobar': 'barfoo'})
|
|
assert self.core.get_config_values(['foo', 'abc']) == {'foo': 10, 'abc': 'def'}
|
|
assert self.core.get_config_value('foobar') == 'barfoo'
|
|
|
|
def test_read_only_config_keys(self):
|
|
key = 'max_upload_speed'
|
|
self.core.read_only_config_keys = [key]
|
|
|
|
old_value = self.core.get_config_value(key)
|
|
self.core.set_config({key: old_value + 10})
|
|
new_value = self.core.get_config_value(key)
|
|
assert old_value == new_value
|
|
|
|
self.core.read_only_config_keys = None
|
|
|
|
def test__create_peer_id(self):
|
|
assert self.core._create_peer_id('2.0.0') == '-DE200s-'
|
|
assert self.core._create_peer_id('2.0.0.dev15') == '-DE200D-'
|
|
assert self.core._create_peer_id('2.0.1rc1') == '-DE201r-'
|
|
assert self.core._create_peer_id('2.11.0b2') == '-DE2B0b-'
|
|
assert self.core._create_peer_id('2.4.12b2.dev3') == '-DE24CD-'
|