QGIS/python/plugins/processing/algs/grass7/Grass7Algorithm.py

617 lines
27 KiB
Python

# -*- coding: utf-8 -*-
"""
***************************************************************************
Grass7Algorithm.py
---------------------
Date : February 2015
Copyright : (C) 2014-2015 by Victor Olaya
Email : volayaf at gmail dot com
***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************
"""
from builtins import str
from builtins import range
__author__ = 'Victor Olaya'
__date__ = 'February 2015'
__copyright__ = '(C) 2012-2015, Victor Olaya'
# This will get replaced with a git SHA1 when you do a git archive
__revision__ = '$Format:%H$'
import os
import uuid
import importlib

from qgis.PyQt.QtCore import QCoreApplication, QUrl
from qgis.core import (QgsRasterLayer,
                       QgsApplication,
                       QgsProcessingUtils,
                       QgsMessageLog,
                       QgsProcessingAlgorithm,
                       QgsProcessingParameterDefinition,
                       QgsProcessingException,
                       QgsProcessingParameterEnum,
                       QgsProcessingParameterExtent,
                       QgsProcessingParameterNumber)
from qgis.utils import iface

#from processing.core.GeoAlgorithm import GeoAlgorithm (replaced by QgsProcessingAlgorithm)
from processing.core.ProcessingConfig import ProcessingConfig
#from processing.core.GeoAlgorithmExecutionException import GeoAlgorithmExecutionException (replaced by QgsProcessingException).
from processing.core.parameters import (getParameterFromString,
                                        ParameterVector,
                                        ParameterMultipleInput,
                                        #ParameterExtent,
                                        #ParameterNumber,
                                        #ParameterSelection,
                                        ParameterRaster,
                                        ParameterTable,
                                        ParameterBoolean,
                                        ParameterString,
                                        ParameterPoint)
from processing.core.outputs import (getOutputFromString,
                                     OutputRaster,
                                     OutputVector,
                                     OutputFile,
                                     OutputHTML)
from .Grass7Utils import Grass7Utils
from processing.tools import dataobjects, system
# Normalised path of the directory two levels above the one containing
# this module.
pluginPath = os.path.normpath(
    os.path.join(os.path.dirname(os.path.dirname(__file__)), os.pardir))
# Processing algorithm that wraps one GRASS GIS 7 module. It is configured
# from a description file and turns its inputs, the module call and its
# outputs into a list of GRASS commands.
class Grass7Algorithm(QgsProcessingAlgorithm):
# Names of the extra parameters this class adds on top of the ones read
# from the description file.
# Index into OUTPUT_TYPES used as the v.out.ogr "type" option.
GRASS_OUTPUT_TYPE_PARAMETER = 'GRASS_OUTPUT_TYPE_PARAMETER'
# Value passed to v.in.ogr as "min_area".
GRASS_MIN_AREA_PARAMETER = 'GRASS_MIN_AREA_PARAMETER'
# Value passed to v.in.ogr as "snap".
GRASS_SNAP_TOLERANCE_PARAMETER = 'GRASS_SNAP_TOLERANCE_PARAMETER'
# Comma-separated extent used to build the g.region command.
GRASS_REGION_EXTENT_PARAMETER = 'GRASS_REGION_PARAMETER'
# Value passed to g.region as "res" (a falsy value selects the default).
GRASS_REGION_CELLSIZE_PARAMETER = 'GRASS_REGION_CELLSIZE_PARAMETER'
# When truthy, "-a" is appended to the g.region command.
GRASS_REGION_ALIGN_TO_RESOLUTION = '-a_r.region'
# Allowed v.out.ogr output types, indexed by GRASS_OUTPUT_TYPE_PARAMETER.
OUTPUT_TYPES = ['auto', 'point', 'line', 'area']
def __init__(self, descriptionfile):
    """Build the algorithm from a GRASS description file.

    :param descriptionfile: path of the text file that is parsed by
        defineCharacteristicsFromFile() to fill in the names, group,
        parameters and outputs.
    """
    super().__init__()

    # Identity fields; overwritten while the description file is parsed.
    self._name = ''
    self._display_name = ''
    self._group = ''
    self.grass7Name = ''
    self.hardcodedStrings = []

    self.descriptionFile = descriptionfile
    self.defineCharacteristicsFromFile()

    self.numExportedLayers = 0
    # Dash-free random token appended to GRASS output names.
    self.uniqueSuffix = uuid.uuid4().hex

    # Use the ext mechanism: an optional module named after the algorithm
    # (dots replaced by underscores) may provide custom hooks.
    extModuleName = 'processing.algs.grass7.ext.' + self.name().replace('.', '_')
    try:
        self.module = importlib.import_module(extModuleName)
    except ImportError:
        self.module = None
