Convert all the twisted.trial tests to pytest_twisted. Also move off of unittest.TestCase as well. Seems there were several tests which weren't actually testing what they should, and also some code that wasn't doing what the broken test said it should.
Goals:
Remove twisted.trial tests
Move to pytest fixtures, rather than many classes and subclasses with setup and teardown functions
Move away from self.assertX to assert style tests
Fix broken tests
Going forward I think these should be the goals when adding/modifying tests:
* Don't use BaseTest or set_up tear_down methods any more. Fixtures should be used either in the test module/class, or make/improve the ones available in conftest.py
* For sure don't use unittest or twisted.trial, they mess up the pytest stuff.
* Prefer pytest_twisted.ensureDeferred with an async function over inlineCallbacks.
- I think the async function syntax is nicer, and it helps catch silly mistakes, e.g. await None is invalid, but yield None isn't, so if some function returns an unexpected thing we try to await on, it will be caught earlier. (I struggled debugging a test for quite a while, then caught it immediately when switching to the new syntax)
- Once the maybe_coroutine PR goes in, using the async syntax can also improve tracebacks when debugging tests.
Things that should probably be cleaned up going forward:
* Remove BaseTestCase
* Remove the subclasses like DaemonBase in favor of new fixtures.
* I think there are some other utility subclasses that could be removed too
* Perhaps use parameterization in the ui_entry tests, rather than the weird combination of subclasses and the set_var fixture I mixed in.
* Convert some of the callback stuff to pytest_twisted.ensureDeferred tests, just for nicer readability
Details relating to pytest fixtures conftest.py in root dir:
* https://github.com/pytest-dev/pytest/issues/5822#issuecomment-697331920
* https://docs.pytest.org/en/latest/deprecations.html#pytest-plugins-in-non-top-level-conftest-files
Closes: https://github.com/deluge-torrent/deluge/pull/354
292 lines
11 KiB
Python
292 lines
11 KiB
Python
#
|
||
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
|
||
# the additional special exception to link portions of this program with the OpenSSL library.
|
||
# See LICENSE for more details.
|
||
#
|
||
|
||
import os
|
||
import tempfile
|
||
from email.utils import formatdate
|
||
|
||
import pytest
|
||
import pytest_twisted
|
||
from twisted.internet import reactor
|
||
from twisted.internet.error import CannotListenError
|
||
from twisted.web.error import Error, PageRedirect
|
||
from twisted.web.http import NOT_MODIFIED
|
||
from twisted.web.resource import EncodingResourceWrapper, Resource
|
||
from twisted.web.server import GzipEncoderFactory, Site
|
||
from twisted.web.util import redirectTo
|
||
|
||
from deluge.httpdownloader import download_file
|
||
from deluge.log import setup_logger
|
||
|
||
# Scratch directory created once when this module is imported; fname() builds
# paths inside it.
temp_dir = tempfile.mkdtemp()
|
||
|
||
|
||
def fname(name):
    """Return the path of ``name`` inside the module-level ``temp_dir``."""
    path = os.path.join(temp_dir, name)
    return path
|
||
|
||
|
||
class RedirectResource(Resource):
    """Redirect every request to the URL returned by ``self.get_url()``.

    ``get_url`` is not defined on this class; it has to be attached to the
    instance before a request is rendered.
    """

    def render(self, request):
        # redirectTo() is given the target as UTF-8 encoded bytes.
        return redirectTo(self.get_url().encode('utf8'), request)
|
||
|
||
|
||
class RenameResource(Resource):
    """Serve a text attachment named after the ``filename`` query argument."""

    def render(self, request):
        # Fall back to a fixed name when the query string has no ``filename``.
        requested = request.args.get(b'filename', [b'renamed_file'])
        target = requested[0]
        request.setHeader(b'Content-Type', b'text/plain')
        request.setHeader(b'Content-Disposition', b'attachment; filename=' + target)
        return b'This file should be called ' + target
