mirror of
				https://github.com/qgis/QGIS.git
				synced 2025-10-26 00:04:03 -04:00 
			
		
		
		
	running GDAL algorithms If a subset string is set, we must export the subset of the layer for use by the GDAL command* Fixes #35981 * well, we probably **should** just build the gdal command to include the SQL definition of the subset filter, but that's non-trivial, so this fix is a good simple solution for now
		
			
				
	
	
		
			354 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			354 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| """
 | |
| ***************************************************************************
 | |
|     GdalAlgorithmTests.py
 | |
|     ---------------------
 | |
|     Date                 : January 2016
 | |
|     Copyright            : (C) 2016 by Matthias Kuhn
 | |
|     Email                : matthias@opengis.ch
 | |
| ***************************************************************************
 | |
| *                                                                         *
 | |
| *   This program is free software; you can redistribute it and/or modify  *
 | |
| *   it under the terms of the GNU General Public License as published by  *
 | |
| *   the Free Software Foundation; either version 2 of the License, or     *
 | |
| *   (at your option) any later version.                                   *
 | |
| *                                                                         *
 | |
| ***************************************************************************
 | |
| """
 | |
| 
 | |
| __author__ = 'Matthias Kuhn'
 | |
| __date__ = 'January 2016'
 | |
| __copyright__ = '(C) 2016, Matthias Kuhn'
 | |
| 
 | |
| import nose2
 | |
| import os
 | |
| import shutil
 | |
| import tempfile
 | |
| 
 | |
| from qgis.core import (QgsProcessingContext,
 | |
|                        QgsProcessingFeedback,
 | |
|                        QgsCoordinateReferenceSystem,
 | |
|                        QgsApplication,
 | |
|                        QgsFeature,
 | |
|                        QgsGeometry,
 | |
|                        QgsPointXY,
 | |
|                        QgsProject,
 | |
|                        QgsVectorLayer,
 | |
|                        QgsRectangle,
 | |
|                        QgsProjUtils,
 | |
|                        QgsProcessingException,
 | |
|                        QgsProcessingFeatureSourceDefinition)
 | |
| 
 | |
| from qgis.testing import (start_app,
 | |
|                           unittest)
 | |
| 
 | |
| from processing.algs.gdal.GdalUtils import GdalUtils
 | |
| from processing.algs.gdal.ogr2ogr import ogr2ogr
 | |
| from processing.algs.gdal.OgrToPostGis import OgrToPostGis
 | |
| 
 | |
| testDataPath = os.path.join(os.path.dirname(__file__), 'testdata')
 | |
| 
 | |
| 
 | |
| class TestGdalAlgorithms(unittest.TestCase):
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpClass(cls):
 | |
|         start_app()
 | |
|         from processing.core.Processing import Processing
 | |
|         Processing.initialize()
 | |
|         cls.cleanup_paths = []
 | |
| 
 | |
|     @classmethod
 | |
|     def tearDownClass(cls):
 | |
|         for path in cls.cleanup_paths:
 | |
|             shutil.rmtree(path)
 | |
| 
 | |
|     def testCommandName(self):
 | |
|         # Test that algorithms report a valid commandName
 | |
|         p = QgsApplication.processingRegistry().providerById('gdal')
 | |
|         for a in p.algorithms():
 | |
|             if a.id() in ('gdal:buildvirtualvector'):
 | |
|                 # build virtual vector is an exception
 | |
|                 continue
 | |
|             self.assertTrue(a.commandName(), 'Algorithm {} has no commandName!'.format(a.id()))
 | |
| 
 | |
|     def testCommandNameInTags(self):
 | |
|         # Test that algorithms commandName is present in provided tags
 | |
|         p = QgsApplication.processingRegistry().providerById('gdal')
 | |
|         for a in p.algorithms():
 | |
|             if not a.commandName():
 | |
|                 continue
 | |
|             self.assertTrue(a.commandName() in a.tags(), 'Algorithm {} commandName not found in tags!'.format(a.id()))
 | |
| 
 | |
|     def testNoParameters(self):
 | |
|         # Test that algorithms throw QgsProcessingExceptions and not base Python
 | |
|         # exceptions when no parameters specified
 | |