def name(self):
    """Return the internal algorithm name stored in ``_name``."""
    return self._name
def displayName(self):
    """Return the user-visible algorithm name stored in ``_display_name``."""
    return self._display_name
def group(self):
    """Return the algorithm group stored in ``_group``."""
    return self._group
def icon(self):
    """Return the themed icon registered as "/providerGrass.svg"."""
    themedIcon = QgsApplication.getThemeIcon("/providerGrass.svg")
    return themedIcon
def svgIconPath(self):
    """Return the path QgsApplication reports for "providerGrass.svg"."""
    svgPath = QgsApplication.iconPath("providerGrass.svg")
    return svgPath
def tr(self, string, context=''):
    """Translate *string* through QCoreApplication.

    :param string: text to translate.
    :param context: translation context; an empty string selects the
        name of this object's class.
    :return: whatever QCoreApplication.translate() returns.
    """
    effectiveContext = self.__class__.__name__ if context == '' else context
    return QCoreApplication.translate(effectiveContext, string)
def helpUrl(self):
    """Return the location of the help page of the wrapped GRASS module.

    :return: None when Grass7Utils reports an empty help path; a file
        URL string when the help path exists on disk; otherwise the help
        path with "<grass7Name>.html" appended as-is.
    """
    helpPath = Grass7Utils.grassHelpPath()
    if helpPath == '':
        return None

    pageName = '{}.html'.format(self.grass7Name)
    if not os.path.exists(helpPath):
        # Not a local directory: treat the help path as a prefix.
        return helpPath + pageName
    return QUrl.fromLocalFile(os.path.join(helpPath, pageName)).toString()
def getParameterDescriptions(self):
    """Extract parameter descriptions from the algorithm help file.

    The help file path is the second item returned by ``self.help()``.
    For every line starting with ``<DT><b>`` the first parameter whose
    ``<b>name</b>`` marker appears on that line gets, as description,
    the following line with its first 4 and last 6 characters removed.

    This is best effort: any error while reading or parsing stops the
    scan and the descriptions collected so far are returned.

    :return: dict mapping parameter names to description strings.
    """
    descriptions = {}
    _, helpfile = self.help()
    try:
        with open(helpfile) as infile:
            lines = infile.readlines()
        for index, line in enumerate(lines):
            if not line.startswith('<DT><b>'):
                continue
            for param in self.parameterDefinitions():
                marker = '<b>' + param.name() + '</b>'
                if marker in line:
                    # The description sits on the line after the <DT> entry.
                    descriptions[param.name()] = lines[index + 1][4:-6]
                    break
    except Exception:
        pass
    return descriptions
