QGIS/python/plugins/processing/tests/GdalAlgorithmsGeneralTest.py
2024-11-29 15:38:02 +01:00

426 lines
16 KiB
Python

"""
***************************************************************************
GdalAlgorithmTests.py
---------------------
Date : January 2016
Copyright : (C) 2016 by Matthias Kuhn
Email : matthias@opengis.ch
***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************
"""
__author__ = "Matthias Kuhn"
__date__ = "January 2016"
__copyright__ = "(C) 2016, Matthias Kuhn"
import nose2
import os
import shutil
import tempfile
from qgis.core import (
QgsProcessingContext,
QgsProcessingFeedback,
QgsCoordinateReferenceSystem,
QgsApplication,
QgsFeature,
QgsGeometry,
QgsPointXY,
QgsProject,
QgsVectorLayer,
QgsRectangle,
QgsProjUtils,
QgsProcessingException,
QgsProcessingFeatureSourceDefinition,
)
from qgis.testing import QgisTestCase, start_app
from processing.algs.gdal.GdalUtils import GdalUtils
from processing.algs.gdal.ogr2ogr import ogr2ogr
from processing.algs.gdal.OgrToPostGis import OgrToPostGis
testDataPath = os.path.join(os.path.dirname(__file__), "testdata")
class TestGdalAlgorithms(QgisTestCase):
@classmethod
def setUpClass(cls):
start_app()
from processing.core.Processing import Processing
Processing.initialize()
cls.cleanup_paths = []
@classmethod
def tearDownClass(cls):
for path in cls.cleanup_paths:
shutil.rmtree(path)
def testCommandName(self):
# Test that algorithms report a valid commandName
p = QgsApplication.processingRegistry().providerById("gdal")
for a in p.algorithms():
if a.id() in ("gdal:buildvirtualvector"):
# build virtual vector is an exception
continue
self.assertTrue(a.commandName(), f"Algorithm {a.id()} has no commandName!")
def testCommandNameInTags(self):
# Test that algorithms commandName is present in provided tags
p = QgsApplication.processingRegistry().providerById("gdal")
for a in p.algorithms():
if not a.commandName():
continue
self.assertTrue(
a.commandName() in a.tags(),
f"Algorithm {a.id()} commandName not found in tags!",
)
def testNoParameters(self):
# Test that algorithms throw QgsProcessingException and not base Python
# exceptions when no parameters specified
p = QgsApplication.processingRegistry().providerById("gdal")
context = QgsProcessingContext()
feedback = QgsProcessingFeedback()
for a in p.algorithms():
try:
a.getConsoleCommands({}, context, feedback)
except QgsProcessingException:
pass
def testGetOgrCompatibleSourceFromMemoryLayer(self):
# create a memory layer and add to project and context
layer = QgsVectorLayer(
"Point?field=fldtxt:string&field=fldint:integer", "testmem", "memory"
)
self.assertTrue(layer.isValid())
pr = layer.dataProvider()
f = QgsFeature()
f.setAttributes(["test", 123])
f.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(100, 200)))
f2 = QgsFeature()
f2.setAttributes(["test2", 457])
f2.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(100, 200)))
self.assertTrue(pr.addFeatures([f, f2]))
self.assertEqual(layer.featureCount(), 2)
QgsProject.instance().addMapLayer(layer)
context = QgsProcessingContext()
context.setProject(QgsProject.instance())
alg = QgsApplication.processingRegistry().createAlgorithmById(
"gdal:buffervectors"
)
self.assertIsNotNone(alg)
parameters = {"INPUT": "testmem"}
feedback = QgsProcessingFeedback()
# check that memory layer is automatically saved out to geopackage when required by GDAL algorithms
input_details = alg.getOgrCompatibleSource(
"INPUT", parameters, context, feedback, executing=True
)
self.assertTrue(input_details.connection_string)
self.assertTrue(input_details.connection_string.endswith(".gpkg"))
self.assertTrue(os.path.exists(input_details.connection_string))
self.assertTrue(input_details.connection_string)
# make sure that layer has correct features
res = QgsVectorLayer(input_details.connection_string, "res")
self.assertTrue(res.isValid())
self.assertEqual(res.featureCount(), 2)
# with memory layers - if not executing layer source should be ignored and replaced
# with a dummy path, because:
# - it has no meaning for the gdal command outside of QGIS, memory layers don't exist!
# - we don't want to force an export of the whole memory layer to a temp file just to show the command preview
# this might be very slow!
input_details = alg.getOgrCompatibleSource(
"INPUT", parameters, context, feedback, executing=False
)
self.assertEqual(input_details.connection_string, "path_to_data_file")
self.assertEqual(input_details.layer_name, "layer_name")
QgsProject.instance().removeMapLayer(layer)
def testGetOgrCompatibleSourceFromOgrLayer(self):
p = QgsProject()
source = os.path.join(testDataPath, "points.gml")
vl = QgsVectorLayer(source)
self.assertTrue(vl.isValid())
p.addMapLayer(vl)
context = QgsProcessingContext()
context.setProject(p)
feedback = QgsProcessingFeedback()
alg = ogr2ogr()
alg.initAlgorithm()
input_details = alg.getOgrCompatibleSource(
"INPUT", {"INPUT": vl.id()}, context, feedback, True
)
self.assertEqual(input_details.connection_string, source)
input_details = alg.getOgrCompatibleSource(
"INPUT", {"INPUT": vl.id()}, context, feedback, False
)
self.assertEqual(input_details.connection_string, source)
# with selected features only - if not executing, the 'selected features only' setting
# should be ignored (because it has no meaning for the gdal command outside of QGIS!)
parameters = {"INPUT": QgsProcessingFeatureSourceDefinition(vl.id(), True)}
input_details = alg.getOgrCompatibleSource(
"INPUT", parameters, context, feedback, False
)
self.assertEqual(input_details.connection_string, source)
# with subset string
vl.setSubsetString("x")
input_details = alg.getOgrCompatibleSource(
"INPUT", parameters, context, feedback, False
)
self.assertEqual(input_details.connection_string, source)
# subset of layer must be exported
input_details = alg.getOgrCompatibleSource(
"INPUT", parameters, context, feedback, True
)
self.assertNotEqual(input_details.connection_string, source)
self.assertTrue(input_details.connection_string)
self.assertTrue(input_details.connection_string.endswith(".gpkg"))
self.assertTrue(os.path.exists(input_details.connection_string))
self.assertTrue(input_details.layer_name)
# geopackage with layer
source = os.path.join(testDataPath, "custom", "circular_strings.gpkg")
vl2 = QgsVectorLayer(source + "|layername=circular_strings")
self.assertTrue(vl2.isValid())
p.addMapLayer(vl2)
input_details = alg.getOgrCompatibleSource(
"INPUT", {"INPUT": vl2.id()}, context, feedback, True
)
self.assertEqual(input_details.connection_string, source)
self.assertEqual(input_details.layer_name, "circular_strings")
vl3 = QgsVectorLayer(source + "|layername=circular_strings_with_line")
self.assertTrue(vl3.isValid())
p.addMapLayer(vl3)
input_details = alg.getOgrCompatibleSource(
"INPUT", {"INPUT": vl3.id()}, context, feedback, True
)
self.assertEqual(input_details.connection_string, source)
self.assertEqual(input_details.layer_name, "circular_strings_with_line")
def testGetOgrCompatibleSourceFromFeatureSource(self):
# create a memory layer and add to project and context
layer = QgsVectorLayer(
"Point?field=fldtxt:string&field=fldint:integer", "testmem", "memory"
)
self.assertTrue(layer.isValid())
pr = layer.dataProvider()
f = QgsFeature()
f.setAttributes(["test", 123])
f.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(100, 200)))
f2 = QgsFeature()
f2.setAttributes(["test2", 457])
f2.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(100, 200)))
self.assertTrue(pr.addFeatures([f, f2]))
self.assertEqual(layer.featureCount(), 2)
# select first feature
layer.selectByIds([next(layer.getFeatures()).id()])
self.assertEqual(len(layer.selectedFeatureIds()), 1)
QgsProject.instance().addMapLayer(layer)
context = QgsProcessingContext()
context.setProject(QgsProject.instance())
alg = QgsApplication.processingRegistry().createAlgorithmById(
"gdal:buffervectors"
)
self.assertIsNotNone(alg)
parameters = {"INPUT": QgsProcessingFeatureSourceDefinition("testmem", True)}
feedback = QgsProcessingFeedback()
# check that memory layer is automatically saved out to geopackage when required by GDAL algorithms
input_details = alg.getOgrCompatibleSource(
"INPUT", parameters, context, feedback, executing=True
)
self.assertTrue(input_details.connection_string)
self.assertTrue(input_details.connection_string.endswith(".gpkg"))
self.assertTrue(os.path.exists(input_details.connection_string))
self.assertTrue(input_details.layer_name)
# make sure that layer has only selected feature
res = QgsVectorLayer(input_details.connection_string, "res")
self.assertTrue(res.isValid())
self.assertEqual(res.featureCount(), 1)
QgsProject.instance().removeMapLayer(layer)
def testOgrOutputLayerName(self):
self.assertEqual(GdalUtils.ogrOutputLayerName("/home/me/out.shp"), "out")
self.assertEqual(
GdalUtils.ogrOutputLayerName("d:/test/test_out.shp"), "test_out"
)
self.assertEqual(
GdalUtils.ogrOutputLayerName("d:/test/TEST_OUT.shp"), "TEST_OUT"
)
self.assertEqual(
GdalUtils.ogrOutputLayerName("d:/test/test_out.gpkg"), "test_out"
)
def testOgrLayerNameExtraction(self):
with tempfile.TemporaryDirectory() as outdir:
def _copyFile(dst):
shutil.copyfile(
os.path.join(testDataPath, "custom", "weighted.csv"), dst
)
# OGR provider - single layer
_copyFile(os.path.join(outdir, "a.csv"))
name = GdalUtils.ogrLayerName(outdir)
self.assertEqual(name, "a")
# OGR provider - multiple layers
_copyFile(os.path.join(outdir, "b.csv"))
name1 = GdalUtils.ogrLayerName(outdir + "|layerid=0")
name2 = GdalUtils.ogrLayerName(outdir + "|layerid=1")
self.assertEqual(sorted([name1, name2]), ["a", "b"])
name = GdalUtils.ogrLayerName(outdir + "|layerid=2")
self.assertIsNone(name)
# OGR provider - layername takes precedence
name = GdalUtils.ogrLayerName(outdir + "|layername=f")
self.assertEqual(name, "f")
name = GdalUtils.ogrLayerName(outdir + "|layerid=0|layername=f")
self.assertEqual(name, "f")
name = GdalUtils.ogrLayerName(outdir + "|layername=f|layerid=0")
self.assertEqual(name, "f")
# SQLite provider
name = GdalUtils.ogrLayerName(
"dbname='/tmp/x.sqlite' table=\"t\" (geometry) sql="
)
self.assertEqual(name, "t")
# PostgreSQL provider
name = GdalUtils.ogrLayerName(
'port=5493 sslmode=disable key=\'edge_id\' srid=0 type=LineString table="city_data"."edge" (geom) sql='
)
self.assertEqual(name, "city_data.edge")
def test_gdal_connection_details_from_uri(self):
context = QgsProcessingContext()
output_details = GdalUtils.gdal_connection_details_from_uri(
"d:/test/test.shp", context
)
self.assertEqual(output_details.connection_string, "d:/test/test.shp")
self.assertEqual(output_details.format, '"ESRI Shapefile"')
output_details = GdalUtils.gdal_connection_details_from_uri(
"d:/test/test.mif", context
)
self.assertEqual(output_details.connection_string, "d:/test/test.mif")
self.assertEqual(output_details.format, '"MapInfo File"')
def testConnectionString(self):
alg = OgrToPostGis()
alg.initAlgorithm()
parameters = {}
feedback = QgsProcessingFeedback()
context = QgsProcessingContext()
# NOTE: defaults are debatable, see
# https://github.com/qgis/QGIS/pull/3607#issuecomment-253971020
self.assertEqual(
alg.getConnectionString(parameters, context),
"host=localhost port=5432 active_schema=public",
)
parameters["HOST"] = "remote"
self.assertEqual(
alg.getConnectionString(parameters, context),
"host=remote port=5432 active_schema=public",
)
parameters["HOST"] = ""
self.assertEqual(
alg.getConnectionString(parameters, context),
"port=5432 active_schema=public",
)
parameters["PORT"] = "5555"
self.assertEqual(
alg.getConnectionString(parameters, context),
"port=5555 active_schema=public",
)
parameters["PORT"] = ""
self.assertEqual(
alg.getConnectionString(parameters, context), "active_schema=public"
)
parameters["USER"] = "usr"
self.assertEqual(
alg.getConnectionString(parameters, context),
"active_schema=public user=usr",
)
parameters["PASSWORD"] = "pwd"
self.assertEqual(
alg.getConnectionString(parameters, context),
"password=pwd active_schema=public user=usr",
)
def testCrsConversion(self):
self.assertFalse(GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem()))
self.assertEqual(
GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem("EPSG:3111")),
"EPSG:3111",
)
self.assertEqual(
GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem("POSTGIS:3111")),
"EPSG:3111",
)
self.assertEqual(
GdalUtils.gdal_crs_string(
QgsCoordinateReferenceSystem(
"proj4: +proj=utm +zone=36 +south +a=6378249.145 +b=6356514.966398753 +towgs84=-143,-90,-294,0,0,0,0 +units=m +no_defs"
)
),
"EPSG:20936",
)
crs = QgsCoordinateReferenceSystem()
crs.createFromProj(
"+proj=utm +zone=36 +south +a=600000 +b=70000 +towgs84=-143,-90,-294,0,0,0,0 +units=m +no_defs"
)
self.assertTrue(crs.isValid())
# proj 6, WKT should be used
self.assertEqual(
GdalUtils.gdal_crs_string(crs)[:40],
'BOUNDCRS[SOURCECRS[PROJCRS["unknown",BAS',
)
self.assertEqual(
GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem("ESRI:102003")),
"ESRI:102003",
)
def testEscapeAndJoin(self):
self.assertEqual(
GdalUtils.escapeAndJoin([1, "a", "a b", "a&b", "a(b)", ";"]),
'1 a "a b" "a&b" "a(b)" ";"',
)
self.assertEqual(
GdalUtils.escapeAndJoin([1, "-srcnodata", "--srcnodata", "-9999 9999"]),
'1 -srcnodata --srcnodata "-9999 9999"',
)
if __name__ == "__main__":
nose2.main()