|         p = QgsApplication.processingRegistry().providerById('gdal')
 | |
|         context = QgsProcessingContext()
 | |
|         feedback = QgsProcessingFeedback()
 | |
|         for a in p.algorithms():
 | |
|             try:
 | |
|                 a.getConsoleCommands({}, context, feedback)
 | |
|             except QgsProcessingException:
 | |
|                 pass
 | |
| 
 | |
|     def testGetOgrCompatibleSourceFromMemoryLayer(self):
 | |
|         # create a memory layer and add to project and context
 | |
|         layer = QgsVectorLayer("Point?field=fldtxt:string&field=fldint:integer",
 | |
|                                "testmem", "memory")
 | |
|         self.assertTrue(layer.isValid())
 | |
|         pr = layer.dataProvider()
 | |
|         f = QgsFeature()
 | |
|         f.setAttributes(["test", 123])
 | |
|         f.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(100, 200)))
 | |
|         f2 = QgsFeature()
 | |
|         f2.setAttributes(["test2", 457])
 | |
|         f2.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(100, 200)))
 | |
|         self.assertTrue(pr.addFeatures([f, f2]))
 | |
|         self.assertEqual(layer.featureCount(), 2)
 | |
|         QgsProject.instance().addMapLayer(layer)
 | |
|         context = QgsProcessingContext()
 | |
|         context.setProject(QgsProject.instance())
 | |
| 
 | |
|         alg = QgsApplication.processingRegistry().createAlgorithmById('gdal:buffervectors')
 | |
|         self.assertIsNotNone(alg)
 | |
|         parameters = {'INPUT': 'testmem'}
 | |
|         feedback = QgsProcessingFeedback()
 | |
|         # check that memory layer is automatically saved out to geopackage when required by GDAL algorithms
 | |
|         ogr_data_path, ogr_layer_name = alg.getOgrCompatibleSource('INPUT', parameters, context, feedback,
 | |
|                                                                    executing=True)
 | |
|         self.assertTrue(ogr_data_path)
 | |
|         self.assertTrue(ogr_data_path.endswith('.gpkg'))
 | |
|         self.assertTrue(os.path.exists(ogr_data_path))
 | |
|         self.assertTrue(ogr_layer_name)
 | |
| 
 | |
|         # make sure that layer has correct features
 | |
|         res = QgsVectorLayer(ogr_data_path, 'res')
 | |
|         self.assertTrue(res.isValid())
 | |
|         self.assertEqual(res.featureCount(), 2)
 | |
| 
 | |
|         # with memory layers - if not executing layer source should be ignored and replaced
 | |
|         # with a dummy path, because:
 | |
|         # - it has no meaning for the gdal command outside of QGIS, memory layers don't exist!
 | |
|         # - we don't want to force an export of the whole memory layer to a temp file just to show the command preview
 | |
|         # this might be very slow!
 | |
|         ogr_data_path, ogr_layer_name = alg.getOgrCompatibleSource('INPUT', parameters, context, feedback,
 | |
|                                                                    executing=False)
 | |
|         self.assertEqual(ogr_data_path, 'path_to_data_file')
 | |
|         self.assertEqual(ogr_layer_name, 'layer_name')
 | |
| 
 | |
|         QgsProject.instance().removeMapLayer(layer)
 | |
| 
 | |
|     def testGetOgrCompatibleSourceFromOgrLayer(self):
 | |
|         p = QgsProject()
 | |
|         source = os.path.join(testDataPath, 'points.gml')
 | |
|         vl = QgsVectorLayer(source)
 | |
|         self.assertTrue(vl.isValid())
 | |
|         p.addMapLayer(vl)
 | |
| 
 | |
|         context = QgsProcessingContext()
 | |
|         context.setProject(p)
 | |
|         feedback = QgsProcessingFeedback()
 | |
| 
 | |
|         alg = ogr2ogr()
 | |
|         alg.initAlgorithm()
 | |
|         path, layer = alg.getOgrCompatibleSource('INPUT', {'INPUT': vl.id()}, context, feedback, True)
 | |
|         self.assertEqual(path, source)
 | |
