QGIS/python/plugins/processing/tests/AlgorithmsTestBase.py
2025-07-25 12:36:35 +10:00

698 lines
28 KiB
Python

"""
***************************************************************************
AlgorithmsTest.py
---------------------
Date : January 2016
Copyright : (C) 2016 by Matthias Kuhn
Email : matthias@opengis.ch
***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************
"""
__author__ = "Matthias Kuhn"
__date__ = "January 2016"
__copyright__ = "(C) 2016, Matthias Kuhn"
import os
import yaml
import nose2
import shutil
import glob
import hashlib
import tempfile
import re
from osgeo import gdal
from osgeo.gdalconst import GA_ReadOnly
from numpy import nan_to_num
from copy import deepcopy
from qgis.PyQt.QtCore import QT_VERSION
from qgis.core import (
Qgis,
QgsVectorLayer,
QgsRasterLayer,
QgsCoordinateReferenceSystem,
QgsFeatureRequest,
QgsMapLayer,
QgsMeshLayer,
QgsProject,
QgsApplication,
QgsProcessingContext,
QgsProcessingUtils,
QgsProcessingFeedback,
QgsProperty,
)
from qgis.analysis import QgsNativeAlgorithms
from qgis.testing import _UnexpectedSuccess, QgisTestCase, start_app
from utilities import unitTestDataPath
import processing
gdal.UseExceptions()
def GDAL_COMPUTE_VERSION(maj, min, rev):
return (maj) * 1000000 + (min) * 10000 + (rev) * 100
def processingTestDataPath():
return os.path.join(os.path.dirname(__file__), "testdata")
class AlgorithmsTest:
def test_algorithms(self):
"""
This is the main test function. All others will be executed based on the definitions in testdata/algorithm_tests.yaml
"""
with open(
os.path.join(processingTestDataPath(), self.definition_file())
) as stream:
algorithm_tests = yaml.load(stream, Loader=yaml.SafeLoader)
if "tests" in algorithm_tests and algorithm_tests["tests"] is not None:
for idx, algtest in enumerate(algorithm_tests["tests"]):
condition = algtest.get("condition")
if condition:
geos_condition = condition.get("geos")
if geos_condition:
less_than_condition = geos_condition.get("less_than")
if less_than_condition:
if Qgis.geosVersionInt() >= less_than_condition:
print(
"!!! Skipping {}, requires GEOS < {}, have version {}".format(
algtest["name"],
less_than_condition,
Qgis.geosVersionInt(),
)
)
continue
at_least_condition = geos_condition.get("at_least")
if at_least_condition:
if Qgis.geosVersionInt() < at_least_condition:
print(
"!!! Skipping {}, requires GEOS >= {}, have version {}".format(
algtest["name"],
at_least_condition,
Qgis.geosVersionInt(),
)
)
continue
gdal_condition = condition.get("gdal")
if gdal_condition:
less_than_condition = gdal_condition.get("less_than")
if less_than_condition:
if (
int(gdal.VersionInfo("VERSION_NUM"))
>= less_than_condition
):
print(
"!!! Skipping {}, requires GDAL < {}, have version {}".format(
algtest["name"],
less_than_condition,
gdal.VersionInfo("VERSION_NUM"),
)
)
continue
at_least_condition = gdal_condition.get("at_least")
if at_least_condition:
if (
int(gdal.VersionInfo("VERSION_NUM"))
< at_least_condition
):
print(
"!!! Skipping {}, requires GDAL >= {}, have version {}".format(
algtest["name"],
at_least_condition,
gdal.VersionInfo("VERSION_NUM"),
)
)
continue
qt_condition = condition.get("qt")
if qt_condition:
less_than_condition = qt_condition.get("less_than")
if less_than_condition:
if QT_VERSION >= less_than_condition:
print(
"!!! Skipping {}, requires Qt < {}, have version {}".format(
algtest["name"], less_than_condition, QT_VERSION
)
)
continue
at_least_condition = qt_condition.get("at_least")
if at_least_condition:
if QT_VERSION < at_least_condition:
print(
"!!! Skipping {}, requires Qt >= {}, have version {}".format(
algtest["name"], at_least_condition, QT_VERSION
)
)
continue
print(
'About to start {} of {}: "{}"'.format(
idx, len(algorithm_tests["tests"]), algtest["name"]
)
)
yield self.check_algorithm, algtest["name"], algtest
def check_algorithm(self, name, defs):
"""
Will run an algorithm definition and check if it generates the expected result
:param name: The identifier name used in the test output heading
:param defs: A python dict containing a test algorithm definition
"""
self.vector_layer_params = {}
QgsProject.instance().clear()
if "project" in defs:
full_project_path = os.path.join(processingTestDataPath(), defs["project"])
project_read_success = QgsProject.instance().read(full_project_path)
self.assertTrue(
project_read_success, "Failed to load project file: " + defs["project"]
)
if "project_crs" in defs:
QgsProject.instance().setCrs(
QgsCoordinateReferenceSystem(defs["project_crs"])
)
else:
QgsProject.instance().setCrs(QgsCoordinateReferenceSystem())
if "ellipsoid" in defs:
QgsProject.instance().setEllipsoid(defs["ellipsoid"])
else:
QgsProject.instance().setEllipsoid("")
params = self.load_params(defs["params"])
print('Running alg: "{}"'.format(defs["algorithm"]))
alg = QgsApplication.processingRegistry().createAlgorithmById(defs["algorithm"])
parameters = {}
if isinstance(params, list):
for param in zip(alg.parameterDefinitions(), params):
parameters[param[0].name()] = param[1]
else:
for k, p in params.items():
parameters[k] = p
for r, p in list(defs["results"].items()):
if "in_place_result" not in p or not p["in_place_result"]:
parameters[r] = self.load_result_param(p)
expectFailure = False
if "expectedFailure" in defs:
exec(("\n".join(defs["expectedFailure"][:-1])), globals(), locals())
expectFailure = eval(defs["expectedFailure"][-1])
if "expectedException" in defs:
expectFailure = True
# ignore user setting for invalid geometry handling
context = QgsProcessingContext()
context.setProject(QgsProject.instance())
if "ellipsoid" in defs:
# depending on the project settings, we can't always rely
# on QgsProject.ellipsoid() returning the same ellipsoid as was
# specified in the test definition. So just force ensure that the
# context's ellipsoid is the desired one
context.setEllipsoid(defs["ellipsoid"])
if "skipInvalid" in defs and defs["skipInvalid"]:
context.setInvalidGeometryCheck(
QgsFeatureRequest.InvalidGeometryCheck.GeometrySkipInvalid
)
feedback = QgsProcessingFeedback()
print(f"Algorithm parameters are {parameters}")
# first check that algorithm accepts the parameters we pass...
ok, msg = alg.checkParameterValues(parameters, context)
self.assertTrue(ok, f"Algorithm failed checkParameterValues with result {msg}")
if expectFailure:
try:
results, ok = alg.run(parameters, context, feedback)
self.check_results(results, context, parameters, defs["results"])
if ok:
raise _UnexpectedSuccess
except Exception:
pass
else:
results, ok = alg.run(parameters, context, feedback)
self.assertTrue(ok, f"params: {parameters}, results: {results}")
self.check_results(results, context, parameters, defs["results"])
def load_params(self, params):
"""
Loads an array of parameters
"""
if isinstance(params, list):
return [self.load_param(p) for p in params]
elif isinstance(params, dict):
return {key: self.load_param(p, key) for key, p in params.items()}
else:
return params
def load_param(self, param, id=None):
"""
Loads a parameter. If it's not a map, the parameter will be returned as-is. If it is a map, it will process the
parameter based on its key `type` and return the appropriate parameter to pass to the algorithm.
"""
try:
if param["type"] in ("vector", "raster", "table", "mesh"):
return self.load_layer(id, param).id()
elif param["type"] == "vrtlayers":
vals = []
for p in param["params"]:
p["layer"] = self.load_layer(
None, {"type": "vector", "name": p["layer"]}
)
vals.append(p)
return vals
elif param["type"] == "multi":
return [self.load_param(p) for p in param["params"]]
elif param["type"] == "file":
return self.filepath_from_param(param)
elif param["type"] == "interpolation":
prefix = processingTestDataPath()
tmp = ""
for r in param["name"].split("::|::"):
v = r.split("::~::")
tmp += "{}::~::{}::~::{}::~::{}::|::".format(
os.path.join(prefix, v[0]), v[1], v[2], v[3]
)
# trim final separator ::|::
return tmp[:-5]
elif param["type"] == "property":
if param.get("expression"):
return QgsProperty.fromExpression(param["expression"])
except TypeError:
# No type specified, use whatever is there
return param
raise KeyError(
"Unknown type '{}' specified for parameter".format(param["type"])
)
def load_result_param(self, param):
"""
Loads a result parameter. Creates a temporary destination where the result should go to and returns this location
so it can be sent to the algorithm as parameter.
"""
if param["type"] in ["vector", "file", "table", "regex"]:
outdir = tempfile.mkdtemp()
self.cleanup_paths.append(outdir)
if isinstance(param["name"], str):
basename = os.path.basename(param["name"])
else:
basename = os.path.basename(param["name"][0])
filepath = self.uri_path_join(outdir, basename)
return filepath
elif param["type"] == "rasterhash":
outdir = tempfile.mkdtemp()
self.cleanup_paths.append(outdir)
basename = "raster.tif"
filepath = os.path.join(outdir, basename)
return filepath
elif param["type"] == "directory":
outdir = tempfile.mkdtemp()
return outdir
raise KeyError(
"Unknown type '{}' specified for parameter".format(param["type"])
)
def load_layers(self, id, param):
layers = []
if param["type"] in ("vector", "table"):
if isinstance(param["name"], str) or "uri" in param:
layers.append(self.load_layer(id, param))
else:
for n in param["name"]:
layer_param = deepcopy(param)
layer_param["name"] = n
layers.append(self.load_layer(id, layer_param))
else:
layers.append(self.load_layer(id, param))
return layers
def load_layer(self, id, param):
"""
Loads a layer which was specified as parameter.
"""
filepath = self.filepath_from_param(param)
if "in_place" in param and param["in_place"]:
# check if alg modifies layer in place
tmpdir = tempfile.mkdtemp()
self.cleanup_paths.append(tmpdir)
path, file_name = os.path.split(filepath)
base, ext = os.path.splitext(file_name)
for file in glob.glob(os.path.join(path, f"{base}.*")):
shutil.copy(os.path.join(path, file), tmpdir)
filepath = os.path.join(tmpdir, file_name)
self.in_place_layers[id] = filepath
if param["type"] in ("vector", "table"):
gmlrex = r"\.gml\b"
if re.search(gmlrex, filepath, re.IGNORECASE):
# ewwwww - we have to force SRS detection for GML files, otherwise they'll be loaded
# with no srs
filepath += "|option:FORCE_SRS_DETECTION=YES"
if filepath in self.vector_layer_params:
return self.vector_layer_params[filepath]
options = QgsVectorLayer.LayerOptions()
options.loadDefaultStyle = False
lyr = QgsVectorLayer(filepath, param["name"], "ogr", options)
self.vector_layer_params[filepath] = lyr
elif param["type"] == "raster":
options = QgsRasterLayer.LayerOptions()
options.loadDefaultStyle = False
lyr = QgsRasterLayer(filepath, param["name"], "gdal", options)
elif param["type"] == "mesh":
lyr = QgsMeshLayer(filepath, param["name"], "mdal")
self.assertTrue(
lyr.isValid(), f'Could not load layer "{filepath}" from param {param}'
)
QgsProject.instance().addMapLayer(lyr)
return lyr
def filepath_from_param(self, param):
"""
Creates a filepath from a param
"""
prefix = processingTestDataPath()
if "location" in param and param["location"] == "qgs":
prefix = unitTestDataPath()
if "uri" in param:
path = param["uri"]
else:
path = param["name"]
if not path:
return None
return self.uri_path_join(prefix, path)
def uri_path_join(self, prefix, filepath):
if filepath.startswith("ogr:"):
if not prefix[-1] == os.path.sep:
prefix += os.path.sep
filepath = re.sub(r"dbname='", f"dbname='{prefix}", filepath)
else:
filepath = os.path.join(prefix, filepath)
return filepath
def check_results(self, results, context, params, expected):
"""
Checks if result produced by an algorithm matches with the expected specification.
"""
for id, expected_result in expected.items():
if expected_result["type"] in ("vector", "table"):
if "compare" in expected_result and not expected_result["compare"]:
# skipping the comparison, so just make sure output is valid
if isinstance(results[id], QgsMapLayer):
result_lyr = results[id]
else:
result_lyr = QgsProcessingUtils.mapLayerFromString(
results[id], context
)
self.assertTrue(result_lyr.isValid())
continue
expected_lyrs = self.load_layers(id, expected_result)
if "in_place_result" in expected_result:
result_lyr = QgsProcessingUtils.mapLayerFromString(
self.in_place_layers[id], context
)
self.assertTrue(result_lyr.isValid(), self.in_place_layers[id])
else:
try:
results[id]
except KeyError as e:
raise KeyError(
f"Expected result {str(e)} does not exist in {list(results.keys())}"
)
if isinstance(results[id], QgsMapLayer):
result_lyr = results[id]
else:
string = results[id]
gmlrex = r"\.gml\b"
if re.search(gmlrex, string, re.IGNORECASE):
# ewwwww - we have to force SRS detection for GML files, otherwise they'll be loaded
# with no srs
string += "|option:FORCE_SRS_DETECTION=YES"
result_lyr = QgsProcessingUtils.mapLayerFromString(
string, context
)
self.assertTrue(result_lyr, results[id])
compare = expected_result.get("compare", {})
pk = expected_result.get("pk", None)
if len(expected_lyrs) == 1:
self.assertLayersEqual(
expected_lyrs[0], result_lyr, compare=compare, pk=pk
)
else:
res = False
for l in expected_lyrs:
if self.checkLayersEqual(l, result_lyr, compare=compare, pk=pk):
res = True
break
self.assertTrue(
res, "Could not find matching layer in expected results"
)
elif "rasterhash" == expected_result["type"]:
print(f"id:{id} result:{results[id]}")
self.assertTrue(
os.path.exists(results[id]),
f"File does not exist: {results[id]}, {params}",
)
dataset = gdal.Open(results[id], GA_ReadOnly)
dataArray = nan_to_num(dataset.ReadAsArray(0))
strhash = hashlib.sha224(dataArray.data).hexdigest()
if not isinstance(expected_result["hash"], str):
self.assertIn(strhash, expected_result["hash"])
else:
self.assertEqual(strhash, expected_result["hash"])
elif "file" == expected_result["type"]:
result_filepath = results[id]
if isinstance(expected_result.get("name"), list):
# test to see if any match expected
for path in expected_result["name"]:
expected_filepath = self.filepath_from_param({"name": path})
if self.checkFilesEqual(expected_filepath, result_filepath):
break
else:
expected_filepath = self.filepath_from_param(
{"name": expected_result["name"][0]}
)
else:
expected_filepath = self.filepath_from_param(expected_result)
self.assertFilesEqual(expected_filepath, result_filepath)
elif "directory" == expected_result["type"]:
expected_dirpath = self.filepath_from_param(expected_result)
result_dirpath = results[id]
self.assertDirectoriesEqual(expected_dirpath, result_dirpath)
elif "regex" == expected_result["type"]:
with open(results[id]) as file:
data = file.read()
for rule in expected_result.get("rules", []):
self.assertRegex(data, rule)
class GenericAlgorithmsTest(QgisTestCase):
"""
General (non-provider specific) algorithm tests
"""
@classmethod
def setUpClass(cls):
start_app()
from processing.core.Processing import Processing
Processing.initialize()
cls.cleanup_paths = []
@classmethod
def tearDownClass(cls):
from processing.core.Processing import Processing
Processing.deinitialize()
for path in cls.cleanup_paths:
shutil.rmtree(path)
def testAlgorithmCompliance(self):
for p in QgsApplication.processingRegistry().providers():
print(f"testing provider {p.id()}")
for a in p.algorithms():
print(f"testing algorithm {a.id()}")
self.check_algorithm(a)
def check_algorithm(self, alg):
# check that calling helpUrl() works without error
alg.helpUrl()
if alg.provider().id() in ("qgis", "native", "3d", "pdal"):
if alg.id() not in (
"native:exportmeshedges",
"native:exportmeshfaces",
"native:exportmeshongrid",
"native:exportmeshvertices",
"native:intersection",
"native:meshcontours",
"native:meshexportcrosssection",
"native:meshexporttimeseries",
"native:meshrasterize",
"native:surfacetopolygon",
"qgis:advancedpythonfieldcalculator",
"qgis:barplot",
"qgis:boxplot",
"qgis:distancematrix",
"qgis:distancetonearesthublinetohub",
"qgis:distancetonearesthubpoints",
"qgis:eliminateselectedpolygons",
"qgis:executesql",
"qgis:generatepointspixelcentroidsalongline",
"qgis:hypsometriccurves",
"qgis:idwinterpolation",
"qgis:knearestconcavehull",
"qgis:meanandstandarddeviationplot",
"qgis:pointsdisplacement",
"qgis:polarplot",
"qgis:randomextractwithinsubsets",
"qgis:randompointsalongline",
"qgis:randompointsinlayerbounds",
"qgis:randompointsinsidepolygons",
"qgis:randomselection",
"qgis:randomselectionwithinsubsets",
"qgis:rastercalculator",
"qgis:rasterlayerhistogram",
"qgis:rectanglesovalsdiamondsvariable",
"qgis:regularpoints",
"qgis:relief",
"qgis:scatter3dplot",
"qgis:selectbyexpression",
"qgis:setstyleforrasterlayer",
"qgis:setstyleforvectorlayer",
"qgis:texttofloat",
"qgis:tininterpolation",
"qgis:variabledistancebuffer",
"qgis:vectorlayerhistogram",
"qgis:vectorlayerscatterplot",
):
self.assertTrue(
alg.tags(),
f"Algorithm {alg.id()} has no tags!",
)
if alg.id() not in ("qgis:rectanglesovalsdiamondsvariable",):
self.assertTrue(
alg.shortHelpString(),
f"Algorithm {alg.id()} has no shortHelpString!",
)
if alg.id() not in (
"native:createspatialindex",
"native:tilesxyzdirectory",
"native:tilesxyzmbtiles",
"pdal:assignprojection",
"pdal:boundary",
"pdal:clip",
"pdal:convertformat",
"pdal:createcopc",
"pdal:density",
"pdal:exportraster",
"pdal:exportrastertin",
"pdal:exportvector",
"pdal:filter",
"pdal:info",
"pdal:merge",
"pdal:reproject",
"pdal:thinbydecimate",
"pdal:thinbyradius",
"pdal:tile",
"pdal:virtualpointcloud",
"qgis:advancedpythonfieldcalculator",
"qgis:distancematrix",
"qgis:distancetonearesthublinetohub",
"qgis:distancetonearesthubpoints",
"qgis:eliminateselectedpolygons",
"qgis:executesql",
"qgis:generatepointspixelcentroidsalongline",
"qgis:hypsometriccurves",
"qgis:linestopolygons",
"qgis:pointsdisplacement",
"qgis:randomextractwithinsubsets",
"qgis:randompointsalongline",
"qgis:randompointsinlayerbounds",
"qgis:randompointsinsidepolygons",
"qgis:rastercalculator",
"qgis:rectanglesovalsdiamondsvariable",
"qgis:regularpoints",
"qgis:relief",
"qgis:setstyleforrasterlayer",
"qgis:setstyleforvectorlayer",
"qgis:statisticsbycategories",
"qgis:variabledistancebuffer",
):
self.assertTrue(
alg.shortDescription(),
f"Algorithm {alg.id()} has not shortDescription!",
)
if alg.shortDescription():
self.assertTrue(
alg.shortDescription()[0].isupper(),
f'Algorithm {alg.id()} shortDescription does not start with capital! "{alg.shortDescription()}"',
)
self.assertEqual(
alg.shortDescription()[-1],
".",
f'Algorithm {alg.id()} shortDescription does not end with full stop "{alg.shortDescription()}"',
)
self.assertFalse(
alg.shortDescription().lower().startswith("this algorithm"),
f'Algorithm {alg.id()} shortDescription should NOT start with eg "This algorithm computes...", just use "Computes..." instead: "{alg.shortDescription()}"',
)
first_word = alg.shortDescription().split(" ")[0].lower()
if first_word not in ("randomly",):
self.assertEqual(
first_word[-1],
"s",
f'Algorithm {alg.id()} shortDescription should start with a verb ending in s, eg "Combines", "Creates",... "{alg.shortDescription()}"',
)
self.assertFalse(
"</" in alg.shortDescription(),
f'Algorithm {alg.id()} shortDescription should not contain any HTML formatting "{alg.shortDescription()}"',
)
if __name__ == "__main__":
nose2.main()