QGIS/python/plugins/processing/tests/GdalAlgorithmsGeneralTest.py
2024-06-27 13:07:25 +10:00

345 lines
16 KiB
Python

"""
***************************************************************************
GdalAlgorithmsGeneralTest.py
----------------------------
Date : January 2016
Copyright : (C) 2016 by Matthias Kuhn
Email : matthias@opengis.ch
***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************
"""
__author__ = 'Matthias Kuhn'
__date__ = 'January 2016'
__copyright__ = '(C) 2016, Matthias Kuhn'
import nose2
import os
import shutil
import tempfile
from qgis.core import (QgsProcessingContext,
QgsProcessingFeedback,
QgsCoordinateReferenceSystem,
QgsApplication,
QgsFeature,
QgsGeometry,
QgsPointXY,
QgsProject,
QgsVectorLayer,
QgsRectangle,
QgsProjUtils,
QgsProcessingException,
QgsProcessingFeatureSourceDefinition)
from qgis.testing import (QgisTestCase,
start_app)
from processing.algs.gdal.GdalUtils import GdalUtils
from processing.algs.gdal.ogr2ogr import ogr2ogr
from processing.algs.gdal.OgrToPostGis import OgrToPostGis
# Fixture directory ('testdata') that sits next to this test module.
testDataPath = os.path.join(
    os.path.dirname(__file__),
    'testdata',
)
class TestGdalAlgorithms(QgisTestCase):
    """General tests for the GDAL processing provider and the ``GdalUtils`` helpers."""

    @classmethod
    def setUpClass(cls):
        """Start the QGIS application and initialise Processing once for the class."""
        start_app()
        # NOTE(review): imported locally, presumably so that Processing is only
        # loaded after start_app() has run - confirm before hoisting.
        from processing.core.Processing import Processing
        Processing.initialize()
        # Directories registered here are removed in tearDownClass().
        cls.cleanup_paths = []

    @classmethod
    def tearDownClass(cls):
        """Remove every directory registered in ``cleanup_paths``."""
        for path in cls.cleanup_paths:
            shutil.rmtree(path)

    def testCommandName(self):
        """Every GDAL algorithm (bar the listed exceptions) reports a commandName."""
        # Test that algorithms report a valid commandName
        p = QgsApplication.processingRegistry().providerById('gdal')
        for a in p.algorithms():
            # The trailing comma matters: without it this is a plain string and
            # ``in`` performs a substring test instead of an exact id match.
            if a.id() in ('gdal:buildvirtualvector',):
                # build virtual vector is an exception
                continue
            self.assertTrue(a.commandName(), f'Algorithm {a.id()} has no commandName!')

    def testCommandNameInTags(self):
        """The commandName of each algorithm is listed among its tags."""
        # Test that algorithms commandName is present in provided tags
        p = QgsApplication.processingRegistry().providerById('gdal')
        for a in p.algorithms():
            if not a.commandName():
                continue
            self.assertIn(a.commandName(), a.tags(), f'Algorithm {a.id()} commandName not found in tags!')

    def testNoParameters(self):
        """Algorithms given no parameters may only raise QgsProcessingException."""
        # Test that algorithms throw QgsProcessingException and not base Python
        # exceptions when no parameters specified
        p = QgsApplication.processingRegistry().providerById('gdal')
        context = QgsProcessingContext()
        feedback = QgsProcessingFeedback()
        for a in p.algorithms():
            try:
                a.getConsoleCommands({}, context, feedback)
            except QgsProcessingException:
                # Expected failure mode; any other exception type propagates
                # and fails the test.
                pass

    def testGetOgrCompatibleSourceFromMemoryLayer(self):
        """A memory layer is exported to GeoPackage only when actually executing."""
        # create a memory layer and add to project and context
        layer = QgsVectorLayer("Point?field=fldtxt:string&field=fldint:integer",
                               "testmem", "memory")
        self.assertTrue(layer.isValid())
        pr = layer.dataProvider()
        f = QgsFeature()
        f.setAttributes(["test", 123])
        f.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(100, 200)))
        f2 = QgsFeature()
        f2.setAttributes(["test2", 457])
        f2.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(100, 200)))
        self.assertTrue(pr.addFeatures([f, f2]))
        self.assertEqual(layer.featureCount(), 2)
        QgsProject.instance().addMapLayer(layer)
        # Registered as a cleanup so the shared project instance is restored
        # even when one of the assertions below fails.
        self.addCleanup(QgsProject.instance().removeMapLayer, layer)
        context = QgsProcessingContext()
        context.setProject(QgsProject.instance())

        alg = QgsApplication.processingRegistry().createAlgorithmById('gdal:buffervectors')
        self.assertIsNotNone(alg)
        parameters = {'INPUT': 'testmem'}
        feedback = QgsProcessingFeedback()
        # check that memory layer is automatically saved out to geopackage when required by GDAL algorithms
        input_details = alg.getOgrCompatibleSource('INPUT', parameters, context, feedback,
                                                   executing=True)
        self.assertTrue(input_details.connection_string)
        self.assertTrue(input_details.connection_string.endswith('.gpkg'))
        self.assertTrue(os.path.exists(input_details.connection_string))
        # Was a repeated connection_string check; the exported layer name is
        # what testGetOgrCompatibleSourceFromFeatureSource verifies here.
        self.assertTrue(input_details.layer_name)

        # make sure that layer has correct features
        res = QgsVectorLayer(input_details.connection_string, 'res')
        self.assertTrue(res.isValid())
        self.assertEqual(res.featureCount(), 2)

        # with memory layers - if not executing layer source should be ignored and replaced
        # with a dummy path, because:
        # - it has no meaning for the gdal command outside of QGIS, memory layers don't exist!
        # - we don't want to force an export of the whole memory layer to a temp file just to show the command preview
        # this might be very slow!
        input_details = alg.getOgrCompatibleSource('INPUT', parameters, context, feedback,
                                                   executing=False)
        self.assertEqual(input_details.connection_string, 'path_to_data_file')
        self.assertEqual(input_details.layer_name, 'layer_name')

    def testGetOgrCompatibleSourceFromOgrLayer(self):
        """OGR-backed layers are used in place unless a subset must be exported."""
        p = QgsProject()
        source = os.path.join(testDataPath, 'points.gml')
        vl = QgsVectorLayer(source)
        self.assertTrue(vl.isValid())
        p.addMapLayer(vl)

        context = QgsProcessingContext()
        context.setProject(p)
        feedback = QgsProcessingFeedback()

        alg = ogr2ogr()
        alg.initAlgorithm()
        input_details = alg.getOgrCompatibleSource('INPUT', {'INPUT': vl.id()}, context, feedback, True)
        self.assertEqual(input_details.connection_string, source)
        input_details = alg.getOgrCompatibleSource('INPUT', {'INPUT': vl.id()}, context, feedback, False)
        self.assertEqual(input_details.connection_string, source)

        # with selected features only - if not executing, the 'selected features only' setting
        # should be ignored (because it has no meaning for the gdal command outside of QGIS!)
        parameters = {'INPUT': QgsProcessingFeatureSourceDefinition(vl.id(), True)}
        input_details = alg.getOgrCompatibleSource('INPUT', parameters, context, feedback, False)
        self.assertEqual(input_details.connection_string, source)

        # with subset string
        vl.setSubsetString('x')
        input_details = alg.getOgrCompatibleSource('INPUT', parameters, context, feedback, False)
        self.assertEqual(input_details.connection_string, source)
        # subset of layer must be exported
        input_details = alg.getOgrCompatibleSource('INPUT', parameters, context, feedback, True)
        self.assertNotEqual(input_details.connection_string, source)
        self.assertTrue(input_details.connection_string)
        self.assertTrue(input_details.connection_string.endswith('.gpkg'))
        self.assertTrue(os.path.exists(input_details.connection_string))
        self.assertTrue(input_details.layer_name)

        # geopackage with layer
        source = os.path.join(testDataPath, 'custom', 'circular_strings.gpkg')
        vl2 = QgsVectorLayer(source + '|layername=circular_strings')
        self.assertTrue(vl2.isValid())
        p.addMapLayer(vl2)
        input_details = alg.getOgrCompatibleSource('INPUT', {'INPUT': vl2.id()}, context, feedback, True)
        self.assertEqual(input_details.connection_string, source)
        self.assertEqual(input_details.layer_name, 'circular_strings')
        vl3 = QgsVectorLayer(source + '|layername=circular_strings_with_line')
        self.assertTrue(vl3.isValid())
        p.addMapLayer(vl3)
        input_details = alg.getOgrCompatibleSource('INPUT', {'INPUT': vl3.id()}, context, feedback, True)
        self.assertEqual(input_details.connection_string, source)
        self.assertEqual(input_details.layer_name, 'circular_strings_with_line')

    def testGetOgrCompatibleSourceFromFeatureSource(self):
        """With 'selected features only', just the selection is exported."""
        # create a memory layer and add to project and context
        layer = QgsVectorLayer("Point?field=fldtxt:string&field=fldint:integer",
                               "testmem", "memory")
        self.assertTrue(layer.isValid())
        pr = layer.dataProvider()
        f = QgsFeature()
        f.setAttributes(["test", 123])
        f.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(100, 200)))
        f2 = QgsFeature()
        f2.setAttributes(["test2", 457])
        f2.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(100, 200)))
        self.assertTrue(pr.addFeatures([f, f2]))
        self.assertEqual(layer.featureCount(), 2)
        # select first feature
        layer.selectByIds([next(layer.getFeatures()).id()])
        self.assertEqual(len(layer.selectedFeatureIds()), 1)
        QgsProject.instance().addMapLayer(layer)
        # Registered as a cleanup so the shared project instance is restored
        # even when one of the assertions below fails.
        self.addCleanup(QgsProject.instance().removeMapLayer, layer)
        context = QgsProcessingContext()
        context.setProject(QgsProject.instance())

        alg = QgsApplication.processingRegistry().createAlgorithmById('gdal:buffervectors')
        self.assertIsNotNone(alg)
        parameters = {'INPUT': QgsProcessingFeatureSourceDefinition('testmem', True)}
        feedback = QgsProcessingFeedback()
        # check that memory layer is automatically saved out to geopackage when required by GDAL algorithms
        input_details = alg.getOgrCompatibleSource('INPUT', parameters, context, feedback,
                                                   executing=True)
        self.assertTrue(input_details.connection_string)
        self.assertTrue(input_details.connection_string.endswith('.gpkg'))
        self.assertTrue(os.path.exists(input_details.connection_string))
        self.assertTrue(input_details.layer_name)

        # make sure that layer has only selected feature
        res = QgsVectorLayer(input_details.connection_string, 'res')
        self.assertTrue(res.isValid())
        self.assertEqual(res.featureCount(), 1)

    def testOgrOutputLayerName(self):
        """ogrOutputLayerName() returns the file base name, case preserved."""
        self.assertEqual(GdalUtils.ogrOutputLayerName('/home/me/out.shp'), 'out')
        self.assertEqual(GdalUtils.ogrOutputLayerName('d:/test/test_out.shp'), 'test_out')
        self.assertEqual(GdalUtils.ogrOutputLayerName('d:/test/TEST_OUT.shp'), 'TEST_OUT')
        self.assertEqual(GdalUtils.ogrOutputLayerName('d:/test/test_out.gpkg'), 'test_out')

    def testOgrLayerNameExtraction(self):
        """ogrLayerName() resolves layer names from OGR, SQLite and PostgreSQL URIs."""
        with tempfile.TemporaryDirectory() as outdir:
            def _copyFile(dst):
                # Each copy of the fixture CSV becomes one layer of the directory source.
                shutil.copyfile(os.path.join(testDataPath, 'custom', 'weighted.csv'), dst)

            # OGR provider - single layer
            _copyFile(os.path.join(outdir, 'a.csv'))
            name = GdalUtils.ogrLayerName(outdir)
            self.assertEqual(name, 'a')

            # OGR provider - multiple layers
            _copyFile(os.path.join(outdir, 'b.csv'))
            name1 = GdalUtils.ogrLayerName(outdir + '|layerid=0')
            name2 = GdalUtils.ogrLayerName(outdir + '|layerid=1')
            # Compared sorted: the test does not rely on which id maps to which file.
            self.assertEqual(sorted([name1, name2]), ['a', 'b'])

            name = GdalUtils.ogrLayerName(outdir + '|layerid=2')
            self.assertIsNone(name)

            # OGR provider - layername takes precedence
            name = GdalUtils.ogrLayerName(outdir + '|layername=f')
            self.assertEqual(name, 'f')

            name = GdalUtils.ogrLayerName(outdir + '|layerid=0|layername=f')
            self.assertEqual(name, 'f')

            name = GdalUtils.ogrLayerName(outdir + '|layername=f|layerid=0')
            self.assertEqual(name, 'f')

            # SQLite provider
            name = GdalUtils.ogrLayerName('dbname=\'/tmp/x.sqlite\' table="t" (geometry) sql=')
            self.assertEqual(name, 't')

            # PostgreSQL provider
            name = GdalUtils.ogrLayerName(
                'port=5493 sslmode=disable key=\'edge_id\' srid=0 type=LineString table="city_data"."edge" (geom) sql=')
            self.assertEqual(name, 'city_data.edge')

    def test_gdal_connection_details_from_uri(self):
        """The output format is derived from the file extension of the URI."""
        context = QgsProcessingContext()
        output_details = GdalUtils.gdal_connection_details_from_uri('d:/test/test.shp', context)
        self.assertEqual(output_details.connection_string, 'd:/test/test.shp')
        self.assertEqual(output_details.format, '"ESRI Shapefile"')
        output_details = GdalUtils.gdal_connection_details_from_uri('d:/test/test.mif', context)
        self.assertEqual(output_details.connection_string, 'd:/test/test.mif')
        self.assertEqual(output_details.format, '"MapInfo File"')

    def testConnectionString(self):
        """OgrToPostGis builds its connection string from the supplied parameters."""
        alg = OgrToPostGis()
        alg.initAlgorithm()

        parameters = {}
        feedback = QgsProcessingFeedback()
        context = QgsProcessingContext()

        # NOTE: defaults are debatable, see
        # https://github.com/qgis/QGIS/pull/3607#issuecomment-253971020
        self.assertEqual(alg.getConnectionString(parameters, context),
                         "host=localhost port=5432 active_schema=public")

        parameters['HOST'] = 'remote'
        self.assertEqual(alg.getConnectionString(parameters, context),
                         "host=remote port=5432 active_schema=public")

        # An empty value drops the corresponding key from the string entirely.
        parameters['HOST'] = ''
        self.assertEqual(alg.getConnectionString(parameters, context),
                         "port=5432 active_schema=public")

        parameters['PORT'] = '5555'
        self.assertEqual(alg.getConnectionString(parameters, context),
                         "port=5555 active_schema=public")

        parameters['PORT'] = ''
        self.assertEqual(alg.getConnectionString(parameters, context),
                         "active_schema=public")

        parameters['USER'] = 'usr'
        self.assertEqual(alg.getConnectionString(parameters, context),
                         "active_schema=public user=usr")

        parameters['PASSWORD'] = 'pwd'
        self.assertEqual(alg.getConnectionString(parameters, context),
                         "password=pwd active_schema=public user=usr")

    def testCrsConversion(self):
        """gdal_crs_string() yields an authority code when one matches, else WKT."""
        self.assertFalse(GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem()))
        self.assertEqual(GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem('EPSG:3111')), 'EPSG:3111')
        self.assertEqual(GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem('POSTGIS:3111')), 'EPSG:3111')
        self.assertEqual(GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem(
            'proj4: +proj=utm +zone=36 +south +a=6378249.145 +b=6356514.966398753 +towgs84=-143,-90,-294,0,0,0,0 +units=m +no_defs')),
            'EPSG:20936')
        crs = QgsCoordinateReferenceSystem()
        crs.createFromProj(
            '+proj=utm +zone=36 +south +a=600000 +b=70000 +towgs84=-143,-90,-294,0,0,0,0 +units=m +no_defs')
        self.assertTrue(crs.isValid())

        # proj 6, WKT should be used
        # Only the first 40 characters of the returned WKT are compared.
        self.assertEqual(GdalUtils.gdal_crs_string(crs)[:40], 'BOUNDCRS[SOURCECRS[PROJCRS["unknown",BAS')

        self.assertEqual(GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem('ESRI:102003')), 'ESRI:102003')

    def testEscapeAndJoin(self):
        """escapeAndJoin() quotes arguments with spaces or shell-special characters."""
        self.assertEqual(GdalUtils.escapeAndJoin([1, "a", "a b", "a&b", "a(b)", ";"]), '1 a "a b" "a&b" "a(b)" ";"')
        self.assertEqual(GdalUtils.escapeAndJoin([1, "-srcnodata", "--srcnodata", "-9999 9999"]), '1 -srcnodata --srcnodata "-9999 9999"')
if __name__ == '__main__':
    # Run this module's tests through the nose2 runner when executed directly.
    nose2.main()