|         path, layer = alg.getOgrCompatibleSource('INPUT', {'INPUT': vl.id()}, context, feedback, False)
 | |
|         self.assertEqual(path, source)
 | |
| 
 | |
|         # with selected features only - if not executing, the 'selected features only' setting
 | |
|         # should be ignored (because it has no meaning for the gdal command outside of QGIS!)
 | |
|         parameters = {'INPUT': QgsProcessingFeatureSourceDefinition(vl.id(), True)}
 | |
|         path, layer = alg.getOgrCompatibleSource('INPUT', parameters, context, feedback, False)
 | |
|         self.assertEqual(path, source)
 | |
| 
 | |
|         # with subset string
 | |
|         vl.setSubsetString('x')
 | |
|         path, layer = alg.getOgrCompatibleSource('INPUT', parameters, context, feedback, False)
 | |
|         self.assertEqual(path, source)
 | |
|         # subset of layer must be exported
 | |
|         path, layer = alg.getOgrCompatibleSource('INPUT', parameters, context, feedback, True)
 | |
|         self.assertNotEqual(path, source)
 | |
|         self.assertTrue(path)
 | |
|         self.assertTrue(path.endswith('.gpkg'))
 | |
|         self.assertTrue(os.path.exists(path))
 | |
|         self.assertTrue(layer)
 | |
| 
 | |
|         # geopackage with layer
 | |
|         source = os.path.join(testDataPath, 'custom', 'circular_strings.gpkg')
 | |
|         vl2 = QgsVectorLayer(source + '|layername=circular_strings')
 | |
|         self.assertTrue(vl2.isValid())
 | |
|         p.addMapLayer(vl2)
 | |
|         path, layer = alg.getOgrCompatibleSource('INPUT', {'INPUT': vl2.id()}, context, feedback, True)
 | |
|         self.assertEqual(path, source)
 | |
|         self.assertEqual(layer, 'circular_strings')
 | |
|         vl3 = QgsVectorLayer(source + '|layername=circular_strings_with_line')
 | |
|         self.assertTrue(vl3.isValid())
 | |
|         p.addMapLayer(vl3)
 | |
|         path, layer = alg.getOgrCompatibleSource('INPUT', {'INPUT': vl3.id()}, context, feedback, True)
 | |
|         self.assertEqual(path, source)
 | |
|         self.assertEqual(layer, 'circular_strings_with_line')
 | |
| 
 | |
|     def testGetOgrCompatibleSourceFromFeatureSource(self):
 | |
|         # create a memory layer and add to project and context
 | |
|         layer = QgsVectorLayer("Point?field=fldtxt:string&field=fldint:integer",
 | |
|                                "testmem", "memory")
 | |
|         self.assertTrue(layer.isValid())
 | |
|         pr = layer.dataProvider()
 | |
|         f = QgsFeature()
 | |
|         f.setAttributes(["test", 123])
 | |
|         f.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(100, 200)))
 | |
|         f2 = QgsFeature()
 | |
|         f2.setAttributes(["test2", 457])
 | |
|         f2.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(100, 200)))
 | |
|         self.assertTrue(pr.addFeatures([f, f2]))
 | |
|         self.assertEqual(layer.featureCount(), 2)
 | |
|         # select first feature
 | |
|         layer.selectByIds([next(layer.getFeatures()).id()])
 | |
|         self.assertEqual(len(layer.selectedFeatureIds()), 1)
 | |
|         QgsProject.instance().addMapLayer(layer)
 | |
|         context = QgsProcessingContext()
 | |
|         context.setProject(QgsProject.instance())
 | |
| 
 | |
|         alg = QgsApplication.processingRegistry().createAlgorithmById('gdal:buffervectors')
 | |
|         self.assertIsNotNone(alg)
 | |
|         parameters = {'INPUT': QgsProcessingFeatureSourceDefinition('testmem', True)}
 | |
|         feedback = QgsProcessingFeedback()
 | |
|         # check that memory layer is automatically saved out to geopackage when required by GDAL algorithms
 | |
|         ogr_data_path, ogr_layer_name = alg.getOgrCompatibleSource('INPUT', parameters, context, feedback,
 | |
|                                                                    executing=True)
 | |
|         self.assertTrue(ogr_data_path)
 | |