|
||
|
||
|
||
class AttachmentResource(Resource):
    """Serve a text attachment that carries no filename.

    The ``content-charset`` request header, when present, is advertised in
    ``Content-Type`` and used to encode the body; otherwise the body is
    encoded as UTF-8. The ``content-append`` request header, when present,
    is appended to the body text.
    """

    def render(self, request):
        charset = request.getHeader(b'content-charset')
        mime = b'text/plain'
        if charset:
            mime = mime + b'; charset=' + charset
        request.setHeader(b'Content-Type', mime)
        request.setHeader(b'Content-Disposition', b'attachment')

        suffix = request.getHeader(b'content-append') or b''
        text = 'Attachment with no filename set{}'.format(suffix.decode('utf8'))
        # The requested charset decides the body encoding, not just the header.
        body_encoding = charset.decode('utf8') if charset else 'utf8'
        return text.encode(body_encoding)
|
||
|
||
|
||
class TorrentResource(Resource):
    """Serve a fixed body as an ``application/x-bittorrent`` attachment.

    A ``content-charset`` request header is echoed into ``Content-Type`` but
    has no effect on how the body is encoded.
    """

    def render(self, request):
        charset = request.getHeader(b'content-charset')
        mime = b'application/x-bittorrent'
        if charset:
            mime = mime + b'; charset=' + charset
        request.setHeader(b'Content-Type', mime)
        request.setHeader(b'Content-Disposition', b'attachment; filename=test.torrent')
        payload = 'Binary attachment ignore charset 世丕且\n'
        return payload.encode()
|
||
|
||
|
||
class CookieResource(Resource):
    """Report on the ``password`` cookie sent with the request.

    The body is ``Password cookie not set!`` when the cookie is absent,
    ``COOKIE MONSTER!`` when it equals ``deluge``, and the raw cookie value
    in every other case.
    """

    def render(self, request):
        request.setHeader(b'Content-Type', b'text/plain')
        # The cookie name must be bytes in every lookup: a str key never
        # matches, which would make the fall-through branch return None.
        password = request.getCookie(b'password')
        if password is None:
            return b'Password cookie not set!'

        if password == b'deluge':
            return b'COOKIE MONSTER!'

        return password
|
||
|
||
|
||
class GzipResource(Resource):
    """Echo the ``msg`` query argument as plain text.

    ``getChild`` hands back this same resource wrapped in an
    ``EncodingResourceWrapper`` that carries a ``GzipEncoderFactory``.
    """

    def getChild(self, path, request):  # NOQA: N802
        gzip_encoders = [GzipEncoderFactory()]
        return EncodingResourceWrapper(self, gzip_encoders)

    def render(self, request):
        # Default body used when the query string has no ``msg`` argument.
        messages = request.args.get(b'msg', [b'EFFICIENCY!'])
        body = messages[0]
        request.setHeader(b'Content-Type', b'text/plain')
        return body
|
||
|
||
|
||
class PartialDownloadResource(Resource):
    """Write ``abc`` while advertising a length that depends on the call count.

    The first render announces ``content-length: 5``; every later render
    announces ``3``. ``render_count`` tracks how many times ``render`` ran.
    """

    def __init__(self, *args, **kwargs):
        # Extra arguments are accepted but not forwarded to Resource.
        Resource.__init__(self)
        self.render_count = 0

    def render(self, request):
        advertised = b'5' if self.render_count == 0 else b'3'
        request.setHeader(b'content-length', advertised)

        # Twisted's Request.write() and Resource.render() deal in bytes, so
        # both the written chunk and the returned body are bytes literals.
        request.write(b'abc')
        self.render_count += 1
        return b''
