QGIS/python/plugins/grassprovider/grass_algorithm.py

1376 lines
54 KiB
Python

"""
***************************************************************************
grass_algorithm.py
---------------------
Date : February 2015
Copyright : (C) 2014-2015 by Victor Olaya
Email : volayaf at gmail dot com
***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************
"""
__author__ = "Victor Olaya"
__date__ = "February 2015"
__copyright__ = "(C) 2012-2015, Victor Olaya"
from typing import Dict, Optional
import sys
import os
import uuid
import math
import importlib
from pathlib import Path
from qgis.PyQt.QtCore import QCoreApplication, QUrl
from qgis.core import (
Qgis,
QgsMapLayer,
QgsRasterLayer,
QgsApplication,
QgsMapLayerType,
QgsCoordinateReferenceSystem,
QgsProcessingUtils,
QgsProcessing,
QgsMessageLog,
QgsVectorFileWriter,
QgsProcessingContext,
QgsProcessingAlgorithm,
QgsProcessingParameterDefinition,
QgsProcessingException,
QgsProcessingParameterCrs,
QgsProcessingParameterExtent,
QgsProcessingParameterEnum,
QgsProcessingParameterNumber,
QgsProcessingParameterString,
QgsProcessingParameterField,
QgsProcessingParameterPoint,
QgsProcessingParameterBoolean,
QgsProcessingParameterRange,
QgsProcessingParameterFeatureSource,
QgsProcessingParameterVectorLayer,
QgsProcessingParameterRasterLayer,
QgsProcessingParameterMultipleLayers,
QgsProcessingParameterVectorDestination,
QgsProcessingParameterRasterDestination,
QgsProcessingParameterFileDestination,
QgsProcessingParameterFile,
QgsProcessingParameterFolderDestination,
QgsProcessingOutputHtml,
QgsVectorLayer,
QgsProviderRegistry,
)
from qgis.utils import iface
import warnings
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
from osgeo import ogr
from processing.core.ProcessingConfig import ProcessingConfig
from processing.core.parameters import getParameterFromString
from grassprovider.parsed_description import ParsedDescription
from grassprovider.grass_utils import GrassUtils
from processing.tools.system import isWindows, getTempFilename
pluginPath = os.path.normpath(
os.path.join(os.path.split(os.path.dirname(__file__))[0], os.pardir)
)
class GrassAlgorithm(QgsProcessingAlgorithm):
GRASS_OUTPUT_TYPE_PARAMETER = "GRASS_OUTPUT_TYPE_PARAMETER"
GRASS_MIN_AREA_PARAMETER = "GRASS_MIN_AREA_PARAMETER"
GRASS_SNAP_TOLERANCE_PARAMETER = "GRASS_SNAP_TOLERANCE_PARAMETER"
GRASS_REGION_EXTENT_PARAMETER = "GRASS_REGION_PARAMETER"
GRASS_REGION_CELLSIZE_PARAMETER = "GRASS_REGION_CELLSIZE_PARAMETER"
GRASS_REGION_ALIGN_TO_RESOLUTION = "GRASS_REGION_ALIGN_TO_RESOLUTION"
GRASS_RASTER_FORMAT_OPT = "GRASS_RASTER_FORMAT_OPT"
GRASS_RASTER_FORMAT_META = "GRASS_RASTER_FORMAT_META"
GRASS_VECTOR_DSCO = "GRASS_VECTOR_DSCO"
GRASS_VECTOR_LCO = "GRASS_VECTOR_LCO"
GRASS_VECTOR_EXPORT_NOCAT = "GRASS_VECTOR_EXPORT_NOCAT"
OUTPUT_TYPES = ["auto", "point", "line", "area"]
QGIS_OUTPUT_TYPES = {
QgsProcessing.SourceType.TypeVectorAnyGeometry: "auto",
QgsProcessing.SourceType.TypeVectorPoint: "point",
QgsProcessing.SourceType.TypeVectorLine: "line",
QgsProcessing.SourceType.TypeVectorPolygon: "area",
}
def __init__(
self,
description_file: Optional[Path] = None,
json_definition: Optional[dict] = None,
description_folder: Optional[Path] = None,
):
super().__init__()
self._name = ""
self._display_name = ""
self._short_description = ""
self._group = ""
self._groupId = ""
self.grass_name = ""
self.params = []
self.hardcodedStrings = []
self.inputLayers = []
self.commands = []
self.outputCommands = []
self.exportedLayers = {}
self.fileOutputs = {}
self._description_file: Optional[Path] = description_file
self._json_definition: Optional[dict] = json_definition
self._description_folder: Optional[Path] = description_folder
# Default GRASS parameters
self.region = None
self.cellSize = None
self.snapTolerance = None
self.outputType = None
self.minArea = None
self.alignToResolution = None
# destination Crs for combineLayerExtents, will be set from layer or mapSettings
self.destination_crs = QgsCoordinateReferenceSystem()
# Load parameters from a description file
if self._description_file is not None:
self._define_characteristics_from_file()
else:
self._define_characteristics_from_json()
self.numExportedLayers = 0
# Do we need this anymore?
self.uniqueSuffix = str(uuid.uuid4()).replace("-", "")
# Use the ext mechanism
self.module = None
try:
extpath = None
ext_name = None
if self._description_file:
ext_name = self.name().replace(".", "_")
extpath = self._description_file.parents[1].joinpath(
"ext", ext_name + ".py"
)
elif self._json_definition.get("ext_path"):
ext_name = self._json_definition["ext_path"]
extpath = self._description_folder.parents[0].joinpath(
"ext", ext_name + ".py"
)
# this check makes it a bit faster
if extpath and extpath.exists():
spec = importlib.util.spec_from_file_location(
"grassprovider.ext." + ext_name, extpath
)
self.module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(self.module)
except Exception as e:
QgsMessageLog.logMessage(
self.tr("Failed to load: {0}\n{1}").format(extpath, e),
"Processing",
Qgis.MessageLevel.Critical,
)
pass
def createInstance(self):
return self.__class__(
description_file=self._description_file,
json_definition=self._json_definition,
description_folder=self._description_folder,
)
def name(self):
return self._name
def displayName(self):
return self._display_name
def shortDescription(self):
return self._short_description
def group(self):
return self._group
def groupId(self):
return self._groupId
def icon(self):
return QgsApplication.getThemeIcon("/providerGrass.svg")
def svgIconPath(self):
return QgsApplication.iconPath("providerGrass.svg")
def flags(self):
# TODO - maybe it's safe to background thread this?
return (
super().flags()
| QgsProcessingAlgorithm.Flag.FlagNoThreading
| QgsProcessingAlgorithm.Flag.FlagDisplayNameIsLiteral
)
def tr(self, string, context=""):
if context == "":
context = self.__class__.__name__
return QCoreApplication.translate(context, string)
def helpUrl(self):
helpPath = GrassUtils.grassHelpPath()
if helpPath == "":
return None
if os.path.exists(helpPath):
return QUrl.fromLocalFile(
os.path.join(helpPath, f"{self.grass_name}.html")
).toString()
else:
return helpPath + f"{self.grass_name}.html"
def initAlgorithm(self, config=None):
"""
Algorithm initialization
"""
for p in self.params:
# We use createOutput argument for automatic output creation
self.addParameter(p, True)
def _define_characteristics_from_file(self):
"""
Create algorithm parameters and outputs from a text file.
"""
results = ParsedDescription.parse_description_file(self._description_file)
self._define_characteristics_from_parsed_description(results)
def _define_characteristics_from_json(self):
"""
Create algorithm parameters and outputs from JSON definition.
"""
results = ParsedDescription.from_dict(self._json_definition)
self._define_characteristics_from_parsed_description(results)
def _define_characteristics_from_parsed_description(
self, description: ParsedDescription
):
"""
Create algorithm parameters and outputs from parsed description
"""
self.grass_name = description.grass_command
self._name = description.name
self._short_description = description.short_description
self._display_name = description.display_name
self._group = description.group
self._groupId = description.group_id
self.hardcodedStrings = description.hardcoded_strings[:]
self.params = []
has_raster_input: bool = False
has_vector_input: bool = False
has_raster_output: bool = False
has_vector_outputs: bool = False
for param_string in description.param_strings:
try:
parameter = getParameterFromString(param_string, "GrassAlgorithm")
except Exception as e:
QgsMessageLog.logMessage(
QCoreApplication.translate(
"GrassAlgorithm", "Could not open GRASS GIS algorithm: {0}"
).format(self._name),
QCoreApplication.translate("GrassAlgorithm", "Processing"),
Qgis.MessageLevel.Critical,
)
raise e
if parameter is None:
continue
self.params.append(parameter)
if isinstance(
parameter,
(
QgsProcessingParameterVectorLayer,
QgsProcessingParameterFeatureSource,
),
):
has_vector_input = True
elif isinstance(parameter, QgsProcessingParameterRasterLayer):
has_raster_input = True
elif isinstance(parameter, QgsProcessingParameterMultipleLayers):
if parameter.layerType() < 3 or parameter.layerType() == 5:
has_vector_input = True
elif parameter.layerType() == 3:
has_raster_input = True
elif isinstance(parameter, QgsProcessingParameterVectorDestination):
has_vector_outputs = True
elif isinstance(parameter, QgsProcessingParameterRasterDestination):
has_raster_output = True
param = QgsProcessingParameterExtent(
self.GRASS_REGION_EXTENT_PARAMETER,
self.tr("GRASS GIS region extent"),
optional=True,
)
param.setFlags(
param.flags() | QgsProcessingParameterDefinition.Flag.FlagAdvanced
)
self.params.append(param)
if has_raster_output or has_raster_input:
# Add a cellsize parameter
param = QgsProcessingParameterNumber(
self.GRASS_REGION_CELLSIZE_PARAMETER,
self.tr("GRASS GIS region cellsize (leave 0 for default)"),
type=QgsProcessingParameterNumber.Type.Double,
minValue=0.0,
maxValue=sys.float_info.max + 1,
defaultValue=0.0,
)
param.setFlags(
param.flags() | QgsProcessingParameterDefinition.Flag.FlagAdvanced
)
self.params.append(param)
if has_raster_output:
# Add a createopt parameter for format export
param = QgsProcessingParameterString(
self.GRASS_RASTER_FORMAT_OPT,
self.tr("Output Rasters format options (createopt)"),
multiLine=True,
optional=True,
)
param.setFlags(
param.flags() | QgsProcessingParameterDefinition.Flag.FlagAdvanced
)
param.setHelp(self.tr("Creation options should be comma separated"))
self.params.append(param)
# Add a metadata parameter for format export
param = QgsProcessingParameterString(
self.GRASS_RASTER_FORMAT_META,
self.tr("Output Rasters format metadata options (metaopt)"),
multiLine=True,
optional=True,
)
param.setFlags(
param.flags() | QgsProcessingParameterDefinition.Flag.FlagAdvanced
)
param.setHelp(self.tr("Metadata options should be comma separated"))
self.params.append(param)
if has_vector_input:
param = QgsProcessingParameterNumber(
self.GRASS_SNAP_TOLERANCE_PARAMETER,
self.tr("v.in.ogr snap tolerance (-1 = no snap)"),
type=QgsProcessingParameterNumber.Type.Double,
minValue=-1.0,
maxValue=sys.float_info.max + 1,
defaultValue=-1.0,
)
param.setFlags(
param.flags() | QgsProcessingParameterDefinition.Flag.FlagAdvanced
)
self.params.append(param)
param = QgsProcessingParameterNumber(
self.GRASS_MIN_AREA_PARAMETER,
self.tr("v.in.ogr min area"),
type=QgsProcessingParameterNumber.Type.Double,
minValue=0.0,
maxValue=sys.float_info.max + 1,
defaultValue=0.0001,
)
param.setFlags(
param.flags() | QgsProcessingParameterDefinition.Flag.FlagAdvanced
)
self.params.append(param)
if has_vector_outputs:
# Add an optional output type
param = QgsProcessingParameterEnum(
self.GRASS_OUTPUT_TYPE_PARAMETER,
self.tr("v.out.ogr output type"),
self.OUTPUT_TYPES,
defaultValue=0,
)
param.setFlags(
param.flags() | QgsProcessingParameterDefinition.Flag.FlagAdvanced
)
self.params.append(param)
# Add a DSCO parameter for format export
param = QgsProcessingParameterString(
self.GRASS_VECTOR_DSCO,
self.tr("v.out.ogr output data source options (dsco)"),
multiLine=True,
optional=True,
)
param.setFlags(
param.flags() | QgsProcessingParameterDefinition.Flag.FlagAdvanced
)
self.params.append(param)
# Add a LCO parameter for format export
param = QgsProcessingParameterString(
self.GRASS_VECTOR_LCO,
self.tr("v.out.ogr output layer options (lco)"),
multiLine=True,
optional=True,
)
param.setFlags(
param.flags() | QgsProcessingParameterDefinition.Flag.FlagAdvanced
)
self.params.append(param)
# Add a -c flag for export
param = QgsProcessingParameterBoolean(
self.GRASS_VECTOR_EXPORT_NOCAT,
self.tr(
"Also export features without category (not labeled). Otherwise only features with category are exported"
),
False,
)
param.setFlags(
param.flags() | QgsProcessingParameterDefinition.Flag.FlagAdvanced
)
self.params.append(param)
def getDefaultCellSize(self):
"""
Determine a default cell size from all the raster layers.
"""
cellsize = 0.0
layers = [l for l in self.inputLayers if isinstance(l, QgsRasterLayer)]
for layer in layers:
cellsize = max(layer.rasterUnitsPerPixelX(), cellsize)
if cellsize == 0.0:
cellsize = 100.0
return cellsize
def grabDefaultGrassParameters(self, parameters, context):
"""
Imports default GRASS parameters (EXTENT, etc) into
object attributes for faster retrieving.
"""
# GRASS region extent
self.region = self.parameterAsExtent(
parameters, self.GRASS_REGION_EXTENT_PARAMETER, context
)
# GRASS cell size
if self.parameterDefinition(self.GRASS_REGION_CELLSIZE_PARAMETER):
self.cellSize = self.parameterAsDouble(
parameters, self.GRASS_REGION_CELLSIZE_PARAMETER, context
)
# GRASS snap tolerance
self.snapTolerance = self.parameterAsDouble(
parameters, self.GRASS_SNAP_TOLERANCE_PARAMETER, context
)
# GRASS min area
self.minArea = self.parameterAsDouble(
parameters, self.GRASS_MIN_AREA_PARAMETER, context
)
# GRASS output type
self.outputType = self.parameterAsString(
parameters, self.GRASS_OUTPUT_TYPE_PARAMETER, context
)
# GRASS align to resolution
self.alignToResolution = self.parameterAsBoolean(
parameters, self.GRASS_REGION_ALIGN_TO_RESOLUTION, context
)
def processAlgorithm(self, original_parameters, context, feedback):
if isWindows():
path = GrassUtils.grassPath()
if path == "":
raise QgsProcessingException(
self.tr(
"GRASS GIS folder is not configured. Please "
"configure it before running GRASS GIS algorithms."
)
)
# make a copy of the original parameters dictionary - it gets modified by grass algorithms
parameters = {k: v for k, v in original_parameters.items()}
# Create brand new commands lists
self.commands = []
self.outputCommands = []
self.exportedLayers = {}
self.fileOutputs = {}
# If GRASS session has been created outside of this algorithm then
# get the list of layers loaded in GRASS otherwise start a new
# session
existingSession = GrassUtils.sessionRunning
if existingSession:
self.exportedLayers = GrassUtils.getSessionLayers()
else:
GrassUtils.startGrassSession()
# Handle default GRASS parameters
self.grabDefaultGrassParameters(parameters, context)
# Handle ext functions for inputs/command/outputs
for fName in ["Inputs", "Command", "Outputs"]:
fullName = f"process{fName}"
if self.module and hasattr(self.module, fullName):
getattr(self.module, fullName)(self, parameters, context, feedback)
else:
getattr(self, fullName)(parameters, context, feedback)
# Run GRASS
loglines = [self.tr("GRASS GIS execution commands")]
for line in self.commands:
feedback.pushCommandInfo(line)
loglines.append(line)
if ProcessingConfig.getSetting(GrassUtils.GRASS_LOG_COMMANDS):
QgsMessageLog.logMessage(
"\n".join(loglines), self.tr("Processing"), Qgis.MessageLevel.Info
)
GrassUtils.executeGrass(self.commands, feedback, self.outputCommands)
# If the session has been created outside of this algorithm, add
# the new GRASS GIS layers to it otherwise finish the session
if existingSession:
GrassUtils.addSessionLayers(self.exportedLayers)
else:
GrassUtils.endGrassSession()
# Return outputs map
outputs = {}
for out in self.outputDefinitions():
outName = out.name()
if outName in parameters:
if outName in self.fileOutputs:
outputs[outName] = self.fileOutputs[outName]
else:
outputs[outName] = parameters[outName]
if isinstance(out, QgsProcessingOutputHtml):
if self.module and hasattr(self.module, "convertToHtml"):
func = getattr(self.module, "convertToHtml")
func(self, self.fileOutputs[outName], outputs)
else:
self.convertToHtml(self.fileOutputs[outName])
return outputs
def processInputs(self, parameters, context, feedback):
"""Prepare the GRASS import commands"""
inputs = [
p
for p in self.parameterDefinitions()
if isinstance(
p,
(
QgsProcessingParameterVectorLayer,
QgsProcessingParameterFeatureSource,
QgsProcessingParameterRasterLayer,
QgsProcessingParameterMultipleLayers,
),
)
]
for param in inputs:
paramName = param.name()
if paramName not in parameters:
continue
# Handle Null parameter
if parameters[paramName] is None:
continue
elif (
isinstance(parameters[paramName], str)
and len(parameters[paramName]) == 0
):
continue
# Raster inputs needs to be imported into temp GRASS DB
if isinstance(param, QgsProcessingParameterRasterLayer):
if paramName not in self.exportedLayers:
self.loadRasterLayerFromParameter(paramName, parameters, context)
# Vector inputs needs to be imported into temp GRASS DB
elif isinstance(
param,
(
QgsProcessingParameterFeatureSource,
QgsProcessingParameterVectorLayer,
),
):
if paramName not in self.exportedLayers:
# Attribute tables are also vector inputs
if QgsProcessing.SourceType.TypeFile in param.dataTypes():
self.loadAttributeTableFromParameter(
paramName, parameters, context
)
else:
self.loadVectorLayerFromParameter(
paramName,
parameters,
context,
external=None,
feedback=feedback,
)
# For multiple inputs, process each layer
elif isinstance(param, QgsProcessingParameterMultipleLayers):
layers = self.parameterAsLayerList(parameters, paramName, context)
for idx, layer in enumerate(layers):
layerName = f"{paramName}_{idx}"
# Add a raster layer
if layer.type() == QgsMapLayerType.RasterLayer:
self.loadRasterLayer(layerName, layer, context)
# Add a vector layer
elif layer.type() == QgsMapLayerType.VectorLayer:
self.loadVectorLayer(
layerName, layer, context, external=None, feedback=feedback
)
self.postInputs(context)
def postInputs(self, context: QgsProcessingContext):
"""
After layer imports, we need to update some internal parameters
"""
# If projection has not already be set, use the project
self.setSessionProjectionFromProject(context)
# Build GRASS region
if self.region.isEmpty():
self.region = QgsProcessingUtils.combineLayerExtents(
self.inputLayers, self.destination_crs, context
)
command = "g.region n={} s={} e={} w={}".format(
self.region.yMaximum(),
self.region.yMinimum(),
self.region.xMaximum(),
self.region.xMinimum(),
)
# Handle cell size
if self.parameterDefinition(self.GRASS_REGION_CELLSIZE_PARAMETER):
if self.cellSize:
cellSize = self.cellSize
else:
cellSize = self.getDefaultCellSize()
command += f" res={cellSize}"
# Handle align to resolution
if self.alignToResolution:
command += " -a"
# Add the default parameters commands
self.commands.append(command)
QgsMessageLog.logMessage(
self.tr("processInputs end. Commands: {}").format(self.commands),
"Grass",
Qgis.MessageLevel.Info,
)
def processCommand(self, parameters, context, feedback, delOutputs=False):
"""
Prepare the GRASS algorithm command
:param parameters:
:param context:
:param delOutputs: do not add outputs to commands.
"""
noOutputs = [
o
for o in self.parameterDefinitions()
if o not in self.destinationParameterDefinitions()
]
command = f"{self.grass_name} "
command += "{}".join(self.hardcodedStrings)
# Add algorithm command
for param in noOutputs:
paramName = param.name()
value = None
# Exclude default GRASS parameters
if paramName in [
self.GRASS_REGION_CELLSIZE_PARAMETER,
self.GRASS_REGION_EXTENT_PARAMETER,
self.GRASS_MIN_AREA_PARAMETER,
self.GRASS_SNAP_TOLERANCE_PARAMETER,
self.GRASS_OUTPUT_TYPE_PARAMETER,
self.GRASS_REGION_ALIGN_TO_RESOLUTION,
self.GRASS_RASTER_FORMAT_OPT,
self.GRASS_RASTER_FORMAT_META,
self.GRASS_VECTOR_DSCO,
self.GRASS_VECTOR_LCO,
self.GRASS_VECTOR_EXPORT_NOCAT,
]:
continue
# Raster and vector layers
if isinstance(
param,
(
QgsProcessingParameterRasterLayer,
QgsProcessingParameterVectorLayer,
QgsProcessingParameterFeatureSource,
),
):
if paramName in self.exportedLayers:
value = self.exportedLayers[paramName]
else:
value = self.parameterAsCompatibleSourceLayerPath(
parameters,
paramName,
context,
QgsVectorFileWriter.supportedFormatExtensions(),
)
# MultipleLayers
elif isinstance(param, QgsProcessingParameterMultipleLayers):
layers = self.parameterAsLayerList(parameters, paramName, context)
values = []
for idx in range(len(layers)):
layerName = f"{paramName}_{idx}"
values.append(self.exportedLayers[layerName])
value = ",".join(values)
# For booleans, we just add the parameter name
elif isinstance(param, QgsProcessingParameterBoolean):
if self.parameterAsBoolean(parameters, paramName, context):
command += f" {paramName}"
# For Extents, remove if the value is null
elif isinstance(param, QgsProcessingParameterExtent):
if self.parameterAsExtent(parameters, paramName, context):
value = self.parameterAsString(parameters, paramName, context)
# For enumeration, we need to grab the string value
elif isinstance(param, QgsProcessingParameterEnum):
# Handle multiple values
if param.allowMultiple():
indexes = self.parameterAsEnums(parameters, paramName, context)
else:
indexes = [self.parameterAsEnum(parameters, paramName, context)]
if indexes:
value = '"{}"'.format(
",".join([param.options()[i] for i in indexes])
)
# For strings, we just translate as string
elif isinstance(param, QgsProcessingParameterString):
data = self.parameterAsString(parameters, paramName, context)
# if string is empty, we don't add it
if len(data) > 0:
value = '"{}"'.format(
self.parameterAsString(parameters, paramName, context)
)
# For fields, we just translate as string
elif isinstance(param, QgsProcessingParameterField):
value = ",".join(self.parameterAsFields(parameters, paramName, context))
elif isinstance(param, QgsProcessingParameterFile):
if self.parameterAsString(parameters, paramName, context):
value = '"{}"'.format(
self.parameterAsString(parameters, paramName, context)
)
elif isinstance(param, QgsProcessingParameterPoint):
if self.parameterAsString(parameters, paramName, context):
# parameter specified, evaluate as point
# TODO - handle CRS transform
point = self.parameterAsPoint(parameters, paramName, context)
value = f"{point.x()},{point.y()}"
# For numbers, we translate as a string
elif isinstance(
param, (QgsProcessingParameterNumber, QgsProcessingParameterPoint)
):
value = self.parameterAsString(parameters, paramName, context)
elif isinstance(param, QgsProcessingParameterRange):
v = self.parameterAsRange(parameters, paramName, context)
if (
param.flags() & QgsProcessingParameterDefinition.Flag.FlagOptional
) and (math.isnan(v[0]) or math.isnan(v[1])):
continue
else:
value = f"{v[0]},{v[1]}"
elif isinstance(param, QgsProcessingParameterCrs):
if self.parameterAsCrs(parameters, paramName, context):
# TODO: ideally we should be exporting to WKT here, but it seems not all grass algorithms
# will accept a wkt string for a crs value (e.g. r.tileset)
value = f'"{self.parameterAsCrs(parameters, paramName, context).toProj()}"'
# For everything else, we assume that it is a string
else:
value = '"{}"'.format(
self.parameterAsString(parameters, paramName, context)
)
if value:
command += " {}={}".format(paramName.replace("~", ""), value)
# Handle outputs
if not delOutputs:
for out in self.destinationParameterDefinitions():
# We exclude hidden parameters
if out.flags() & QgsProcessingParameterDefinition.Flag.FlagHidden:
continue
outName = out.name()
# For File destination
if isinstance(out, QgsProcessingParameterFileDestination):
if outName in parameters and parameters[outName] is not None:
outPath = self.parameterAsFileOutput(
parameters, outName, context
)
self.fileOutputs[outName] = outPath
# for HTML reports, we need to redirect stdout
if out.defaultFileExtension().lower() == "html":
if outName == "html":
# for "fake" outputs redirect command stdout
command += f' > "{outPath}"'
else:
# for real outputs only output itself should be redirected
command += f' {outName}=- > "{outPath}"'
else:
command += f' {outName}="{outPath}"'
# For folders destination
elif isinstance(out, QgsProcessingParameterFolderDestination):
# We need to add a unique temporary basename
uniqueBasename = outName + self.uniqueSuffix
command += f" {outName}={uniqueBasename}"
else:
if outName in parameters and parameters[outName] is not None:
# We add an output name to make sure it is unique if the session
# uses this algorithm several times.
uniqueOutputName = outName + self.uniqueSuffix
command += f" {outName}={uniqueOutputName}"
# Add output file to exported layers, to indicate that
# they are present in GRASS
self.exportedLayers[outName] = uniqueOutputName
command += " --overwrite"
self.commands.append(command)
QgsMessageLog.logMessage(
self.tr("processCommands end. Commands: {}").format(self.commands),
"Grass",
Qgis.MessageLevel.Info,
)
def vectorOutputType(self, parameters, context):
"""Determine vector output types for outputs"""
self.outType = "auto"
if self.parameterDefinition(self.GRASS_OUTPUT_TYPE_PARAMETER):
typeidx = self.parameterAsEnum(
parameters, self.GRASS_OUTPUT_TYPE_PARAMETER, context
)
self.outType = "auto" if typeidx is None else self.OUTPUT_TYPES[typeidx]
def processOutputs(self, parameters, context, feedback):
"""Prepare the GRASS v.out.ogr commands"""
# Determine general vector output type
self.vectorOutputType(parameters, context)
for out in self.destinationParameterDefinitions():
outName = out.name()
if outName not in parameters:
# skipped output
continue
if isinstance(out, QgsProcessingParameterRasterDestination):
self.exportRasterLayerFromParameter(outName, parameters, context)
elif isinstance(out, QgsProcessingParameterVectorDestination):
self.exportVectorLayerFromParameter(outName, parameters, context)
elif isinstance(out, QgsProcessingParameterFolderDestination):
self.exportRasterLayersIntoDirectory(outName, parameters, context)
def loadRasterLayerFromParameter(
self, name, parameters, context: QgsProcessingContext, external=None, band=1
):
"""
Creates a dedicated command to load a raster into
the temporary GRASS DB.
:param name: name of the parameter.
:param parameters: algorithm parameters dict.
:param context: algorithm context.
:param external: use r.external if True, r.in.gdal otherwise.
:param band: imports only specified band. None for all bands.
"""
layer = self.parameterAsRasterLayer(parameters, name, context)
self.loadRasterLayer(name, layer, context, external, band)
def loadRasterLayer(
self,
name,
layer,
context: QgsProcessingContext,
external=None,
band=1,
destName=None,
):
"""
Creates a dedicated command to load a raster into
the temporary GRASS DB.
:param name: name of the parameter.
:param layer: QgsMapLayer for the raster layer.
:param context: Processing context
:param external: use r.external if True, r.in.gdal if False.
:param band: imports only specified band. None for all bands.
:param destName: force the destination name of the raster.
"""
if external is None:
external = ProcessingConfig.getSetting(GrassUtils.GRASS_USE_REXTERNAL)
self.inputLayers.append(layer)
self.setSessionProjectionFromLayer(layer, context)
if not destName:
destName = f"rast_{os.path.basename(getTempFilename(context=context))}"
self.exportedLayers[name] = destName
if layer.providerType() == "postgresraster":
source = ""
uri = layer.dataProvider().uri()
source = f"PG: {uri.connectionInfo()}"
schema = uri.schema()
if schema:
source += f" schema='{schema}'"
table = uri.table()
source += f" table='{table}'"
column = uri.param("column") or uri.geometryColumn()
if column:
source += f" column='{column}'"
is_tiled = any(
[
layer.dataProvider().xSize() != layer.dataProvider().xBlockSize(),
layer.dataProvider().ySize() != layer.dataProvider().yBlockSize(),
]
)
source += f" mode={2 if is_tiled else 1}"
where = layer.dataProvider().subsetString()
if where:
source += f" where='{where}'"
else:
source = os.path.normpath(layer.source())
command = '{} input="{}" {}output="{}" --overwrite -o'.format(
"r.external" if external else "r.in.gdal",
source,
f"band={band} " if band else "",
destName,
)
self.commands.append(command)
def exportRasterLayerFromParameter(
self, name, parameters, context, colorTable=True
):
"""
Creates a dedicated command to export a raster from
temporary GRASS DB into a file via gdal.
:param name: name of the parameter.
:param parameters: Algorithm parameters dict.
:param context: Algorithm context.
:param colorTable: preserve color Table.
"""
fileName = self.parameterAsOutputLayer(parameters, name, context)
if not fileName:
return
fileName = os.path.normpath(fileName)
grassName = f"{name}{self.uniqueSuffix}"
outFormat = GrassUtils.getRasterFormatFromFilename(fileName)
createOpt = self.parameterAsString(
parameters, self.GRASS_RASTER_FORMAT_OPT, context
)
metaOpt = self.parameterAsString(
parameters, self.GRASS_RASTER_FORMAT_META, context
)
self.exportRasterLayer(
grassName, fileName, colorTable, outFormat, createOpt, metaOpt
)
self.fileOutputs[name] = fileName
def exportRasterLayer(
self,
grassName,
fileName,
colorTable=True,
outFormat="GTiff",
createOpt=None,
metaOpt=None,
):
"""
Creates a dedicated command to export a raster from
temporary GRASS DB into a file via gdal.
:param grassName: name of the raster to export.
:param fileName: file path of raster layer.
:param colorTable: preserve color Table.
:param outFormat: file format for export.
:param createOpt: creation options for format.
:param metaOpt: metadata options for export.
"""
createOpt = createOpt or GrassUtils.GRASS_RASTER_FORMATS_CREATEOPTS.get(
outFormat
)
for cmd in [self.commands, self.outputCommands]:
# Adjust region to layer before exporting
cmd.append(f"g.region raster={grassName}")
cmd.append(
'r.out.gdal -t -m{} input="{}" output="{}" format="{}" {}{} --overwrite'.format(
"" if colorTable else " -c",
grassName,
fileName,
outFormat,
f' createopt="{createOpt}"' if createOpt else "",
f' metaopt="{metaOpt}"' if metaOpt else "",
)
)
def exportRasterLayersIntoDirectory(
self, name, parameters, context, colorTable=True, wholeDB=False
):
"""
Creates a dedicated loop command to export rasters from
temporary GRASS DB into a directory via gdal.
:param name: name of the output directory parameter.
:param parameters: Algorithm parameters dict.
:param context: Algorithm context.
:param colorTable: preserve color Table.
:param wholeDB: export every raster layer from the GRASSDB
"""
# Grab directory name and temporary basename
outDir = os.path.normpath(self.parameterAsString(parameters, name, context))
basename = ""
if not wholeDB:
basename = name + self.uniqueSuffix
# Add a loop export from the basename
for cmd in [self.commands, self.outputCommands]:
# TODO Format/options support
if isWindows():
cmd.append("if not exist {0} mkdir {0}".format(outDir))
cmd.append(
"for /F %%r IN ('g.list type^=rast pattern^=\"{}*\"') do r.out.gdal -m{} input=%%r output={}/%%r.tif {}".format(
basename,
" -t" if colorTable else "",
outDir,
'--overwrite -c createopt="TFW=YES,COMPRESS=LZW"',
)
)
else:
cmd.append(f"for r in $(g.list type=rast pattern='{basename}*'); do")
cmd.append(
" r.out.gdal -m{0} input=${{r}} output={1}/${{r}}.tif {2}".format(
" -t" if colorTable else "",
outDir,
'--overwrite -c createopt="TFW=YES,COMPRESS=LZW"',
)
)
cmd.append("done")
def loadVectorLayerFromParameter(
self, name, parameters, context, feedback, external=False
):
"""
Creates a dedicated command to load a vector into
the temporary GRASS DB.
:param name: name of the parameter
:param parameters: Parameters of the algorithm.
:param context: Processing context
:param external: use v.external (v.in.ogr if False).
"""
layer = self.parameterAsVectorLayer(parameters, name, context)
is_ogr_disk_based_layer = (
layer is not None and layer.dataProvider().name() == "ogr"
)
if is_ogr_disk_based_layer:
# we only support direct reading of disk based ogr layers -- not ogr postgres layers, etc
source_parts = QgsProviderRegistry.instance().decodeUri(
"ogr", layer.source()
)
if not source_parts.get("path"):
is_ogr_disk_based_layer = False
elif source_parts.get("layerId"):
# no support for directly reading layers by id in grass
is_ogr_disk_based_layer = False
if not is_ogr_disk_based_layer:
# parameter is not a vector layer or not an OGR layer - try to convert to a source compatible with
# grass OGR inputs and extract selection if required
path = self.parameterAsCompatibleSourceLayerPath(
parameters,
name,
context,
QgsVectorFileWriter.supportedFormatExtensions(),
feedback=feedback,
)
ogr_layer = QgsVectorLayer(path, "", "ogr")
self.loadVectorLayer(
name, ogr_layer, context, external=external, feedback=feedback
)
else:
# already an ogr disk based layer source
self.loadVectorLayer(
name, layer, context, external=external, feedback=feedback
)
def loadVectorLayer(
self, name, layer, context: QgsProcessingContext, external=False, feedback=None
):
"""
Creates a dedicated command to load a vector into
temporary GRASS DB.
:param name: name of the parameter
:param layer: QgsMapLayer for the vector layer.
:param context: Processing context
:param external: use v.external (v.in.ogr if False).
:param feedback: feedback object
"""
# TODO: support multiple input formats
if external is None:
external = ProcessingConfig.getSetting(GrassUtils.GRASS_USE_VEXTERNAL)
source_parts = QgsProviderRegistry.instance().decodeUri("ogr", layer.source())
file_path = source_parts.get("path")
layer_name = source_parts.get("layerName")
# safety check: we can only use external for ogr layers which support random read
if external:
if feedback is not None:
feedback.pushInfo(
self.tr("Attempting to use v.external for direct layer read")
)
ds = ogr.Open(file_path)
if ds is not None:
ogr_layer = ds.GetLayer()
if ogr_layer is None or not ogr_layer.TestCapability(ogr.OLCRandomRead):
if feedback is not None:
feedback.reportError(
self.tr(
"Cannot use v.external: layer does not support random read"
)
)
external = False
else:
if feedback is not None:
feedback.reportError(
self.tr("Cannot use v.external: error reading layer")
)
external = False
self.inputLayers.append(layer)
self.setSessionProjectionFromLayer(layer, context)
destFilename = f"vector_{os.path.basename(getTempFilename(context=context))}"
self.exportedLayers[name] = destFilename
command = '{}{}{} input="{}"{} output="{}" --overwrite -o'.format(
"v.external" if external else "v.in.ogr",
f" min_area={self.minArea}" if not external else "",
f" snap={self.snapTolerance}" if not external else "",
os.path.normpath(file_path),
f' layer="{layer_name}"' if layer_name else "",
destFilename,
)
if layer.subsetString():
escaped_subset = layer.subsetString().replace('"', '\\"')
command += f' where="{escaped_subset}"'
self.commands.append(command)
def exportVectorLayerFromParameter(
self, name, parameters, context, layer=None, nocats=False
):
"""
Creates a dedicated command to export a vector from
a QgsProcessingParameter.
:param name: name of the parameter.
:param context: parameters context.
:param layer: for vector with multiples layers, exports only one layer.
:param nocats: do not export GRASS categories.
"""
fileName = os.path.normpath(
self.parameterAsOutputLayer(parameters, name, context)
)
grassName = f"{name}{self.uniqueSuffix}"
# Find if there is a dataType
dataType = self.outType
if self.outType == "auto":
parameter = self.parameterDefinition(name)
if parameter:
layerType = parameter.dataType()
if layerType in self.QGIS_OUTPUT_TYPES:
dataType = self.QGIS_OUTPUT_TYPES[layerType]
outFormat = QgsVectorFileWriter.driverForExtension(
os.path.splitext(fileName)[1]
).replace(" ", "_")
dsco = self.parameterAsString(parameters, self.GRASS_VECTOR_DSCO, context)
lco = self.parameterAsString(parameters, self.GRASS_VECTOR_LCO, context)
exportnocat = self.parameterAsBoolean(
parameters, self.GRASS_VECTOR_EXPORT_NOCAT, context
)
self.exportVectorLayer(
grassName,
fileName,
layer,
nocats,
dataType,
outFormat,
dsco,
lco,
exportnocat,
)
self.fileOutputs[name] = fileName
def exportVectorLayer(
self,
grassName,
fileName,
layer=None,
nocats=False,
dataType="auto",
outFormat=None,
dsco=None,
lco=None,
exportnocat=False,
):
"""
Creates a dedicated command to export a vector from
temporary GRASS DB into a file via OGR.
:param grassName: name of the vector to export.
:param fileName: file path of vector layer.
:param dataType: export only this type of data.
:param layer: for vector with multiples layers, exports only one layer.
:param nocats: do not export GRASS categories.
:param outFormat: file format for export.
:param dsco: datasource creation options for format.
:param lco: layer creation options for format.
:param exportnocat: do not export features without categories.
"""
if outFormat is None:
outFormat = QgsVectorFileWriter.driverForExtension(
os.path.splitext(fileName)[1]
).replace(" ", "_")
for cmd in [self.commands, self.outputCommands]:
cmd.append(
'v.out.ogr{} type="{}" input="{}" output="{}" format="{}" {}{}{}{} --overwrite'.format(
"" if nocats else "",
dataType,
grassName,
fileName,
outFormat,
f"layer={layer}" if layer else "",
f' dsco="{dsco}"' if dsco else "",
f' lco="{lco}"' if lco else "",
" -c" if exportnocat else "",
)
)
def loadAttributeTableFromParameter(self, name, parameters, context):
"""
Creates a dedicated command to load an attribute table
into the temporary GRASS DB.
:param name: name of the parameter
:param parameters: Parameters of the algorithm.
:param context: Processing context
"""
table = self.parameterAsVectorLayer(parameters, name, context)
self.loadAttributeTable(name, table, context)
def loadAttributeTable(
self, name, layer, context: QgsProcessingContext, destName=None
):
"""
Creates a dedicated command to load an attribute table
into the temporary GRASS DB.
:param name: name of the input parameter.
:param layer: a layer object to import from.
:param context: processing context
:param destName: force the name for the table into GRASS DB.
"""
self.inputLayers.append(layer)
if not destName:
destName = f"table_{os.path.basename(getTempFilename(context=context))}"
self.exportedLayers[name] = destName
command = 'db.in.ogr --overwrite input="{}" output="{}"'.format(
os.path.normpath(layer.source()), destName
)
self.commands.append(command)
def exportAttributeTable(self, grassName, fileName, outFormat="CSV", layer=1):
"""
Creates a dedicated command to export an attribute
table from the temporary GRASS DB into a file via ogr.
:param grassName: name of the parameter.
:param fileName: file path of raster layer.
:param outFormat: file format for export.
:param layer: In GRASS a vector can have multiple layers.
"""
for cmd in [self.commands, self.outputCommands]:
cmd.append(
'db.out.ogr input="{}" output="{}" layer={} format={} --overwrite'.format(
grassName, fileName, layer, outFormat
)
)
def setSessionProjectionFromProject(self, context: QgsProcessingContext):
"""
Set the projection from the project.
We create a WKT definition which is transmitted to Grass
"""
if not GrassUtils.projectionSet and iface:
self.setSessionProjection(
iface.mapCanvas().mapSettings().destinationCrs(), context
)
def setSessionProjectionFromLayer(
self, layer: QgsMapLayer, context: QgsProcessingContext
):
"""
Set the projection from a QgsVectorLayer.
We create a WKT definition which is transmitted to Grass
"""
if not GrassUtils.projectionSet:
self.setSessionProjection(layer.crs(), context)
def setSessionProjection(self, crs, context: QgsProcessingContext):
"""
Set the session projection to the specified CRS
"""
self.destination_crs = crs
file_name = GrassUtils.exportCrsWktToFile(crs, context)
command = f'g.proj -c wkt="{file_name}"'
self.commands.append(command)
GrassUtils.projectionSet = True
def convertToHtml(self, fileName):
# Read HTML contents
lines = []
with open(fileName, encoding="utf-8") as f:
lines = f.readlines()
if len(lines) > 1 and "<html>" not in lines[0]:
# Then write into the HTML file
with open(fileName, "w", encoding="utf-8") as f:
f.write("<html><head>")
f.write(
'<meta http-equiv="Content-Type" content="text/html; charset=utf-8" /></head>'
)
f.write("<body><p>")
for line in lines:
f.write(f"{line}</br>")
f.write("</p></body></html>")
def canExecute(self):
message = GrassUtils.checkGrassIsInstalled()
return not message, message
def checkParameterValues(self, parameters, context):
grass_parameters = {k: v for k, v in parameters.items()}
if self.module:
if hasattr(self.module, "checkParameterValuesBeforeExecuting"):
func = getattr(self.module, "checkParameterValuesBeforeExecuting")
return func(self, grass_parameters, context)
return super().checkParameterValues(grass_parameters, context)