|         self.assertTrue(ogr_data_path.endswith('.gpkg'))
 | |
|         self.assertTrue(os.path.exists(ogr_data_path))
 | |
|         self.assertTrue(ogr_layer_name)
 | |
| 
 | |
|         # make sure that layer has only selected feature
 | |
|         res = QgsVectorLayer(ogr_data_path, 'res')
 | |
|         self.assertTrue(res.isValid())
 | |
|         self.assertEqual(res.featureCount(), 1)
 | |
| 
 | |
|         QgsProject.instance().removeMapLayer(layer)
 | |
| 
 | |
|     def testOgrOutputLayerName(self):
 | |
|         self.assertEqual(GdalUtils.ogrOutputLayerName('/home/me/out.shp'), 'out')
 | |
|         self.assertEqual(GdalUtils.ogrOutputLayerName('d:/test/test_out.shp'), 'test_out')
 | |
|         self.assertEqual(GdalUtils.ogrOutputLayerName('d:/test/TEST_OUT.shp'), 'TEST_OUT')
 | |
|         self.assertEqual(GdalUtils.ogrOutputLayerName('d:/test/test_out.gpkg'), 'test_out')
 | |
| 
 | |
|     def testOgrLayerNameExtraction(self):
 | |
|         with tempfile.TemporaryDirectory() as outdir:
 | |
|             def _copyFile(dst):
 | |
|                 shutil.copyfile(os.path.join(testDataPath, 'custom', 'grass7', 'weighted.csv'), dst)
 | |
| 
 | |
|             # OGR provider - single layer
 | |
|             _copyFile(os.path.join(outdir, 'a.csv'))
 | |
|             name = GdalUtils.ogrLayerName(outdir)
 | |
|             self.assertEqual(name, 'a')
 | |
| 
 | |
|             # OGR provider - multiple layers
 | |
|             _copyFile(os.path.join(outdir, 'b.csv'))
 | |
|             name1 = GdalUtils.ogrLayerName(outdir + '|layerid=0')
 | |
|             name2 = GdalUtils.ogrLayerName(outdir + '|layerid=1')
 | |
|             self.assertEqual(sorted([name1, name2]), ['a', 'b'])
 | |
| 
 | |
|             name = GdalUtils.ogrLayerName(outdir + '|layerid=2')
 | |
|             self.assertIsNone(name)
 | |
| 
 | |
|             # OGR provider - layername takes precedence
 | |
|             name = GdalUtils.ogrLayerName(outdir + '|layername=f')
 | |
|             self.assertEqual(name, 'f')
 | |
| 
 | |
|             name = GdalUtils.ogrLayerName(outdir + '|layerid=0|layername=f')
 | |
|             self.assertEqual(name, 'f')
 | |
| 
 | |
|             name = GdalUtils.ogrLayerName(outdir + '|layername=f|layerid=0')
 | |
|             self.assertEqual(name, 'f')
 | |
| 
 | |
|             # SQLite provider
 | |
|             name = GdalUtils.ogrLayerName('dbname=\'/tmp/x.sqlite\' table="t" (geometry) sql=')
 | |
|             self.assertEqual(name, 't')
 | |
| 
 | |
|             # PostgreSQL provider
 | |
|             name = GdalUtils.ogrLayerName(
 | |
|                 'port=5493 sslmode=disable key=\'edge_id\' srid=0 type=LineString table="city_data"."edge" (geom) sql=')
 | |
|             self.assertEqual(name, 'city_data.edge')
 | |
| 
 | |
|     def testOgrConnectionStringAndFormat(self):
 | |
|         context = QgsProcessingContext()
 | |
|         output, outputFormat = GdalUtils.ogrConnectionStringAndFormat('d:/test/test.shp', context)
 | |
|         self.assertEqual(output, 'd:/test/test.shp')
 | |
|         self.assertEqual(outputFormat, '"ESRI Shapefile"')
 | |
|         output, outputFormat = GdalUtils.ogrConnectionStringAndFormat('d:/test/test.mif', context)
 | |
|         self.assertEqual(output, 'd:/test/test.mif')
 | |
|         self.assertEqual(outputFormat, '"MapInfo File"')
 | |