|
||
|
||
|
||
class TopLevelResource(Resource):
    """Root resource of the test web server; mounts every helper resource.

    The redirect resource is also kept on ``redirect_rsrc`` so that code
    outside this class can reach it.
    """

    def __init__(self):
        super().__init__()
        self.redirect_rsrc = RedirectResource()
        children = (
            (b'cookie', CookieResource()),
            (b'gzip', GzipResource()),
            (b'redirect', self.redirect_rsrc),
            (b'rename', RenameResource()),
            (b'attachment', AttachmentResource()),
            (b'torrent', TorrentResource()),
            (b'partial', PartialDownloadResource()),
        )
        for segment, child in children:
            self.putChild(segment, child)

    def getChild(self, path, request):  # NOQA: N802
        # An empty path segment resolves to this resource itself.
        if path:
            return super().getChild(path, request)
        return self

    def render(self, request):
        # Any If-Modified-Since header switches the response code to 304.
        conditional = request.getHeader(b'If-Modified-Since')
        if conditional:
            request.setResponseCode(NOT_MODIFIED)
        return b'<h1>Deluge HTTP Downloader tests webserver here</h1>'
|
||
|
||
|
||
class TestDownloadFile:
    """Exercise ``download_file`` against a local Twisted web server.

    An autouse fixture starts a ``Site`` serving ``TopLevelResource`` before
    each test and stops listening afterwards. Downloads are written to paths
    produced by ``fname``.
    """

    def get_url(self, path=''):
        """Return the URL of ``path`` on the port the test server listens on."""
        return 'http://localhost:%d/%s' % (self.listen_port, path)

    @pytest_twisted.async_yield_fixture(autouse=True)
    async def setUp(self, request):  # NOQA
        """Start the web server for one test and stop it once the test ends."""
        # Attributes are set on the instance the test method runs on.
        self = request.instance
        setup_logger('warning', fname('log_file'))
        self.website = Site(TopLevelResource())
        self.listen_port = 51242
        self.website.resource.redirect_rsrc.get_url = self.get_url
        # Try up to ten consecutive ports; if every one is taken, re-raise
        # the last CannotListenError.
        for _ in range(10):
            try:
                self.webserver = reactor.listenTCP(self.listen_port, self.website)
            except CannotListenError as ex:
                error = ex
                self.listen_port += 1
            else:
                break
        else:
            raise error

        yield

        await self.webserver.stopListening()

    def assert_contains(self, filename, contents):
        """Assert the UTF-8 text of ``filename`` equals ``contents``.

        Returns ``filename`` so the call can be chained.
        """
        with open(filename, encoding='utf8') as _file:
            # A plain assert reports the mismatch directly; pytest.fail()
            # only accepts a string message, not an exception object.
            assert _file.read() == contents
        return filename

    def assert_not_contains(self, filename, contents, file_mode=''):
        """Assert the UTF-8 text of ``filename`` differs from ``contents``.

        ``file_mode`` is accepted for compatibility and is not used.
        Returns ``filename`` so the call can be chained.
        """
        with open(filename, encoding='utf8') as _file:
            assert _file.read() != contents
        return filename

    @pytest_twisted.ensureDeferred
    async def test_download(self):
        filename = await download_file(self.get_url(), fname('index.html'))
        assert filename == fname('index.html')

    @pytest_twisted.ensureDeferred
    async def test_download_without_required_cookies(self):
        url = self.get_url('cookie')
        filename = await download_file(url, fname('none'))
        self.assert_contains(filename, 'Password cookie not set!')

    @pytest_twisted.ensureDeferred
    async def test_download_with_required_cookies(self):
        url = self.get_url('cookie')
        cookie = {'cookie': 'password=deluge'}
        filename = await download_file(url, fname('monster'), headers=cookie)
        assert filename == fname('monster')
        self.assert_contains(filename, 'COOKIE MONSTER!')

    @pytest_twisted.ensureDeferred
    async def test_download_with_rename(self):
        url = self.get_url('rename?filename=renamed')
        filename = await download_file(url, fname('original'))
        assert filename == fname('renamed')
        self.assert_contains(filename, 'This file should be called renamed')

    @pytest_twisted.ensureDeferred
    async def test_download_with_rename_exists(self):
        # Make sure the target name is already taken before downloading.
        open(fname('renamed'), 'w').close()
        url = self.get_url('rename?filename=renamed')
        filename = await download_file(url, fname('original'))
        assert filename == fname('renamed-1')
        self.assert_contains(filename, 'This file should be called renamed')

    @pytest_twisted.ensureDeferred
    async def test_download_with_rename_sanitised(self):
        url = self.get_url('rename?filename=/etc/passwd')
        filename = await download_file(url, fname('original'))
        assert filename == fname('passwd')
        self.assert_contains(filename, 'This file should be called /etc/passwd')

    @pytest_twisted.ensureDeferred
    async def test_download_with_attachment_no_filename(self):
        url = self.get_url('attachment')
        filename = await download_file(url, fname('original'))
        assert filename == fname('original')
        self.assert_contains(filename, 'Attachment with no filename set')

    @pytest_twisted.ensureDeferred
    async def test_download_with_rename_prevented(self):
        url = self.get_url('rename?filename=spam')
        filename = await download_file(url, fname('forced'), force_filename=True)
        assert filename == fname('forced')
        self.assert_contains(filename, 'This file should be called spam')

    @pytest_twisted.ensureDeferred
    async def test_download_with_gzip_encoding(self):
        url = self.get_url('gzip?msg=success')
        filename = await download_file(url, fname('gzip_encoded'))
        self.assert_contains(filename, 'success')

    @pytest_twisted.ensureDeferred
    async def test_download_with_gzip_encoding_disabled(self):
        url = self.get_url('gzip?msg=unzip')
        filename = await download_file(
            url, fname('gzip_encoded'), allow_compression=False
        )
        self.assert_contains(filename, 'unzip')

    @pytest_twisted.ensureDeferred
    async def test_page_redirect_unhandled(self):
        url = self.get_url('redirect')
        with pytest.raises(PageRedirect):
            await download_file(url, fname('none'), handle_redirects=False)

    @pytest_twisted.ensureDeferred
    async def test_page_redirect(self):
        url = self.get_url('redirect')
        filename = await download_file(url, fname('none'), handle_redirects=True)
        assert filename == fname('none')

    @pytest_twisted.ensureDeferred
    async def test_page_not_found(self):
        with pytest.raises(Error):
            await download_file(self.get_url('page/not/found'), fname('none'))

    @pytest.mark.xfail(reason="Doesn't seem like httpdownloader ever implemented this.")
    @pytest_twisted.ensureDeferred
    async def test_page_not_modified(self):
        headers = {'If-Modified-Since': formatdate(usegmt=True)}
        with pytest.raises(Error) as exc_info:
            await download_file(self.get_url(), fname('index.html'), headers=headers)
        assert exc_info.value.status == NOT_MODIFIED

    @pytest_twisted.ensureDeferred
    async def test_download_text_reencode_charset(self):
        """Re-encode as UTF-8 specified charset for text content-type header"""
        url = self.get_url('attachment')
        filepath = fname('test.txt')
        headers = {'content-charset': 'Windows-1251', 'content-append': 'бвгде'}
        filename = await download_file(url, filepath, headers=headers)
        assert filename == filepath
        self.assert_contains(filename, 'Attachment with no filename setбвгде')

    @pytest_twisted.ensureDeferred
    async def test_download_binary_ignore_charset(self):
        """Ignore charset for binary content-type header e.g. torrent files"""
        url = self.get_url('torrent')
        headers = {'content-charset': 'Windows-1251'}
        filepath = fname('test.torrent')
        filename = await download_file(url, filepath, headers=headers)
        assert filename == filepath
        self.assert_contains(filename, 'Binary attachment ignore charset 世丕且\n')
|