Merge pull request #3719 from elpaso/auth_tests_more_master

Auth tests more master
This commit is contained in:
Alessandro Pasotti 2016-11-04 11:36:18 +01:00 committed by GitHub
commit 16a88e6c36
6 changed files with 308 additions and 24 deletions

View File

@ -160,5 +160,7 @@ IF (WITH_SERVER)
ADD_PYTHON_TEST(PyQgsServerAccessControl test_qgsserver_accesscontrol.py)
ADD_PYTHON_TEST(PyQgsServerWFST test_qgsserver_wfst.py)
ADD_PYTHON_TEST(PyQgsOfflineEditingWFS test_offline_editing_wfs.py)
ADD_PYTHON_TEST(PyQgsAuthManagerEndpointTest test_authmanager_endpoint.py)
ADD_PYTHON_TEST(PyQgsAuthManagerPasswordOWSTest test_authmanager_password_ows.py)
#ADD_PYTHON_TEST(PyQgsAuthManagerPKIOWSTest test_authmanager_pki_ows.py)
ADD_PYTHON_TEST(PyQgsAuthManagerPKIPostgresTest test_authmanager_pki_postgres.py)
ENDIF (WITH_SERVER)

View File

@ -3,7 +3,7 @@
QGIS Server HTTP wrapper
This script launches a QGIS Server listening on port 8081 or on the port
specified on the environment variable QGIS_SERVER_PORT
specified on the environment variable QGIS_SERVER_PORT.
QGIS_SERVER_HOST (defaults to 127.0.0.1)
For testing purposes, HTTP Basic can be enabled by setting the following
@ -13,6 +13,21 @@ environment variables:
* QGIS_SERVER_USERNAME (default ="username")
* QGIS_SERVER_PASSWORD (default ="password")
PKI authentication with HTTPS can be enabled with:
* QGIS_SERVER_PKI_CERTIFICATE (server certificate)
* QGIS_SERVER_PKI_KEY (server private key)
* QGIS_SERVER_PKI_AUTHORITY (root CA)
* QGIS_SERVER_PKI_USERNAME (valid username)
Sample run:
QGIS_SERVER_PKI_USERNAME=Gerardus QGIS_SERVER_PORT=47547 QGIS_SERVER_HOST=localhost \
QGIS_SERVER_PKI_KEY=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_key.pem \
QGIS_SERVER_PKI_CERTIFICATE=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_cert.pem \
QGIS_SERVER_PKI_AUTHORITY=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/chains_subissuer-issuer-root_issuer2-root2.pem \
python3 /home/$USER/dev/QGIS/tests/src/python/qgis_wrapped_server.py
.. note:: This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
@ -32,6 +47,7 @@ __revision__ = '$Format:%H$'
import os
import sys
import signal
import ssl
import urllib.parse
from http.server import BaseHTTPRequestHandler, HTTPServer
from qgis.core import QgsApplication
@ -39,6 +55,21 @@ from qgis.server import QgsServer
QGIS_SERVER_PORT = int(os.environ.get('QGIS_SERVER_PORT', '8081'))
QGIS_SERVER_HOST = os.environ.get('QGIS_SERVER_HOST', '127.0.0.1')
# PKI authentication
QGIS_SERVER_PKI_CERTIFICATE = os.environ.get('QGIS_SERVER_PKI_CERTIFICATE')
QGIS_SERVER_PKI_KEY = os.environ.get('QGIS_SERVER_PKI_KEY')
QGIS_SERVER_PKI_AUTHORITY = os.environ.get('QGIS_SERVER_PKI_AUTHORITY')
QGIS_SERVER_PKI_USERNAME = os.environ.get('QGIS_SERVER_PKI_USERNAME')
# Check if PKI - https is enabled
https = (QGIS_SERVER_PKI_CERTIFICATE is not None and
os.path.isfile(QGIS_SERVER_PKI_CERTIFICATE) and
QGIS_SERVER_PKI_KEY is not None and
os.path.isfile(QGIS_SERVER_PKI_KEY) and
QGIS_SERVER_PKI_AUTHORITY is not None and
os.path.isfile(QGIS_SERVER_PKI_AUTHORITY) and
QGIS_SERVER_PKI_USERNAME)
qgs_app = QgsApplication([], False)
qgs_server = QgsServer()
@ -75,6 +106,8 @@ class Handler(BaseHTTPRequestHandler):
for k, v in self.headers.items():
qgs_server.putenv('HTTP_%s' % k.replace(' ', '-').replace('-', '_').replace(' ', '-').upper(), v)
qgs_server.putenv('SERVER_PORT', str(self.server.server_port))
if https:
qgs_server.putenv('HTTPS', 'ON')
qgs_server.putenv('SERVER_NAME', self.server.server_name)
qgs_server.putenv('REQUEST_URI', self.path)
parsed_path = urllib.parse.urlparse(self.path)
@ -101,8 +134,16 @@ class Handler(BaseHTTPRequestHandler):
if __name__ == '__main__':
server = HTTPServer((QGIS_SERVER_HOST, QGIS_SERVER_PORT), Handler)
print('Starting server on %s:%s, use <Ctrl-C> to stop' %
(QGIS_SERVER_HOST, server.server_port), flush=True)
if https:
server.socket = ssl.wrap_socket(server.socket,
certfile=QGIS_SERVER_PKI_CERTIFICATE,
keyfile=QGIS_SERVER_PKI_KEY,
ca_certs=QGIS_SERVER_PKI_AUTHORITY,
cert_reqs=ssl.CERT_REQUIRED,
server_side=True,
ssl_version=ssl.PROTOCOL_TLSv1)
print('Starting server on %s://%s:%s, use <Ctrl-C> to stop' %
('https' if https else 'http', QGIS_SERVER_HOST, server.server_port), flush=True)
def signal_handler(signal, frame):
global qgs_app