| 
 | |
|     def testConnectionString(self):
 | |
|         alg = OgrToPostGis()
 | |
|         alg.initAlgorithm()
 | |
|         parameters = {}
 | |
|         feedback = QgsProcessingFeedback()
 | |
|         context = QgsProcessingContext()
 | |
| 
 | |
|         # NOTE: defaults are debatable, see
 | |
|         # https://github.com/qgis/QGIS/pull/3607#issuecomment-253971020
 | |
|         self.assertEqual(alg.getConnectionString(parameters, context),
 | |
|                          "host=localhost port=5432 active_schema=public")
 | |
| 
 | |
|         parameters['HOST'] = 'remote'
 | |
|         self.assertEqual(alg.getConnectionString(parameters, context),
 | |
|                          "host=remote port=5432 active_schema=public")
 | |
| 
 | |
|         parameters['HOST'] = ''
 | |
|         self.assertEqual(alg.getConnectionString(parameters, context),
 | |
|                          "port=5432 active_schema=public")
 | |
| 
 | |
|         parameters['PORT'] = '5555'
 | |
|         self.assertEqual(alg.getConnectionString(parameters, context),
 | |
|                          "port=5555 active_schema=public")
 | |
| 
 | |
|         parameters['PORT'] = ''
 | |
|         self.assertEqual(alg.getConnectionString(parameters, context),
 | |
|                          "active_schema=public")
 | |
| 
 | |
|         parameters['USER'] = 'usr'
 | |
|         self.assertEqual(alg.getConnectionString(parameters, context),
 | |
|                          "active_schema=public user=usr")
 | |
| 
 | |
|         parameters['PASSWORD'] = 'pwd'
 | |
|         self.assertEqual(alg.getConnectionString(parameters, context),
 | |
|                          "password=pwd active_schema=public user=usr")
 | |
| 
 | |
|     def testCrsConversion(self):
 | |
|         self.assertFalse(GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem()))
 | |
|         self.assertEqual(GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem('EPSG:3111')), 'EPSG:3111')
 | |
|         self.assertEqual(GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem('POSTGIS:3111')), 'EPSG:3111')
 | |
|         self.assertEqual(GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem(
 | |
|             'proj4: +proj=utm +zone=36 +south +a=6378249.145 +b=6356514.966398753 +towgs84=-143,-90,-294,0,0,0,0 +units=m +no_defs')),
 | |
|             'EPSG:20936')
 | |
|         crs = QgsCoordinateReferenceSystem()
 | |
|         crs.createFromProj(
 | |
|             '+proj=utm +zone=36 +south +a=600000 +b=70000 +towgs84=-143,-90,-294,0,0,0,0 +units=m +no_defs')
 | |
|         self.assertTrue(crs.isValid())
 | |
| 
 | |
|         if QgsProjUtils.projVersionMajor() >= 6:
 | |
|             # proj 6, WKT should be used
 | |
|             self.assertEqual(GdalUtils.gdal_crs_string(crs)[:40], 'BOUNDCRS[SOURCECRS[PROJCRS["unknown",BAS')
 | |
| 
 | |
|             self.assertEqual(GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem('ESRI:102003')), 'ESRI:102003')
 | |
|         else:
 | |
|             self.assertEqual(GdalUtils.gdal_crs_string(crs),
 | |
|                              '+proj=utm +zone=36 +south +a=600000 +b=70000 +towgs84=-143,-90,-294,0,0,0,0 +units=m +no_defs')
 | |
|             # check that newlines are stripped
 | |
|             crs = QgsCoordinateReferenceSystem()
 | |
|             crs.createFromProj(
 | |
|                 '+proj=utm +zone=36 +south\n     +a=600000 +b=70000 \r\n    +towgs84=-143,-90,-294,0,0,0,0 +units=m\n+no_defs')
 | |
|             self.assertTrue(crs.isValid())
 | |
|             self.assertEqual(GdalUtils.gdal_crs_string(crs),
 | |
|                              '+proj=utm +zone=36 +south      +a=600000 +b=70000       +towgs84=-143,-90,-294,0,0,0,0 +units=m +no_defs')
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     nose2.main()
 |