def defineCharacteristicsFromFile(self):
    """
    Create algorithm parameters and outputs from a text file.

    The description file is read in order:

    1. the GRASS module name (stored in ``grass7Name``);
    2. the Processing name; when it has no " - " separator the GRASS
       module name is prepended, and the internal name becomes the
       lowercased text before the first space;
    3. the group;
    4. one parameter/output definition per line, up to the first empty
       line or the end of the file.

    Once the file has been read the GRASS region extent parameter is
    always added; the cell size, v.in.ogr and v.out.ogr parameters are
    added depending on the kinds of inputs and outputs that were found.

    Any exception raised while handling a definition line is logged and
    re-raised unchanged.
    """
    def addAdvanced(param):
        # Flag the parameter as advanced before registering it.
        param.setFlags(param.flags() | QgsProcessingParameterDefinition.FlagAdvanced)
        self.addParameter(param)

    hasRasterOutput = False
    hasVectorInput = False
    vectorOutputs = 0

    with open(self.descriptionFile) as lines:
        # First line of the file is the Grass algorithm name
        self.grass7Name = lines.readline().strip()
        # Second line is the algorithm name in Processing
        line = lines.readline().strip()
        self._name = line
        self._display_name = QCoreApplication.translate("GrassAlgorithm", line)
        if " - " not in self._name:
            self._name = self.grass7Name + " - " + self._name
            self._display_name = self.grass7Name + " - " + self._display_name
        self._name = self._name[:self._name.find(' ')].lower()
        # Read the grass group
        line = lines.readline().strip()
        self._group = QCoreApplication.translate("GrassAlgorithm", line)

        # Then you have parameters/output definition
        line = lines.readline().strip()
        while line != '':
            try:
                if line.startswith('Hardcoded'):
                    self.hardcodedStrings.append(line[len('Hardcoded|'):])
                # NOTE(review): a Hardcoded line is still handed to the
                # parsers below; presumably they return None for it - confirm.
                parameter = getParameterFromString(line)
                if parameter is not None:
                    self.addParameter(parameter)
                    if isinstance(parameter, ParameterVector):
                        hasVectorInput = True
                    if isinstance(parameter, ParameterMultipleInput) \
                            and parameter.datatype < 3:
                        hasVectorInput = True
                else:
                    output = getOutputFromString(line)
                    self.addOutput(output)
                    if isinstance(output, OutputRaster):
                        hasRasterOutput = True
                    elif isinstance(output, OutputVector):
                        vectorOutputs += 1
                    if isinstance(output, OutputHTML):
                        # HTML outputs get a companion plain-text file that
                        # receives the raw GRASS output.
                        self.addOutput(OutputFile("rawoutput",
                                                  self.tr("{0} (raw output)").format(output.description()),
                                                  "txt"))
                line = lines.readline().strip()
            except Exception:
                QgsMessageLog.logMessage(self.tr('Could not open GRASS GIS 7 algorithm: {0}\n{1}').format(self.descriptionFile, line), self.tr('Processing'), QgsMessageLog.CRITICAL)
                # Bare raise keeps the original traceback intact.
                raise

    self.addParameter(QgsProcessingParameterExtent(
        self.GRASS_REGION_EXTENT_PARAMETER,
        self.tr('GRASS GIS 7 region extent'))
    )
    if hasRasterOutput:
        self.addParameter(QgsProcessingParameterNumber(
            self.GRASS_REGION_CELLSIZE_PARAMETER,
            self.tr('GRASS GIS 7 region cellsize (leave 0 for default)'),
            type=QgsProcessingParameterNumber.Double,
            minValue=0.0, maxValue=None, defaultValue=0.0)
        )
    if hasVectorInput:
        addAdvanced(QgsProcessingParameterNumber(
            self.GRASS_SNAP_TOLERANCE_PARAMETER,
            self.tr('v.in.ogr snap tolerance (-1 = no snap)'),
            type=QgsProcessingParameterNumber.Double,
            minValue=-1.0, maxValue=None, defaultValue=-1.0))
        # The enum member is "Double"; the lowercase spelling does not exist.
        addAdvanced(QgsProcessingParameterNumber(
            self.GRASS_MIN_AREA_PARAMETER,
            self.tr('v.in.ogr min area'),
            type=QgsProcessingParameterNumber.Double,
            minValue=0.0, maxValue=None, defaultValue=0.0001))
    if vectorOutputs == 1:
        addAdvanced(QgsProcessingParameterEnum(
            self.GRASS_OUTPUT_TYPE_PARAMETER,
            self.tr('v.out.ogr output type'),
            self.OUTPUT_TYPES))
def getDefaultCellsize(self, parameters, context):
    """Return the cell size to use when none was given explicitly.

    The result is the largest horizontal cell size (extent width divided
    by the number of columns) among the raster layers supplied for the
    raster and multiple-input parameters, or 100 when no raster layer
    yields a cell size.

    :param parameters: dict of parameter values keyed by parameter name.
    :param context: processing context used to resolve layer strings.
    :return: the cell size as a number.
    """
    def layerCellsize(layer):
        # Unresolved layers, non-raster layers and rasters without
        # columns contribute nothing instead of raising.
        if not isinstance(layer, QgsRasterLayer) or not layer.width():
            return 0
        extent = layer.extent()
        return (extent.xMaximum() - extent.xMinimum()) / layer.width()

    cellsize = 0
    for param in self.parameterDefinitions():
        if param.name() not in parameters:
            continue
        value = parameters[param.name()]
        if not value:
            # Optional input left empty: nothing to measure.
            continue
        if isinstance(param, ParameterRaster):
            if isinstance(value, QgsRasterLayer):
                layer = value
            else:
                # Resolve the value supplied by the caller, not an
                # attribute of the parameter definition.
                layer = QgsProcessingUtils.mapLayerFromString(value, context)
            cellsize = max(cellsize, layerCellsize(layer))
        elif isinstance(param, ParameterMultipleInput):
            for layername in value.split(';'):
                layer = QgsProcessingUtils.mapLayerFromString(layername, context)
                cellsize = max(cellsize, layerCellsize(layer))

    if cellsize == 0:
        cellsize = 100
    return cellsize
