Server: pass full url string and data to handleRequest()

Fix tests by removing passing url pieces from environment
This commit is contained in:
David Marteau 2017-04-21 16:10:12 +02:00
parent 11bb234506
commit 018d2a24fb
7 changed files with 106 additions and 124 deletions

View File

@ -190,10 +190,11 @@ class QgsServer
* but can be also passed in args and in this case overrides the environment
* variable.
*
* @param queryString QString containing the query string
* @param requestMethod QString that indicates the method. Only "GET" or "POST" are supported.
* @param data array of bytes containing post data
* @return the response headers and body QPair of QByteArray
*/
QPair<QByteArray, QByteArray> handleRequest( const QString& queryString );
QPair<QByteArray, QByteArray> handleRequest( const QString &urlstr, const QString &requestMethod = QString(), const char *data = nullptr );
/** Returns a pointer to the server interface */
QgsServerInterface* serverInterface();

View File

@ -426,7 +426,7 @@ void QgsServer::handleRequest( QgsServerRequest &request, QgsServerResponse &res
}
}
QPair<QByteArray, QByteArray> QgsServer::handleRequest( const QString &queryString )
QPair<QByteArray, QByteArray> QgsServer::handleRequest( const QString &urlstr, const QString &requestMethod, const char *data )
{
/*
* This is mainly for python bindings, passing QUERY_STRING
@ -435,26 +435,24 @@ QPair<QByteArray, QByteArray> QgsServer::handleRequest( const QString &queryStri
* XXX To be removed because query string is now handled in QgsServerRequest
*
*/
if ( ! queryString.isEmpty() )
putenv( QStringLiteral( "QUERY_STRING" ), queryString );
QUrl url( urlstr );
QgsServerRequest::Method method = QgsServerRequest::GetMethod;
QByteArray ba;
// XXX This is mainly used in tests
char *requestMethod = getenv( "REQUEST_METHOD" );
if ( requestMethod && strcmp( requestMethod, "POST" ) == 0 )
if ( !requestMethod.isEmpty() && requestMethod.compare( QStringLiteral( "POST" ), Qt::CaseInsensitive ) == 0 )
{
method = QgsServerRequest::PostMethod;
const char *data = getenv( "REQUEST_BODY" );
if ( data )
{
ba.append( data );
}
}
QUrl url;
url.setQuery( queryString );
else if ( !requestMethod.isEmpty() && requestMethod.compare( QStringLiteral( "GET" ), Qt::CaseInsensitive ) != 0 )
{
throw QgsServerException( QStringLiteral( "Invalid method in handleRequest(): only GET or POST is supported" ) );
}
QgsBufferServerRequest request( url, method, &ba );
QgsBufferServerResponse response;

View File

@ -78,10 +78,12 @@ class SERVER_EXPORT QgsServer
* but can be also passed in args and in this case overrides the environment
* variable.
*
* \param queryString QString containing the query string
* \param urlstr QString containing the request url (simple quely string must be preceded by '?')
* \param requestMethod QString that indicates the method. Only "GET" or "POST" are supported.
* \param data array of bytes containing post data
* \returns the response headers and body QPair of QByteArray
*/
QPair<QByteArray, QByteArray> handleRequest( const QString &queryString );
QPair<QByteArray, QByteArray> handleRequest( const QString &urlstr, const QString &requestMethod = QString(), const char *data = nullptr );
//! Returns a pointer to the server interface
QgsServerInterfaceImpl *serverInterface() { return sServerInterface; }

View File

@ -104,13 +104,7 @@ class Handler(BaseHTTPRequestHandler):
# CGI vars:
for k, v in self.headers.items():
qgs_server.putenv('HTTP_%s' % k.replace(' ', '-').replace('-', '_').replace(' ', '-').upper(), v)
qgs_server.putenv('SERVER_PORT', str(self.server.server_port))
if https:
qgs_server.putenv('HTTPS', 'ON')
qgs_server.putenv('SERVER_NAME', self.server.server_name)
qgs_server.putenv('REQUEST_URI', self.path)
parsed_path = urllib.parse.urlparse(self.path)
headers, body = qgs_server.handleRequest(parsed_path.query)
headers, body = qgs_server.handleRequest(self.path)
headers_dict = dict(h.split(': ', 1) for h in headers.decode().split('\n') if h)
try:
self.send_response(int(headers_dict['Status'].split(' ')[0]))

View File

@ -123,7 +123,7 @@ class TestQgsServer(unittest.TestCase):
# Test response when project is specified but without service
project = self.testdata_path + "test_project_wfs.qgs"
qs = 'MAP=%s' % (urllib.parse.quote(project))
qs = '?MAP=%s' % (urllib.parse.quote(project))
header, body = [_v for _v in self.server.handleRequest(qs)]
response = self.strip_version_xmlns(header + body)
expected = self.strip_version_xmlns(b'Content-Length: 206\nContent-Type: text/xml; charset=utf-8\n\n<ServiceExceptionReport version="1.3.0" xmlns="http://www.opengis.net/ogc">\n <ServiceException code="Service configuration error">Service unknown or unsupported</ServiceException>\n</ServiceExceptionReport>\n')
@ -191,7 +191,7 @@ class TestQgsServer(unittest.TestCase):
self.assertTrue(filter2 in serverIface.filters()[100])
self.assertEqual(filter1, serverIface.filters()[101][0])
self.assertEqual(filter2, serverIface.filters()[200][0])
header, body = [_v for _v in self.server.handleRequest('service=simple')]
header, body = [_v for _v in self.server.handleRequest('?service=simple')]
response = header + body
expected = b'Content-Length: 62\nContent-type: text/plain\n\nHello from SimpleServer!Hello from Filter1!Hello from Filter2!'
self.assertEqual(response, expected)
@ -203,7 +203,7 @@ class TestQgsServer(unittest.TestCase):
self.assertTrue(filter2 in serverIface.filters()[100])
self.assertEqual(filter1, serverIface.filters()[101][0])
self.assertEqual(filter2, serverIface.filters()[200][0])
header, body = [_v for _v in self.server.handleRequest('service=simple')]
header, body = [_v for _v in self.server.handleRequest('?service=simple')]
response = header + body
expected = b'Content-Length: 62\nContent-type: text/plain\n\nHello from SimpleServer!Hello from Filter1!Hello from Filter2!'
self.assertEqual(response, expected)
@ -213,7 +213,7 @@ class TestQgsServer(unittest.TestCase):
project = self.testdata_path + "test_project.qgs"
assert os.path.exists(project), "Project file not found: " + project
query_string = 'MAP=%s&SERVICE=WMS&VERSION=1.3&REQUEST=%s' % (urllib.parse.quote(project), request)
query_string = '?MAP=%s&SERVICE=WMS&VERSION=1.3&REQUEST=%s' % (urllib.parse.quote(project), request)
if extra is not None:
query_string += extra
header, body = self.server.handleRequest(query_string)
@ -278,7 +278,7 @@ class TestQgsServer(unittest.TestCase):
project = self.testdata_path + "test_project_inspire.qgs"
assert os.path.exists(project), "Project file not found: " + project
query_string = 'MAP=%s&SERVICE=WMS&VERSION=1.3.0&REQUEST=%s' % (urllib.parse.quote(project), request)
query_string = '?MAP=%s&SERVICE=WMS&VERSION=1.3.0&REQUEST=%s' % (urllib.parse.quote(project), request)
header, body = self.server.handleRequest(query_string)
response = header + body
f = open(self.testdata_path + request.lower() + '_inspire.txt', 'rb')
@ -307,7 +307,7 @@ class TestQgsServer(unittest.TestCase):
project = self.testdata_path + "test_project_wfs.qgs"
assert os.path.exists(project), "Project file not found: " + project
query_string = 'MAP=%s&SERVICE=WFS&VERSION=1.0.0&REQUEST=%s' % (urllib.parse.quote(project), request)
query_string = '?MAP=%s&SERVICE=WFS&VERSION=1.0.0&REQUEST=%s' % (urllib.parse.quote(project), request)
header, body = self.server.handleRequest(query_string)
self.assert_headers(header, body)
response = header + body
@ -337,7 +337,7 @@ class TestQgsServer(unittest.TestCase):
project = self.testdata_path + "test_project_wfs.qgs"
assert os.path.exists(project), "Project file not found: " + project
query_string = 'MAP=%s&SERVICE=WFS&VERSION=1.0.0&REQUEST=%s' % (urllib.parse.quote(project), request)
query_string = '?MAP=%s&SERVICE=WFS&VERSION=1.0.0&REQUEST=%s' % (urllib.parse.quote(project), request)
header, body = self.server.handleRequest(query_string)
self.result_compare(
'wfs_getfeature_' + requestid + '.txt',
@ -351,7 +351,7 @@ class TestQgsServer(unittest.TestCase):
def test_wfs_getcapabilities_url(self):
# empty url in project
project = os.path.join(self.testdata_path, "test_project_without_urls.qgs")
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(project),
"SERVICE": "WFS",
"VERSION": "1.3.0",
@ -367,7 +367,7 @@ class TestQgsServer(unittest.TestCase):
# url well defined in project
project = os.path.join(self.testdata_path, "test_project_with_urls.qgs")
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(project),
"SERVICE": "WFS",
"VERSION": "1.3.0",
@ -418,12 +418,8 @@ class TestQgsServer(unittest.TestCase):
project = self.testdata_path + "test_project_wfs.qgs"
assert os.path.exists(project), "Project file not found: " + project
query_string = 'MAP={}'.format(urllib.parse.quote(project))
self.server.putenv("REQUEST_METHOD", "POST")
self.server.putenv("REQUEST_BODY", request)
header, body = self.server.handleRequest(query_string)
self.server.putenv("REQUEST_METHOD", '')
self.server.putenv("REQUEST_BODY", '')
query_string = '?MAP={}'.format(urllib.parse.quote(project))
header, body = self.server.handleRequest(query_string, requestMethod="POST", data=request)
self.result_compare(
'wfs_getfeature_{}.txt'.format(requestid),
@ -458,7 +454,7 @@ class TestQgsServer(unittest.TestCase):
self.wfs_getfeature_post_compare(id, req)
def test_wms_getmap_basic(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -476,7 +472,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetMap_Basic")
def test_wms_getmap_transparent(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -495,7 +491,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetMap_Transparent")
def test_wms_getmap_background(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -513,7 +509,7 @@ class TestQgsServer(unittest.TestCase):
r, h = self._result(self.server.handleRequest(qs))
self._img_diff_error(r, h, "WMS_GetMap_Background")
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -534,7 +530,7 @@ class TestQgsServer(unittest.TestCase):
def test_wms_getcapabilities_url(self):
# empty url in project
project = os.path.join(self.testdata_path, "test_project_without_urls.qgs")
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(project),
"SERVICE": "WMS",
"VERSION": "1.3.0",
@ -553,7 +549,7 @@ class TestQgsServer(unittest.TestCase):
# url well defined in project
project = os.path.join(self.testdata_path, "test_project_with_urls.qgs")
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(project),
"SERVICE": "WMS",
"VERSION": "1.3.0",
@ -572,7 +568,7 @@ class TestQgsServer(unittest.TestCase):
def test_wms_getmap_invalid_size(self):
project = os.path.join(self.testdata_path, "test_project_with_size.qgs")
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(project),
"SERVICE": "WMS",
"VERSION": "1.3.0",
@ -590,7 +586,7 @@ class TestQgsServer(unittest.TestCase):
self.assertEqual(self.strip_version_xmlns(r), expected)
def test_wms_getmap_order(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -608,7 +604,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetMap_LayerOrder")
def test_wms_getmap_srs(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -627,7 +623,7 @@ class TestQgsServer(unittest.TestCase):
def test_wms_getmap_style(self):
# default style
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -645,7 +641,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetMap_StyleDefault")
# custom style
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -663,7 +659,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetMap_StyleCustom")
def test_wms_getmap_filter(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -682,7 +678,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetMap_Filter")
def test_wms_getmap_selection(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -701,7 +697,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetMap_Selection")
def test_wms_getmap_opacities(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -720,7 +716,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetMap_Opacities")
def test_wms_getprint_basic(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -739,7 +735,7 @@ class TestQgsServer(unittest.TestCase):
@unittest.skip('Randomly failing to draw the map layer')
def test_wms_getprint_srs(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -757,7 +753,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetPrint_SRS")
def test_wms_getprint_scale(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -776,7 +772,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetPrint_Scale")
def test_wms_getprint_grid(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -796,7 +792,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetPrint_Grid")
def test_wms_getprint_rotation(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -815,7 +811,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetPrint_Rotation")
def test_wms_getprint_selection(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -845,7 +841,7 @@ class TestQgsServer(unittest.TestCase):
# 'HEIGHT': '20', # optional
'LAYER': 'testlayer%20èé',
}
qs = '&'.join(["%s=%s" % (k, v) for k, v in parms.items()])
qs = '?' + '&'.join(["%s=%s" % (k, v) for k, v in parms.items()])
h, r = self.server.handleRequest(qs)
self.assertEqual(-1, h.find(b'Content-Type: text/xml; charset=utf-8'), "Header: %s\nResponse:\n%s" % (h, r))
self.assertNotEqual(-1, h.find(b'Content-Type: image/png'), "Header: %s\nResponse:\n%s" % (h, r))
@ -863,7 +859,7 @@ class TestQgsServer(unittest.TestCase):
'LAYER': u'testlayer%20èé',
'LAYERTITLE': 'TRUE',
}
qs = '&'.join([u"%s=%s" % (k, v) for k, v in parms.items()])
qs = '?' + '&'.join([u"%s=%s" % (k, v) for k, v in parms.items()])
r, h = self._result(self.server.handleRequest(qs))
self._img_diff_error(r, h, "WMS_GetLegendGraphic_test", 250, QSize(15, 15))
@ -878,12 +874,12 @@ class TestQgsServer(unittest.TestCase):
'LAYER': u'testlayer%20èé',
'LAYERTITLE': 'FALSE',
}
qs = '&'.join([u"%s=%s" % (k, v) for k, v in parms.items()])
qs = '?' + '&'.join([u"%s=%s" % (k, v) for k, v in parms.items()])
r, h = self._result(self.server.handleRequest(qs))
self._img_diff_error(r, h, "WMS_GetLegendGraphic_test_layertitle_false", 250, QSize(15, 15))
def test_wms_GetLegendGraphic_Basic(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -900,7 +896,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetLegendGraphic_Basic")
def test_wms_GetLegendGraphic_Transparent(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -918,7 +914,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetLegendGraphic_Transparent")
def test_wms_GetLegendGraphic_Background(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -935,7 +931,7 @@ class TestQgsServer(unittest.TestCase):
r, h = self._result(self.server.handleRequest(qs))
self._img_diff_error(r, h, "WMS_GetLegendGraphic_Background")
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -953,7 +949,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetLegendGraphic_Background_Hex")
def test_wms_GetLegendGraphic_BoxSpace(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -971,7 +967,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetLegendGraphic_BoxSpace")
def test_wms_GetLegendGraphic_SymbolSpace(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -989,7 +985,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetLegendGraphic_SymbolSpace")
def test_wms_GetLegendGraphic_IconLabelSpace(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -1007,7 +1003,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetLegendGraphic_IconLabelSpace")
def test_wms_GetLegendGraphic_SymbolSize(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -1026,7 +1022,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetLegendGraphic_SymbolSize")
def test_wms_GetLegendGraphic_BBox(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -1044,7 +1040,7 @@ class TestQgsServer(unittest.TestCase):
self._img_diff_error(r, h, "WMS_GetLegendGraphic_BBox")
def test_wms_GetLegendGraphic_BBox2(self):
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WMS",
"VERSION": "1.1.1",
@ -1066,7 +1062,7 @@ class TestQgsServer(unittest.TestCase):
project = self.projectPath
assert os.path.exists(project), "Project file not found: " + project
query_string = 'MAP=%s&SERVICE=WCS&VERSION=1.0.0&REQUEST=%s' % (urllib.parse.quote(project), request)
query_string = '?MAP=%s&SERVICE=WCS&VERSION=1.0.0&REQUEST=%s' % (urllib.parse.quote(project), request)
header, body = self.server.handleRequest(query_string)
self.assert_headers(header, body)
response = header + body
@ -1096,7 +1092,7 @@ class TestQgsServer(unittest.TestCase):
def test_wcs_getcapabilities_url(self):
# empty url in project
project = os.path.join(self.testdata_path, "test_project_without_urls.qgs")
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(project),
"SERVICE": "WCS",
"VERSION": "1.0.0",
@ -1115,7 +1111,7 @@ class TestQgsServer(unittest.TestCase):
# url well defined in project
project = os.path.join(self.testdata_path, "test_project_with_urls.qgs")
qs = "&".join(["%s=%s" % i for i in list({
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(project),
"SERVICE": "WCS",
"VERSION": "1.0.0",
@ -1161,6 +1157,7 @@ class TestQgsServer(unittest.TestCase):
self.assertEqual(
headers.get("Content-Type"), "image/png",
"Content type is wrong: %s" % headers.get("Content-Type"))
test, report = self._img_diff(response, image, max_diff, max_size_diff)
with open(os.path.join(tempfile.gettempdir(), image + "_result.png"), "rb") as rendered_file:

View File

@ -1372,9 +1372,10 @@ class TestQgsServerAccessControl(unittest.TestCase):
str(response).find("<qgs:pk>") != -1,
"Project based layer subsetString not respected in GetFeature with restricted access\n%s" % response)
def _handle_request(self, restricted, *args):
def _handle_request(self, restricted, query_string, **kwargs):
self._accesscontrol._active = restricted
result = self._result(self._server.handleRequest(*args))
qs = "?" + query_string if query_string is not None else None
result = self._result(self._server.handleRequest(qs, **kwargs))
return result
def _result(self, data):
@ -1388,34 +1389,22 @@ class TestQgsServerAccessControl(unittest.TestCase):
return data[1], headers
def _get_fullaccess(self, query_string):
self._server.putenv("REQUEST_METHOD", "GET")
result = self._handle_request(False, query_string)
self._server.putenv("REQUEST_METHOD", '')
return result
def _get_restricted(self, query_string):
self._server.putenv("REQUEST_METHOD", "GET")
result = self._handle_request(True, query_string)
self._server.putenv("REQUEST_METHOD", '')
return result
def _post_fullaccess(self, data, query_string=None):
self._server.putenv("REQUEST_METHOD", "POST")
self._server.putenv("REQUEST_BODY", data)
self._server.putenv("QGIS_PROJECT_FILE", self.projectPath)
result = self._handle_request(False, query_string)
self._server.putenv("REQUEST_METHOD", '')
self._server.putenv("REQUEST_BODY", '')
result = self._handle_request(False, query_string, requestMethod='POST', data=data)
self._server.putenv("QGIS_PROJECT_FILE", '')
return result
def _post_restricted(self, data, query_string=None):
self._server.putenv("REQUEST_METHOD", "POST")
self._server.putenv("REQUEST_BODY", data)
self._server.putenv("QGIS_PROJECT_FILE", self.projectPath)
result = self._handle_request(True, query_string)
self._server.putenv("REQUEST_METHOD", '')
self._server.putenv("REQUEST_BODY", '')
result = self._handle_request(True, query_string, requestMethod='POST', data=data)
self._server.putenv("QGIS_PROJECT_FILE", '')
return result
@ -1437,6 +1426,7 @@ class TestQgsServerAccessControl(unittest.TestCase):
self.assertEqual(
headers.get("Content-Type"), "image/png",
"Content type is wrong: %s" % headers.get("Content-Type"))
test, report = self._img_diff(response, image, max_diff, max_size_diff)
with open(os.path.join(tempfile.gettempdir(), image + "_result.png"), "rb") as rendered_file:

View File

@ -306,51 +306,51 @@ class TestQgsServerSecurity(unittest.TestCase):
return False
def handle_request_wfs_getfeature_filter(self, filter_xml):
qs = "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.project),
"SERVICE": "WFS",
"VERSION": "1.1.1",
"REQUEST": "GetFeature",
"TYPENAME": "point",
"STYLES": "",
"CRS": "EPSG:32613",
"FILTER": filter_xml}.items())])
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.project),
"SERVICE": "WFS",
"VERSION": "1.1.1",
"REQUEST": "GetFeature",
"TYPENAME": "point",
"STYLES": "",
"CRS": "EPSG:32613",
"FILTER": filter_xml}.items())])
return self.server.handleRequest(qs)
def handle_request_wms_getfeatureinfo(self, filter_sql):
qs = "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.project),
"SERVICE": "WMS",
"VERSION": "1.1.1",
"REQUEST": "GetFeatureInfo",
"QUERY_LAYERS": "point",
"LAYERS": "point",
"STYLES": "",
"FORMAT": "image/png",
"HEIGHT": "500",
"WIDTH": "500",
"BBOX": "606171,4822867,612834,4827375",
"CRS": "EPSG:32613",
"FILTER": filter_sql}.items())])
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.project),
"SERVICE": "WMS",
"VERSION": "1.1.1",
"REQUEST": "GetFeatureInfo",
"QUERY_LAYERS": "point",
"LAYERS": "point",
"STYLES": "",
"FORMAT": "image/png",
"HEIGHT": "500",
"WIDTH": "500",
"BBOX": "606171,4822867,612834,4827375",
"CRS": "EPSG:32613",
"FILTER": filter_sql}.items())])
return self._result(self.server.handleRequest(qs))
def handle_request_wms_getmap(self, sld):
qs = "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.project),
"SERVICE": "WMS",
"VERSION": "1.0.0",
"REQUEST": "GetMap",
"QUERY_LAYERS": "point",
"LAYERS": "point",
"STYLES": "",
"FORMAT": "image/png",
"HEIGHT": "500",
"WIDTH": "500",
"BBOX": "606171,4822867,612834,4827375",
"CRS": "EPSG:32613",
"SLD": sld}.items())])
qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.project),
"SERVICE": "WMS",
"VERSION": "1.0.0",
"REQUEST": "GetMap",
"QUERY_LAYERS": "point",
"LAYERS": "point",
"STYLES": "",
"FORMAT": "image/png",
"HEIGHT": "500",
"WIDTH": "500",
"BBOX": "606171,4822867,612834,4827375",
"CRS": "EPSG:32613",
"SLD": sld}.items())])
return self._result(self.server.handleRequest(qs))