mirror of
https://github.com/qgis/QGIS.git
synced 2025-02-27 00:33:48 -05:00
261 lines
6.6 KiB
Python
261 lines
6.6 KiB
Python
"""Common functionality."""
|
|
import os.path
|
|
import tempfile
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
|
|
import six
|
|
|
|
from nose2.compat import unittest
|
|
from nose2 import discover, util
|
|
|
|
|
|
HERE = os.path.abspath(os.path.dirname(__file__))
|
|
SUPPORT = os.path.join(HERE, 'functional', 'support')
|
|
|
|
|
|
class TestCase(unittest.TestCase):
|
|
|
|
"""TestCase extension.
|
|
|
|
If the class variable _RUN_IN_TEMP is True (default: False), tests will be
|
|
performed in a temporary directory, which is deleted afterwards.
|
|
"""
|
|
_RUN_IN_TEMP = False
|
|
|
|
def setUp(self):
|
|
super(TestCase, self).setUp()
|
|
|
|
if self._RUN_IN_TEMP:
|
|
self._orig_dir = os.getcwd()
|
|
work_dir = self._work_dir = tempfile.mkdtemp()
|
|
os.chdir(self._work_dir)
|
|
# Make sure it's possible to import modules from current directory
|
|
sys.path.insert(0, work_dir)
|
|
|
|
def tearDown(self):
|
|
super(TestCase, self).tearDown()
|
|
|
|
if self._RUN_IN_TEMP:
|
|
os.chdir(self._orig_dir)
|
|
shutil.rmtree(self._work_dir, ignore_errors=True)
|
|
|
|
|
|
class FunctionalTestCase(unittest.TestCase):
|
|
tags = ['functional']
|
|
|
|
def assertTestRunOutputMatches(self, proc, stdout=None, stderr=None):
|
|
cmd_stdout, cmd_stderr = None, None
|
|
try:
|
|
cmd_stdout, cmd_stderr = self._output[proc.pid]
|
|
except AttributeError:
|
|
self._output = {}
|
|
except KeyError:
|
|
pass
|
|
if cmd_stdout is None:
|
|
cmd_stdout, cmd_stderr = proc.communicate()
|
|
self._output[proc.pid] = cmd_stdout, cmd_stderr
|
|
testf = self.assertRegex if hasattr(self, 'assertRegex') \
|
|
else self.assertRegexpMatches
|
|
if stdout:
|
|
testf(util.safe_decode(cmd_stdout), stdout)
|
|
if stderr:
|
|
testf(util.safe_decode(cmd_stderr), stderr)
|
|
|
|
def runIn(self, testdir, *args, **kw):
|
|
return run_nose2(*args, cwd=testdir, **kw)
|
|
|
|
def runModuleAsMain(self, testmodule):
|
|
return run_module_as_main(testmodule)
|
|
|
|
|
|
class _FakeEventBase(object):
|
|
|
|
"""Baseclass for fake Events."""
|
|
|
|
def __init__(self):
|
|
self.handled = False
|
|
self.version = '0.1'
|
|
self.metadata = {}
|
|
|
|
|
|
class FakeHandleFileEvent(_FakeEventBase):
|
|
|
|
"""Fake HandleFileEvent."""
|
|
|
|
def __init__(self, name):
|
|
super(FakeHandleFileEvent, self).__init__()
|
|
|
|
self.loader = Stub() # FIXME
|
|
self.name = name
|
|
self.path = os.path.split(name)[1]
|
|
self.extraTests = []
|
|
|
|
|
|
class FakeStartTestEvent(_FakeEventBase):
|
|
|
|
"""Fake StartTestEvent."""
|
|
|
|
def __init__(self, test):
|
|
super(FakeStartTestEvent, self).__init__()
|
|
self.test = test
|
|
self.result = test.defaultTestResult()
|
|
import time
|
|
self.startTime = time.time()
|
|
|
|
|
|
class FakeLoadFromNameEvent(_FakeEventBase):
|
|
|
|
"""Fake LoadFromNameEvent."""
|
|
|
|
def __init__(self, name):
|
|
super(FakeLoadFromNameEvent, self).__init__()
|
|
self.name = name
|
|
|
|
|
|
class FakeLoadFromNamesEvent(_FakeEventBase):
|
|
|
|
"""Fake LoadFromNamesEvent."""
|
|
|
|
def __init__(self, names):
|
|
super(FakeLoadFromNamesEvent, self).__init__()
|
|
self.names = names
|
|
|
|
|
|
class FakeStartTestRunEvent(_FakeEventBase):
|
|
|
|
"""Fake StartTestRunEvent"""
|
|
|
|
def __init__(self, runner=None, suite=None, result=None, startTime=None,
|
|
executeTests=None):
|
|
super(FakeStartTestRunEvent, self).__init__()
|
|
self.suite = suite
|
|
self.runner = runner
|
|
self.result = result
|
|
self.startTime = startTime
|
|
self.executeTests = executeTests
|
|
|
|
|
|
class Stub(object):
|
|
|
|
"""Stub object for use in tests"""
|
|
|
|
def __getattr__(self, attr):
|
|
return Stub()
|
|
|
|
def __call__(self, *arg, **kw):
|
|
return Stub()
|
|
|
|
|
|
def support_file(*path_parts):
|
|
return os.path.abspath(os.path.join(SUPPORT, *path_parts))
|
|
|
|
|
|
def run_nose2(*nose2_args, **nose2_kwargs):
|
|
if 'cwd' in nose2_kwargs:
|
|
cwd = nose2_kwargs.pop('cwd')
|
|
if not os.path.isabs(cwd):
|
|
nose2_kwargs['cwd'] = support_file(cwd)
|
|
return NotReallyAProc(nose2_args, **nose2_kwargs)
|
|
|
|
|
|
def run_module_as_main(test_module):
|
|
if not os.path.isabs(test_module):
|
|
test_module = support_file(test_module)
|
|
return subprocess.Popen([sys.executable, test_module],
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
|
|
|
|
class NotReallyAProc(object):
|
|
|
|
def __init__(self, args, cwd=None, **kwargs):
|
|
self.args = args
|
|
self.chdir = cwd
|
|
self.kwargs = kwargs
|
|
self.result = None
|
|
|
|
def __enter__(self):
|
|
self._stdout = sys.__stdout__
|
|
self._stderr = sys.__stderr__
|
|
self.cwd = os.getcwd()
|
|
if self.chdir:
|
|
os.chdir(self.chdir)
|
|
self.stdout = sys.stdout = sys.__stdout__ = six.StringIO()
|
|
self.stderr = sys.stderr = sys.__stderr__ = six.StringIO()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
sys.stdout = sys.__stdout__ = self._stdout
|
|
sys.stderr = sys.__stderr__ = self._stderr
|
|
if self.chdir:
|
|
os.chdir(self.cwd)
|
|
return False
|
|
|
|
def communicate(self):
|
|
with self:
|
|
try:
|
|
self.result = discover(
|
|
argv=('nose2',) + self.args, exit=False,
|
|
**self.kwargs)
|
|
except SystemExit as e:
|
|
pass
|
|
return self.stdout.getvalue(), self.stderr.getvalue()
|
|
|
|
@property
|
|
def pid(self):
|
|
return id(self)
|
|
|
|
def poll(self):
|
|
if self.result is None:
|
|
return 1
|
|
return not self.result.result.wasSuccessful()
|
|
|
|
|
|
class RedirectStdStreams(object):
|
|
|
|
"""
|
|
Context manager that replaces the stdin/out streams with StringIO
|
|
buffers.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.stdout = six.StringIO()
|
|
self.stderr = six.StringIO()
|
|
|
|
def __enter__(self):
|
|
self.old_stdout, self.old_stderr = sys.stdout, sys.stderr
|
|
self.old_stdout.flush()
|
|
self.old_stderr.flush()
|
|
sys.stdout, sys.stderr = self.stdout, self.stderr
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
self.stdout.flush()
|
|
self.stderr.flush()
|
|
sys.stdout = self.old_stdout
|
|
sys.stderr = self.old_stderr
|
|
|
|
|
|
# mock multprocessing Connection
|
|
class Conn(object):
|
|
|
|
def __init__(self, items):
|
|
self.items = items
|
|
self.sent = []
|
|
self.closed = False
|
|
|
|
def recv(self):
|
|
if self.closed:
|
|
raise EOFError("closed")
|
|
try:
|
|
return self.items.pop(0)
|
|
except:
|
|
raise EOFError("EOF")
|
|
|
|
def send(self, item):
|
|
self.sent.append(item)
|
|
|
|
def close(self):
|
|
self.closed = True
|