View File

@ -8,7 +8,7 @@ and QGIS Server WFS/WMS that check if QGIS can use a stored auth manager auth
configuration to access an HTTP Basic protected endpoint.
From build dir, run: ctest -R PyQgsAuthManagerEndpointTest -V
From build dir, run: ctest -R PyQgsAuthManagerPasswordOWSTest -V
.. note:: This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -30,7 +30,6 @@ __copyright__ = 'Copyright 2016, The QGIS Project'
# This will get replaced with a git SHA1 when you do a git archive
__revision__ = '$Format:%H$'
from urllib.parse import quote
from shutil import rmtree
from utilities import unitTestDataPath, waitServer
@ -45,11 +44,10 @@ from qgis.testing import (
unittest,
)
try:
QGIS_SERVER_ENDPOINT_PORT = os.environ['QGIS_SERVER_ENDPOINT_PORT']
except:
QGIS_SERVER_ENDPOINT_PORT = '0' # Auto
QGIS_SERVER_ENDPOINT_PORT = '0' # Auto
QGIS_AUTH_DB_DIR_PATH = tempfile.mkdtemp()
@ -74,7 +72,7 @@ class TestAuthManager(unittest.TestCase):
except KeyError:
pass
cls.testdata_path = unitTestDataPath('qgis_server') + '/'
cls.project_path = quote(cls.testdata_path + "test_project.qgs")
cls.project_path = cls.testdata_path + "test_project.qgs"
# Enable auth
#os.environ['QGIS_AUTH_PASSWORD_FILE'] = QGIS_AUTH_PASSWORD_FILE
authm = QgsAuthManager.instance()
@ -86,26 +84,30 @@ class TestAuthManager(unittest.TestCase):
cls.auth_config.setConfig('username', cls.username)
cls.auth_config.setConfig('password', cls.password)
assert (authm.storeAuthenticationConfig(cls.auth_config)[0])
cls.hostname = '127.0.0.1'
cls.protocol = 'http'
os.environ['QGIS_SERVER_HTTP_BASIC_AUTH'] = '1'
os.environ['QGIS_SERVER_USERNAME'] = cls.username
os.environ['QGIS_SERVER_PASSWORD'] = cls.password
os.environ['QGIS_SERVER_PORT'] = str(cls.port)
os.environ['QGIS_SERVER_HOST'] = cls.hostname
server_path = os.path.dirname(os.path.realpath(__file__)) + \
'/qgis_wrapped_server.py'
cls.server = subprocess.Popen([sys.executable, server_path],
env=os.environ, stdout=subprocess.PIPE)
line = cls.server.stdout.readline()
cls.port = int(re.findall(b':(\d+)', line)[0])
assert cls.port != 0
# Wait for the server process to start
assert waitServer('http://127.0.0.1:%s' % cls.port), "Server is not responding!"
assert waitServer('%s://%s:%s' % (cls.protocol, cls.hostname, cls.port)), "Server is not responding! %s://%s:%s" % (cls.protocol, cls.hostname, cls.port)
@classmethod
def tearDownClass(cls):
"""Run after all tests"""
cls.server.terminate()
cls.server.wait()
rmtree(QGIS_AUTH_DB_DIR_PATH)
del cls.server
@ -127,7 +129,7 @@ class TestAuthManager(unittest.TestCase):
parms = {
'srsname': 'EPSG:4326',
'typename': type_name,
'url': 'http://127.0.0.1:%s/?map=%s' % (cls.port, cls.project_path),
'url': '%s://%s:%s/?map=%s' % (cls.protocol, cls.hostname, cls.port, cls.project_path),
'version': 'auto',
'table': '',
}
@ -146,12 +148,12 @@ class TestAuthManager(unittest.TestCase):
layer_name = 'wms_' + layers.replace(',', '')
parms = {
'crs': 'EPSG:4326',
'url': 'http://127.0.0.1:%s/?map=%s' % (cls.port, cls.project_path),
'format': 'image/png',
'url': '%s://%s:%s/?map=%s' % (cls.protocol, cls.hostname, cls.port, cls.project_path),
# This is needed because of a really weird implementation in QGIS Server, that
# replaces _ in the the real layer name with spaces
'layers': urllib.parse.quote(layers).replace('_', ' '),
'layers': urllib.parse.quote(layers.replace('_', ' ')),
'styles': '',
'version': 'auto',
#'sql': '',
}
if authcfg is not None:
@ -173,7 +175,7 @@ class TestAuthManager(unittest.TestCase):
"""
Access the HTTP Basic protected layer with no credentials
"""
wfs_layer = self._getWFSLayer('testlayer_èé')
wfs_layer = self._getWFSLayer('testlayer èé')
self.assertFalse(wfs_layer.isValid())
wms_layer = self._getWMSLayer('testlayer_èé')
self.assertFalse(wms_layer.isValid())

