mirror of
https://github.com/qgis/QGIS.git
synced 2025-03-01 00:46:20 -05:00
initAlgorithm() method This allows 2 benefits: - algorithms can be subclassed and have subclasses add additional parameters/outputs to the algorithm. With the previous approach of declaring parameters/outputs in the constructor, it's not possible to call virtual methods to add additional parameters/ outputs (since you can't call virtual methods from a constructor). - initAlgorithm takes a variant map argument, allowing the algorithm to dynamically adjust its declared parameters and outputs according to this configuration map. This potentially allows model algorithms which can be configured to have variable numbers of parameters and outputs at run time. E.g. a "router" algorithm which directs features to one of any number of output sinks depending on some user configured criteria.
308 lines
11 KiB
Python
308 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
***************************************************************************
|
|
ScriptAlgorithm.py
|
|
---------------------
|
|
Date : August 2012
|
|
Copyright : (C) 2012 by Victor Olaya
|
|
Email : volayaf at gmail dot com
|
|
***************************************************************************
|
|
* *
|
|
* This program is free software; you can redistribute it and/or modify *
|
|
* it under the terms of the GNU General Public License as published by *
|
|
* the Free Software Foundation; either version 2 of the License, or *
|
|
* (at your option) any later version. *
|
|
* *
|
|
***************************************************************************
|
|
"""
|
|
from builtins import str
|
|
|
|
__author__ = 'Victor Olaya'
|
|
__date__ = 'August 2012'
|
|
__copyright__ = '(C) 2012, Victor Olaya'
|
|
|
|
# This will get replaced with a git SHA1 when you do a git archive
|
|
|
|
__revision__ = '$Format:%H$'
|
|
|
|
import os
|
|
import re
|
|
import json
|
|
from qgis.core import (QgsExpressionContextUtils,
|
|
QgsExpressionContext,
|
|
QgsProcessingAlgorithm,
|
|
QgsProject,
|
|
QgsApplication,
|
|
QgsMessageLog,
|
|
QgsProcessingUtils,
|
|
QgsProcessingAlgorithm)
|
|
|
|
from qgis.PyQt.QtCore import (QCoreApplication)
|
|
|
|
from processing.core.GeoAlgorithm import GeoAlgorithm
|
|
from processing.gui.Help2Html import getHtmlFromHelpFile
|
|
from processing.core.parameters import getParameterFromString
|
|
from processing.core.outputs import getOutputFromString
|
|
from processing.script.WrongScriptException import WrongScriptException
|
|
|
|
pluginPath = os.path.split(os.path.dirname(__file__))[0]
|
|
|
|
|
|
class ScriptAlgorithm(QgsProcessingAlgorithm):
|
|
|
|
def __init__(self, descriptionFile, script=None):
|
|
"""The script parameter can be used to directly pass the code
|
|
of the script without a file.
|
|
|
|
This is to be used from the script edition dialog, but should
|
|
not be used in other cases.
|
|
"""
|
|
|
|
super().__init__()
|
|
self._icon = QgsApplication.getThemeIcon("/processingScript.svg")
|
|
self._name = ''
|
|
self._display_name = ''
|
|
self._group = ''
|
|
self._flags = None
|
|
|
|
self.script = script
|
|
self.allowEdit = True
|
|
self.noCRSWarning = False
|
|
self.descriptionFile = descriptionFile
|
|
if script is not None:
|
|
self.defineCharacteristicsFromScript()
|
|
if descriptionFile is not None:
|
|
self.defineCharacteristicsFromFile()
|
|
|
|
self.ns = {}
|
|
self.cleaned_script = None
|
|
self.results = {}
|
|
|
|
def createInstance(self):
|
|
return ScriptAlgorithm(self.descriptionFile)
|
|
|
|
def initAlgorithm(self, config=None):
|
|
pass
|
|
|
|
def icon(self):
|
|
return self._icon
|
|
|
|
def name(self):
|
|
return self._name
|
|
|
|
def displayName(self):
|
|
return self._display_name
|
|
|
|
def group(self):
|
|
return self._group
|
|
|
|
def flags(self):
|
|
if self._flags is not None:
|
|
return QgsProcessingAlgorithm.Flags(self._flags)
|
|
else:
|
|
return QgsProcessingAlgorithm.flags(self)
|
|
|
|
def svgIconPath(self):
|
|
return QgsApplication.iconPath("processingScript.svg")
|
|
|
|
def defineCharacteristicsFromFile(self):
|
|
self.error = None
|
|
self.script = ''
|
|
filename = os.path.basename(self.descriptionFile)
|
|
self._name = filename[:filename.rfind('.')].replace('_', ' ')
|
|
self._display_name = self._name
|
|
self._group = self.tr('User scripts', 'ScriptAlgorithm')
|
|
with open(self.descriptionFile) as lines:
|
|
line = lines.readline()
|
|
while line != '':
|
|
if line.startswith('##'):
|
|
try:
|
|
self.processParameterLine(line.strip('\n'))
|
|
except:
|
|
self.error = self.tr('This script has a syntax errors.\n'
|
|
'Problem with line: {0}', 'ScriptAlgorithm').format(line)
|
|
self.script += line
|
|
line = lines.readline()
|
|
if self._group == self.tr('[Test scripts]', 'ScriptAlgorithm'):
|
|
self._flags = QgsProcessingAlgorithm.FlagHideFromToolbox | QgsProcessingAlgorithm.FlagHideFromModeler
|
|
|
|
def defineCharacteristicsFromScript(self):
|
|
lines = self.script.split('\n')
|
|
self._name = '[Unnamed algorithm]'
|
|
self._display_name = self.tr('[Unnamed algorithm]', 'ScriptAlgorithm')
|
|
self._group = self.tr('User scripts', 'ScriptAlgorithm')
|
|
for line in lines:
|
|
if line.startswith('##'):
|
|
try:
|
|
self.processParameterLine(line.strip('\n'))
|
|
except:
|
|
pass
|
|
|
|
def canExecute(self):
|
|
return not self.error, self.error
|
|
|
|
def validateInputCrs(self, parameters, context):
|
|
if self.noCRSWarning:
|
|
return True
|
|
else:
|
|
return QgsProcessingAlgorithm.validateInputCrs(self, parameters, context)
|
|
|
|
def createDescriptiveName(self, s):
|
|
return s.replace('_', ' ')
|
|
|
|
def processParameterLine(self, line):
|
|
param = None
|
|
line = line.replace('#', '')
|
|
|
|
if line == "nomodeler":
|
|
self._flags = self._flags | QgsProcessingAlgorithm.FlagHideFromModeler
|
|
return
|
|
if line == "nocrswarning":
|
|
self.noCRSWarning = True
|
|
return
|
|
tokens = line.split('=', 1)
|
|
desc = self.createDescriptiveName(tokens[0])
|
|
if tokens[1].lower().strip() == 'group':
|
|
self._group = tokens[0]
|
|
return
|
|
if tokens[1].lower().strip() == 'name':
|
|
self._name = self._display_name = tokens[0]
|
|
return
|
|
|
|
out = getOutputFromString(line)
|
|
if out is None:
|
|
param = getParameterFromString(line)
|
|
|
|
if param is not None:
|
|
self.addParameter(param)
|
|
elif out is not None:
|
|
out.setName(tokens[0])
|
|
out.setDescription(desc)
|
|
self.addOutput(out)
|
|
else:
|
|
raise WrongScriptException(
|
|
self.tr('Could not load script: {0}.\n'
|
|
'Problem with line "{1}"', 'ScriptAlgorithm').format(self.descriptionFile or '', line))
|
|
|
|
def prepareAlgorithm(self, parameters, context, feedback):
|
|
for param in self.parameterDefinitions():
|
|
method = None
|
|
if param.type() == "boolean":
|
|
method = self.parameterAsBool
|
|
elif param.type() == "crs":
|
|
method = self.parameterAsCrs
|
|
elif param.type() == "layer":
|
|
method = self.parameterAsLayer
|
|
elif param.type() == "extent":
|
|
method = self.parameterAsExtent
|
|
elif param.type() == "point":
|
|
method = self.parameterAsPoint
|
|
elif param.type() == "file":
|
|
method = self.parameterAsFile
|
|
elif param.type() == "matrix":
|
|
method = self.parameterAsMatrix
|
|
elif param.type() == "multilayer":
|
|
method = self.parameterAsLayerList
|
|
elif param.type() == "number":
|
|
method = self.parameterAsDouble
|
|
elif param.type() == "range":
|
|
method = self.parameterAsRange
|
|
elif param.type() == "raster":
|
|
method = self.parameterAsRasterLayer
|
|
elif param.type() == "enum":
|
|
method = self.parameterAsEnum
|
|
elif param.type() == "string":
|
|
method = self.parameterAsString
|
|
elif param.type() == "expression":
|
|
method = self.parameterAsString
|
|
elif param.type() == "vector":
|
|
method = self.parameterAsVectorLayer
|
|
elif param.type() == "field":
|
|
if param.allowMultiple():
|
|
method = self.parameterAsFields
|
|
else:
|
|
method = self.parameterAsString
|
|
elif param.type() == "source":
|
|
method = self.parameterAsSource
|
|
|
|
if method:
|
|
self.ns[param.name()] = method(parameters, param.name(), context)
|
|
|
|
self.ns['scriptDescriptionFile'] = self.descriptionFile
|
|
for out in self.outputDefinitions():
|
|
self.ns[out.name()] = None
|
|
|
|
self.ns['self'] = self
|
|
self.ns['parameters'] = parameters
|
|
|
|
expression_context = self.createExpressionContext(parameters, context)
|
|
variables = re.findall('@[a-zA-Z0-9_]*', self.script)
|
|
script = 'import processing\n'
|
|
script += self.script
|
|
for var in variables:
|
|
varname = var[1:]
|
|
if context.hasVariable(varname):
|
|
script = script.replace(var, context.variable(varname))
|
|
else:
|
|
# messy - it's probably NOT a variable, and instead an email address or some other string containing '@'
|
|
QgsMessageLog.logMessage(self.tr('Cannot find variable: {0}').format(varname), self.tr('Processing'), QgsMessageLog.WARNING)
|
|
self.cleaned_script = script
|
|
|
|
return True
|
|
|
|
def processAlgorithm(self, parameters, context, feedback):
|
|
self.ns['feedback'] = feedback
|
|
self.ns['context'] = context
|
|
|
|
exec((self.cleaned_script), self.ns)
|
|
self.results = {}
|
|
for out in self.outputDefinitions():
|
|
self.results[out.name()] = self.ns[out.name()]
|
|
del self.ns
|
|
return self.results
|
|
|
|
def helpUrl(self):
|
|
if self.descriptionFile is None:
|
|
return None
|
|
helpfile = self.descriptionFile + '.help'
|
|
if os.path.exists(helpfile):
|
|
return getHtmlFromHelpFile(self, helpfile)
|
|
else:
|
|
return None
|
|
|
|
def shortHelpString(self):
|
|
if self.descriptionFile is None:
|
|
return None
|
|
helpFile = str(self.descriptionFile) + '.help'
|
|
if os.path.exists(helpFile):
|
|
with open(helpFile) as f:
|
|
try:
|
|
descriptions = json.load(f)
|
|
if 'ALG_DESC' in descriptions:
|
|
return str(descriptions['ALG_DESC'])
|
|
except:
|
|
return None
|
|
return None
|
|
|
|
def getParameterDescriptions(self):
|
|
descs = {}
|
|
if self.descriptionFile is None:
|
|
return descs
|
|
helpFile = str(self.descriptionFile) + '.help'
|
|
if os.path.exists(helpFile):
|
|
with open(helpFile) as f:
|
|
try:
|
|
descriptions = json.load(f)
|
|
for param in self.parameterDefinitions():
|
|
if param.name() in descriptions:
|
|
descs[param.name()] = str(descriptions[param.name()])
|
|
except:
|
|
return descs
|
|
return descs
|
|
|
|
def tr(self, string, context=''):
|
|
if context == '':
|
|
context = self.__class__.__name__
|
|
return QCoreApplication.translate(context, string)
|