def processAlgorithm(self, parameters, context, feedback):
    """Build and run the GRASS commands for this algorithm.

    The command lists are rebuilt from scratch, a GRASS session is
    started unless one is already running, the input/command/output
    steps are executed (each one replaced by the ext module's function
    of the same name when it exists), and the resulting commands are
    handed to Grass7Utils.executeGrass7().

    :param parameters: dict of parameter values keyed by parameter name.
    :param context: processing context.
    :param feedback: feedback object; receives every command line.
    :raises QgsProcessingException: on Windows when no GRASS folder is
        configured.
    """
    if system.isWindows() and Grass7Utils.grassPath() == '':
        raise QgsProcessingException(
            self.tr('GRASS GIS 7 folder is not configured. Please '
                    'configure it before running GRASS GIS 7 algorithms.'))

    # Create brand new commands lists
    self.commands = []
    self.outputCommands = []
    self.exportedLayers = {}

    # If GRASS session has been created outside of this algorithm then
    # get the list of layers loaded in GRASS otherwise start a new
    # session
    existingSession = Grass7Utils.sessionRunning
    if existingSession:
        self.exportedLayers = Grass7Utils.getSessionLayers()
    else:
        Grass7Utils.startGrass7Session()

    # Handle ext functions for inputs/command/outputs: an ext hook is
    # called with the algorithm only, the built-in step with its own
    # arguments (processCommand needs the parameter values).
    steps = (
        ('processInputs', lambda: self.processInputs(parameters, context)),
        ('processCommand', lambda: self.processCommand(parameters)),
        ('processOutputs', self.processOutputs),
    )
    for hookName, defaultStep in steps:
        if self.module and hasattr(self.module, hookName):
            getattr(self.module, hookName)(self)
        else:
            defaultStep()

    # Run GRASS
    loglines = [self.tr('GRASS GIS 7 execution commands')]
    for line in self.commands:
        feedback.pushCommandInfo(line)
        loglines.append(line)
    if ProcessingConfig.getSetting(Grass7Utils.GRASS_LOG_COMMANDS):
        QgsMessageLog.logMessage("\n".join(loglines), self.tr('Processing'), QgsMessageLog.INFO)
    Grass7Utils.executeGrass7(self.commands, feedback, self.outputCommands)

    # HTML outputs are produced by wrapping the raw text output in <pre>.
    for out in self.outputs:
        if isinstance(out, OutputHTML):
            with open(self.getOutputFromName("rawoutput").value) as f:
                rawOutput = f.read()
            with open(out.value, "w") as f:
                f.write("<pre>%s</pre>" % rawOutput)

    # If the session has been created outside of this algorithm, add
    # the new GRASS GIS 7 layers to it otherwise finish the session
    if existingSession:
        Grass7Utils.addSessionLayers(self.exportedLayers)
    else:
        Grass7Utils.endGrass7Session()
def processInputs(self, parameters, context):
    """Prepare the GRASS import commands.

    Every raster, vector and multiple-input parameter that has a value
    is imported into GRASS (unless it is already known to the session),
    then the session projection and the ``g.region`` command are queued.

    :param parameters: dict of parameter values keyed by parameter name.
    :param context: processing context used to resolve layers.
    :raises QgsProcessingException: when the region cannot be determined.
    """
    # Every input handled here, kept so the region can be derived from
    # them when no explicit extent is given.
    importedSources = []

    def importLayer(source, exporter):
        importedSources.append(source)
        # Check if the layer hasn't already been exported in, for
        # example, previous GRASS calls in this session
        if source in self.exportedLayers:
            return
        self.setSessionProjectionFromLayer(source, self.commands)
        self.commands.append(exporter(source))

    for param in self.parameterDefinitions():
        if param.name() not in parameters:
            continue
        value = parameters[param.name()]
        if not value:
            continue
        if isinstance(param, ParameterRaster):
            importLayer(value, self.exportRasterLayer)
        elif isinstance(param, ParameterVector):
            importLayer(value, self.exportVectorLayer)
        elif isinstance(param, ParameterMultipleInput):
            if param.datatype == dataobjects.TYPE_RASTER:
                exporter = self.exportRasterLayer
            elif param.datatype in [dataobjects.TYPE_VECTOR_ANY,
                                    dataobjects.TYPE_VECTOR_LINE,
                                    dataobjects.TYPE_VECTOR_POLYGON,
                                    dataobjects.TYPE_VECTOR_POINT]:
                exporter = self.exportVectorLayer
            else:
                continue
            for layer in value.split(';'):
                importLayer(layer, exporter)

    self.setSessionProjectionFromProject(self.commands)

    # NOTE(review): getParameterValue() is not defined in this class;
    # presumably it is provided elsewhere - confirm.
    region = self.getParameterValue(self.GRASS_REGION_EXTENT_PARAMETER)
    if region:
        # Expected as "xmin,xmax,ymin,ymax".
        regionCoords = str(region).split(',')
    else:
        # No explicit extent: fall back to the combined extent of the
        # input layers.
        layers = []
        for source in importedSources:
            layer = source
            if isinstance(source, str):
                layer = QgsProcessingUtils.mapLayerFromString(source, context)
            if layer:
                layers.append(layer)
        if not layers:
            raise QgsProcessingException(
                self.tr('Could not determine the GRASS GIS 7 region: no '
                        'extent was given and no input layer is available.'))
        extent = QgsProcessingUtils.combineLayerExtents(layers)
        regionCoords = [extent.xMinimum(), extent.xMaximum(),
                        extent.yMinimum(), extent.yMaximum()]
    if len(regionCoords) < 4:
        raise QgsProcessingException(
            self.tr('Invalid GRASS GIS 7 region extent: {0}').format(region))

    command = 'g.region n={} s={} e={} w={}'.format(
        regionCoords[3], regionCoords[2], regionCoords[1], regionCoords[0])
    cellsize = self.getParameterValue(self.GRASS_REGION_CELLSIZE_PARAMETER)
    if not cellsize:
        cellsize = self.getDefaultCellsize(parameters, context)
    command += ' res=' + str(cellsize)
    if self.getParameterValue(self.GRASS_REGION_ALIGN_TO_RESOLUTION):
        command += ' -a'
    self.commands.append(command)
def processCommand(self, parameters=None):
    """Prepare the GRASS algorithm command.

    The command is the GRASS module name, the hardcoded strings, one
    option per parameter that has a value, the output options and
    ``--overwrite``; it is appended to ``self.commands``.

    :param parameters: optional dict of parameter values keyed by
        parameter name. A parameter missing from it (or every parameter,
        when it is None) falls back to the definition's ``value``
        attribute when it has one.
    """
    # Parameters that configure the GRASS session rather than the
    # module itself; they never appear on the module command line.
    reserved = (self.GRASS_REGION_CELLSIZE_PARAMETER,
                self.GRASS_REGION_EXTENT_PARAMETER,
                self.GRASS_MIN_AREA_PARAMETER,
                self.GRASS_SNAP_TOLERANCE_PARAMETER,
                self.GRASS_OUTPUT_TYPE_PARAMETER,
                self.GRASS_REGION_ALIGN_TO_RESOLUTION)

    def valueOf(param):
        if parameters is not None and param.name() in parameters:
            return parameters[param.name()]
        return getattr(param, 'value', None)

    parts = [self.grass7Name, ' '.join(self.hardcodedStrings)]

    # Add algorithm command
    for param in self.parameterDefinitions():
        name = param.name()
        if name in reserved:
            continue
        value = valueOf(param)
        if value is None or value == '':
            continue
        if isinstance(param, (ParameterRaster, ParameterVector)):
            # Use the GRASS-side name when the layer has been imported.
            parts.append(name + '=' + self.exportedLayers.get(value, value))
        elif isinstance(param, ParameterMultipleInput):
            layerList = value
            for layer, grassName in self.exportedLayers.items():
                layerList = layerList.replace(layer, grassName)
            parts.append(name + '=' + layerList.replace(';', ','))
        elif isinstance(param, ParameterBoolean):
            if value:
                parts.append(name)
        elif isinstance(param, QgsProcessingParameterEnum):
            # options() is a method returning the list of option strings.
            parts.append(name + '=' + str(param.options()[int(value)]))
        elif isinstance(param, ParameterPoint):
            parts.append(name + '=' + str(value))
        else:
            # Strings and any other parameter type are quoted.
            parts.append(name + '="' + str(value) + '"')

    for out in self.outputs:
        if isinstance(out, OutputFile):
            parts.append('> ' + out.value)
        elif not isinstance(out, OutputHTML):
            # We add an output name to make sure it is unique if the session
            # uses this algorithm several times.
            uniqueOutputName = out.name + self.uniqueSuffix
            parts.append(out.name + '=' + uniqueOutputName)
            # Add output file to exported layers, to indicate that
            # they are present in GRASS
            self.exportedLayers[out.value] = uniqueOutputName

    parts.append('--overwrite')
    self.commands.append(' '.join(parts))
def processOutputs(self):
    """Prepare the GRASS export commands (r.out.gdal / v.out.ogr).

    Every command is appended to both ``self.commands`` and
    ``self.outputCommands``.
    """
    def queue(cmd):
        self.commands.append(cmd)
        self.outputCommands.append(cmd)

    gdalExport = 'r.out.gdal --overwrite -c createopt="TFW=YES,COMPRESS=LZW" input='

    for out in self.outputs:
        if isinstance(out, OutputRaster):
            filename = out.value
            # Raster layer output: adjust region to layer before
            # exporting
            queue('g.region raster=' + out.name + self.uniqueSuffix)

            if self.grass7Name == 'r.statistics':
                # r.statistics saves its results in a non-qgis compatible
                # way. Post-process them with r.mapcalc.
                corrected = 'correctedoutput' + self.uniqueSuffix
                queue('r.mapcalc expression="' + corrected
                      + '=@' + out.name + self.uniqueSuffix + '"')
                queue(gdalExport + corrected + ' output="' + filename + '"')
            elif self.grass7Name == 'r.horizon':
                # NOTE(review): no export command is queued for r.horizon
                # here; presumably its ext module takes care of it - confirm.
                pass
            else:
                queue(gdalExport + out.name + self.uniqueSuffix
                      + ' output="' + filename + '"')

        if isinstance(out, OutputVector):
            typeidx = self.getParameterValue(self.GRASS_OUTPUT_TYPE_PARAMETER)
            if typeidx is None:
                outtype = 'auto'
            else:
                outtype = self.OUTPUT_TYPES[typeidx]

            module = self.grass7Name
            # r.flow always exports lines; v.voronoi does so only when the
            # last queued command contains "-l".
            exportsLines = module == 'r.flow' or (
                module == 'v.voronoi' and '-l' in self.commands[-1])
            if exportsLines:
                command = 'v.out.ogr type=line layer=0 -s -e input=' + out.name + self.uniqueSuffix
            elif module == 'v.sample':
                command = 'v.out.ogr type=point -s -e input=' + out.name + self.uniqueSuffix
            else:
                command = 'v.out.ogr -s -e input=' + out.name + self.uniqueSuffix
                command += ' type=' + outtype

            # The output is addressed as directory plus layer name (the
            # file name without its last four characters).
            command += ''.join([
                ' output="' + os.path.dirname(out.value) + '"',
                ' format=ESRI_Shapefile',
                ' output_layer=' + os.path.basename(out.value)[:-4],
                ' --overwrite',
            ])
            queue(command)
def exportVectorLayer(self, orgFilename):
    """Build the ``v.in.ogr`` command importing a vector layer into GRASS.

    The GRASS-side name is recorded in ``self.exportedLayers`` under
    *orgFilename*.

    :param orgFilename: source string of the layer to import.
    :return: the ``v.in.ogr`` command line.
    :raises QgsProcessingException: when the source is not an existing
        shapefile and cannot be loaded as a layer either.
    """
    context = dataobjects.createContext()
    layer = QgsProcessingUtils.mapLayerFromString(orgFilename, context, False)

    # TODO: honour the "use only selected features" setting once it is
    # available again. Until then selections are ignored, which also
    # keeps the selection check below from reading an undefined name.
    useSelection = False

    # TODO: improve this. We are now exporting if it is not a shapefile,
    # but the functionality of v.in.ogr could be used for this.
    # We also export if there is a selection
    isShapefile = os.path.exists(orgFilename) and orgFilename.endswith('shp')
    if not isShapefile:
        if not layer:
            raise QgsProcessingException(
                self.tr('Could not load layer: {0}').format(orgFilename))
        filename = dataobjects.exportVectorLayer(layer)
    elif layer and useSelection and layer.selectedFeatureCount() != 0:
        filename = dataobjects.exportVectorLayer(layer)
    else:
        filename = orgFilename

    destFilename = 'a' + os.path.basename(self.getTempFilename())
    self.exportedLayers[orgFilename] = destFilename

    # NOTE(review): getParameterValue() is not defined in this class;
    # presumably it is provided elsewhere - confirm.
    min_area = self.getParameterValue(self.GRASS_MIN_AREA_PARAMETER)
    snap = self.getParameterValue(self.GRASS_SNAP_TOLERANCE_PARAMETER)
    # v.in.ogr takes the containing directory as input and the file name
    # without extension as layer.
    layerName = os.path.splitext(os.path.basename(filename))[0]

    command = 'v.in.ogr'
    command += ' min_area=' + str(min_area)
    command += ' snap=' + str(snap)
    command += ' input="' + os.path.dirname(filename) + '"'
    command += ' layer=' + layerName
    command += ' output=' + destFilename
    command += ' --overwrite -o'
    return command
def setSessionProjectionFromProject(self, commands):
    """Queue a ``g.proj`` command using the map canvas destination CRS.

    Nothing happens when the projection has already been set for the
    session or when no ``iface`` is available. The command is appended
    to ``self.commands``.

    :param commands: not used by this method.
    """
    if Grass7Utils.projectionSet or not iface:
        return
    proj4 = iface.mapCanvas().mapSettings().destinationCrs().toProj4()
    self.commands.append('g.proj -c proj4="' + proj4 + '"')
    Grass7Utils.projectionSet = True
def setSessionProjectionFromLayer(self, layer, commands):
    """Queue a ``g.proj`` command using the CRS of *layer*.

    Nothing happens when the projection has already been set for the
    session or when *layer* cannot be resolved. The command is appended
    to ``self.commands``.

    :param layer: source string of the layer providing the CRS.
    :param commands: not used by this method.
    """
    context = dataobjects.createContext()
    if Grass7Utils.projectionSet:
        return
    qGisLayer = QgsProcessingUtils.mapLayerFromString(layer, context)
    if not qGisLayer:
        return
    proj4 = str(qGisLayer.crs().toProj4())
    self.commands.append('g.proj -c proj4="' + proj4 + '"')
    Grass7Utils.projectionSet = True
def exportRasterLayer(self, layer):
    """Build the ``r.external`` command linking a raster into GRASS.

    The GRASS-side name is 'a' followed by the base name of a fresh
    temporary file name; it is recorded in ``self.exportedLayers`` under
    *layer*.

    :param layer: source string of the raster layer.
    :return: the ``r.external`` command line.
    """
    grassName = 'a' + os.path.basename(self.getTempFilename())
    self.exportedLayers[layer] = grassName
    pieces = [
        'r.external',
        'input="' + layer + '"',
        'band=1',
        'output=' + grassName,
        '--overwrite -o',
    ]
    return ' '.join(pieces)
def getTempFilename(self):
    """Return a temporary file name obtained from ``system.getTempFilename()``."""
    # TODO Replace with QgsProcessingUtils generateTempFilename
    tempName = system.getTempFilename()
    return tempName
def canExecute(self):
    """Report whether GRASS GIS 7 is usable.

    :return: ``(ok, message)`` where *message* is the result of
        ``Grass7Utils.checkGrass7IsInstalled()`` and *ok* is True when
        that message is empty or None.
    """
    message = Grass7Utils.checkGrass7IsInstalled()
    ok = not message
    return ok, message
def checkParameterValues(self, parameters, context):
    """Validate the parameter values before the algorithm runs.

    When the ext module defines ``checkParameterValuesBeforeExecuting``
    that hook decides; otherwise the base class check is used.

    :param parameters: dict of parameter values keyed by parameter name.
    :param context: processing context.
    :return: ``(ok, message)``; for the ext hook *ok* is True when the
        hook reports nothing.
    """
    if self.module and hasattr(self.module, 'checkParameterValuesBeforeExecuting'):
        # NOTE(review): assumes the hook returns an error message, or
        # None/empty when the values are fine - confirm against the ext
        # modules. Same (ok, message) shape as canExecute().
        message = self.module.checkParameterValuesBeforeExecuting(self)
        return not message, message
    return super(Grass7Algorithm, self).checkParameterValues(parameters, context)