View File

@ -0,0 +1,237 @@
# -*- coding: utf-8 -*-
"""
Tests for auth manager PKI access to postgres.
This is an integration test for QGIS Desktop Auth Manager postgres provider that
checks if QGIS can use a stored auth manager auth configuration to access
a PKI protected postgres.
Configuration form the environment:
* QGIS_POSTGRES_SERVER_PORT (default: 55432)
* QGIS_POSTGRES_EXECUTABLE_PATH (default: /usr/lib/postgresql/9.4/bin)
From build dir, run: ctest -R PyQgsAuthManagerPKIPostgresTest -V
or, if your postgresql path differs from the default:
QGIS_POSTGRES_EXECUTABLE_PATH=/usr/lib/postgresql/<your_version_goes_here>/bin \
ctest -R PyQgsAuthManagerPKIPostgresTest -V
.. note:: This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
"""
import os
import time
import signal
import stat
import subprocess
import tempfile
from shutil import rmtree
from utilities import unitTestDataPath
from qgis.core import (
QgsAuthManager,
QgsAuthMethodConfig,
QgsVectorLayer,
QgsDataSourceUri,
QgsWkbTypes,
)
from qgis.PyQt.QtNetwork import QSslCertificate
from qgis.testing import (
start_app,
unittest,
)
__author__ = 'Alessandro Pasotti'
__date__ = '25/10/2016'
__copyright__ = 'Copyright 2016, The QGIS Project'
# This will get replaced with a git SHA1 when you do a git archive
__revision__ = '$Format:%H$'
QGIS_POSTGRES_SERVER_PORT = os.environ.get('QGIS_POSTGRES_SERVER_PORT', '55432')
QGIS_POSTGRES_EXECUTABLE_PATH = os.environ.get('QGIS_POSTGRES_EXECUTABLE_PATH', '/usr/lib/postgresql/9.4/bin')
assert os.path.exists(QGIS_POSTGRES_EXECUTABLE_PATH)
QGIS_AUTH_DB_DIR_PATH = tempfile.mkdtemp()
# Postgres test path
QGIS_PG_TEST_PATH = tempfile.mkdtemp()
os.environ['QGIS_AUTH_DB_DIR_PATH'] = QGIS_AUTH_DB_DIR_PATH
qgis_app = start_app()
QGIS_POSTGRES_CONF_TEMPLATE = """
hba_file = '%(tempfolder)s/pg_hba.conf'
listen_addresses = '*'
port = %(port)s
max_connections = 100
unix_socket_directories = '%(tempfolder)s'
ssl = true
ssl_ciphers = 'DEFAULT:!LOW:!EXP:!MD5:@STRENGTH' # allowed SSL ciphers
ssl_cert_file = '%(server_cert)s'
ssl_key_file = '%(server_key)s'
ssl_ca_file = '%(sslrootcert_path)s'
password_encryption = on
"""
QGIS_POSTGRES_HBA_TEMPLATE = """
hostssl all all 0.0.0.0/0 cert clientcert=1
hostssl all all ::1/0 cert clientcert=1
host all all 127.0.0.1/32 trust
host all all ::1/32 trust
"""
class TestAuthManager(unittest.TestCase):
@classmethod
def setUpAuth(cls):
"""Run before all tests and set up authentication"""
authm = QgsAuthManager.instance()
assert (authm.setMasterPassword('masterpassword', True))
cls.pg_conf = os.path.join(cls.tempfolder, 'postgresql.conf')
cls.pg_hba = os.path.join(cls.tempfolder, 'pg_hba.conf')
# Client side
cls.sslrootcert_path = os.path.join(cls.certsdata_path, 'chains_subissuer-issuer-root_issuer2-root2.pem')
cls.sslcert = os.path.join(cls.certsdata_path, 'gerardus_cert.pem')
cls.sslkey = os.path.join(cls.certsdata_path, 'gerardus_key.pem')
assert os.path.isfile(cls.sslcert)
assert os.path.isfile(cls.sslkey)
assert os.path.isfile(cls.sslrootcert_path)
os.chmod(cls.sslcert, stat.S_IRUSR)
os.chmod(cls.sslkey, stat.S_IRUSR)
os.chmod(cls.sslrootcert_path, stat.S_IRUSR)
cls.auth_config = QgsAuthMethodConfig("PKI-Paths")
cls.auth_config.setConfig('certpath', cls.sslcert)
cls.auth_config.setConfig('keypath', cls.sslkey)
cls.auth_config.setName('test_pki_auth_config')
cls.username = 'Gerardus'
cls.sslrootcert = QSslCertificate.fromPath(cls.sslrootcert_path)
assert cls.sslrootcert is not None
authm.storeCertAuthorities(cls.sslrootcert)
authm.rebuildCaCertsCache()
authm.rebuildTrustedCaCertsCache()
authm.rebuildCertTrustCache()
assert (authm.storeAuthenticationConfig(cls.auth_config)[0])
assert cls.auth_config.isValid()
# Server side
cls.server_cert = os.path.join(cls.certsdata_path, 'localhost_ssl_cert.pem')
cls.server_key = os.path.join(cls.certsdata_path, 'localhost_ssl_key.pem')
cls.server_rootcert = cls.sslrootcert_path
os.chmod(cls.server_cert, stat.S_IRUSR)
os.chmod(cls.server_key, stat.S_IRUSR)
os.chmod(cls.server_rootcert, stat.S_IRUSR)
# Place conf in the data folder
with open(cls.pg_conf, 'w+') as f:
f.write(QGIS_POSTGRES_CONF_TEMPLATE % {
'port': cls.port,
'tempfolder': cls.tempfolder,
'server_cert': cls.server_cert,
'server_key': cls.server_key,
'sslrootcert_path': cls.sslrootcert_path,
})
with open(cls.pg_hba, 'w+') as f:
f.write(QGIS_POSTGRES_HBA_TEMPLATE)
@classmethod
def setUpClass(cls):
"""Run before all tests:
Creates an auth configuration"""
cls.port = QGIS_POSTGRES_SERVER_PORT
cls.dbname = 'test_pki'
cls.tempfolder = QGIS_PG_TEST_PATH
cls.certsdata_path = os.path.join(unitTestDataPath('auth_system'), 'certs_keys')
cls.hostname = 'localhost'
cls.data_path = os.path.join(cls.tempfolder, 'data')
os.mkdir(cls.data_path)
cls.setUpAuth()
subprocess.check_call([os.path.join(QGIS_POSTGRES_EXECUTABLE_PATH, 'initdb'), '-D', cls.data_path])
cls.server = subprocess.Popen([os.path.join(QGIS_POSTGRES_EXECUTABLE_PATH, 'postgres'), '-D',
cls.data_path, '-c',
"config_file=%s" % cls.pg_conf],
env=os.environ,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# Wait max 10 secs for the server to start
end = time.time() + 10
while True:
line = cls.server.stderr.readline()
print(line)
if line.find(b"database system is ready to accept") != -1:
break
if time.time() > end:
raise Exception("Timeout connecting to postgresql")
# Create a DB
subprocess.check_call([os.path.join(QGIS_POSTGRES_EXECUTABLE_PATH, 'createdb'), '-h', 'localhost', '-p', cls.port, 'test_pki'])
# Inject test SQL from test path
test_sql = os.path.join(unitTestDataPath('provider'), 'testdata_pg.sql')
subprocess.check_call([os.path.join(QGIS_POSTGRES_EXECUTABLE_PATH, 'psql'), '-h', 'localhost', '-p', cls.port, '-f', test_sql, cls.dbname])
# Create a role
subprocess.check_call([os.path.join(QGIS_POSTGRES_EXECUTABLE_PATH, 'psql'), '-h', 'localhost', '-p', cls.port, '-c', 'CREATE ROLE "%s" WITH SUPERUSER LOGIN' % cls.username, cls.dbname])
@classmethod
def tearDownClass(cls):
"""Run after all tests"""
cls.server.terminate()
os.kill(cls.server.pid, signal.SIGABRT)
del cls.server
time.sleep(2)
rmtree(QGIS_AUTH_DB_DIR_PATH)
rmtree(cls.tempfolder)
def setUp(self):
"""Run before each test."""
pass
def tearDown(self):
"""Run after each test."""
pass
@classmethod
def _getPostGISLayer(cls, type_name, layer_name=None, authcfg=None):
"""
PG layer factory
"""
if layer_name is None:
layer_name = 'pg_' + type_name
uri = QgsDataSourceUri()
uri.setWkbType(QgsWkbTypes.Point)
uri.setConnection("localhost", cls.port, cls.dbname, "", "", QgsDataSourceUri.SslVerifyFull, authcfg)
uri.setKeyColumn('pk')
uri.setSrid('EPSG:4326')
uri.setDataSource('qgis_test', 'someData', "geom", "", "pk")
# Note: do not expand here!
layer = QgsVectorLayer(uri.uri(False), layer_name, 'postgres')
return layer
def testValidAuthAccess(self):
"""
Access the protected layer with valid credentials
"""
pg_layer = self._getPostGISLayer('testlayer_èé', authcfg=self.auth_config.id())
self.assertTrue(pg_layer.isValid())
def testInvalidAuthAccess(self):
"""
Access the protected layer with not valid credentials
"""
pg_layer = self._getPostGISLayer('testlayer_èé')
self.assertFalse(pg_layer.isValid())
if __name__ == '__main__':
unittest.main()

View File

@ -502,11 +502,13 @@ class TestQgsServer(unittest.TestCase):
report, encoded_rendered_file.strip(), tempfile.gettempdir(), image
)
with open(os.path.join(tempfile.gettempdir(), image + "_result_diff.png"), "rb") as diff_file:
encoded_diff_file = base64.b64encode(diff_file.read())
message += "\nDiff:\necho '%s' | base64 -d > %s/%s_result_diff.png" % (
encoded_diff_file.strip(), tempfile.gettempdir(), image
)
# If the failure is in image sizes the diff file will not exists.
if os.path.exists(os.path.join(tempfile.gettempdir(), image + "_result_diff.png")):
with open(os.path.join(tempfile.gettempdir(), image + "_result_diff.png"), "rb") as diff_file:
encoded_diff_file = base64.b64encode(diff_file.read())
message += "\nDiff:\necho '%s' | base64 -d > %s/%s_result_diff.png" % (
encoded_diff_file.strip(), tempfile.gettempdir(), image
)
self.assertTrue(test, message)

View File

@ -19,9 +19,9 @@ import glob
import platform
import tempfile
try:
from urllib2 import urlopen, HTTPError
from urllib2 import urlopen, HTTPError, URLError
except ImportError:
from urllib.request import urlopen, HTTPError
from urllib.request import urlopen, HTTPError, URLError
from qgis.PyQt.QtCore import QDir
@ -847,7 +847,7 @@ def waitServer(url, timeout=10):
try:
urlopen(url, timeout=1)
return True
except HTTPError:
except (HTTPError, URLError):
return True
except Exception as e:
if now() > end: