mirror of
https://github.com/qgis/QGIS.git
synced 2025-04-21 00:03:05 -04:00
Since it's safe to evaluate parameters in background threads now, it's usually going to be ok to evaluate everything in the processAlgorithm step. This keeps the algorithm code as simple as possible, and will make porting faster. Note that the prepare/postProcess virtual methods still exist and can be used when an algorithm MUST do setup/cleanup work in the main thread.
305 lines
11 KiB
Python
305 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
***************************************************************************
|
|
ScriptAlgorithm.py
|
|
---------------------
|
|
Date : August 2012
|
|
Copyright : (C) 2012 by Victor Olaya
|
|
Email : volayaf at gmail dot com
|
|
***************************************************************************
|
|
* *
|
|
* This program is free software; you can redistribute it and/or modify *
|
|
* it under the terms of the GNU General Public License as published by *
|
|
* the Free Software Foundation; either version 2 of the License, or *
|
|
* (at your option) any later version. *
|
|
* *
|
|
***************************************************************************
|
|
"""
|
|
from builtins import str
|
|
|
|
__author__ = 'Victor Olaya'
|
|
__date__ = 'August 2012'
|
|
__copyright__ = '(C) 2012, Victor Olaya'
|
|
|
|
# This will get replaced with a git SHA1 when you do a git archive
|
|
|
|
__revision__ = '$Format:%H$'
|
|
|
|
import os
|
|
import re
|
|
import json
|
|
from qgis.core import (QgsExpressionContextUtils,
|
|
QgsExpressionContext,
|
|
QgsProcessingAlgorithm,
|
|
QgsProject,
|
|
QgsApplication,
|
|
QgsMessageLog,
|
|
QgsProcessingUtils,
|
|
QgsProcessingAlgorithm)
|
|
|
|
from qgis.PyQt.QtCore import (QCoreApplication)
|
|
|
|
from processing.core.GeoAlgorithm import GeoAlgorithm
|
|
from processing.gui.Help2Html import getHtmlFromHelpFile
|
|
from processing.core.parameters import getParameterFromString
|
|
from processing.core.outputs import getOutputFromString
|
|
from processing.script.WrongScriptException import WrongScriptException
|
|
|
|
pluginPath = os.path.split(os.path.dirname(__file__))[0]
|
|
|
|
|
|
class ScriptAlgorithm(QgsProcessingAlgorithm):
|
|
|
|
def __init__(self, descriptionFile, script=None):
|
|
"""The script parameter can be used to directly pass the code
|
|
of the script without a file.
|
|
|
|
This is to be used from the script edition dialog, but should
|
|
not be used in other cases.
|
|
"""
|
|
|
|
super().__init__()
|
|
self._icon = QgsApplication.getThemeIcon("/processingScript.svg")
|
|
self._name = ''
|
|
self._display_name = ''
|
|
self._group = ''
|
|
self._flags = None
|
|
|
|
self.script = script
|
|
self.allowEdit = True
|
|
self.noCRSWarning = False
|
|
self.descriptionFile = descriptionFile
|
|
if script is not None:
|
|
self.defineCharacteristicsFromScript()
|
|
if descriptionFile is not None:
|
|
self.defineCharacteristicsFromFile()
|
|
|
|
self.ns = {}
|
|
self.cleaned_script = None
|
|
self.results = {}
|
|
|
|
def create(self):
|
|
return ScriptAlgorithm(self.descriptionFile)
|
|
|
|
def icon(self):
|
|
return self._icon
|
|
|
|
def name(self):
|
|
return self._name
|
|
|
|
def displayName(self):
|
|
return self._display_name
|
|
|
|
def group(self):
|
|
return self._group
|
|
|
|
def flags(self):
|
|
if self._flags is not None:
|
|
return QgsProcessingAlgorithm.Flags(self._flags)
|
|
else:
|
|
return QgsProcessingAlgorithm.flags(self)
|
|
|
|
def svgIconPath(self):
|
|
return QgsApplication.iconPath("processingScript.svg")
|
|
|
|
def defineCharacteristicsFromFile(self):
|
|
self.error = None
|
|
self.script = ''
|
|
filename = os.path.basename(self.descriptionFile)
|
|
self._name = filename[:filename.rfind('.')].replace('_', ' ')
|
|
self._display_name = self._name
|
|
self._group = self.tr('User scripts', 'ScriptAlgorithm')
|
|
with open(self.descriptionFile) as lines:
|
|
line = lines.readline()
|
|
while line != '':
|
|
if line.startswith('##'):
|
|
try:
|
|
self.processParameterLine(line.strip('\n'))
|
|
except:
|
|
self.error = self.tr('This script has a syntax errors.\n'
|
|
'Problem with line: {0}', 'ScriptAlgorithm').format(line)
|
|
self.script += line
|
|
line = lines.readline()
|
|
if self._group == self.tr('[Test scripts]', 'ScriptAlgorithm'):
|
|
self._flags = QgsProcessingAlgorithm.FlagHideFromToolbox | QgsProcessingAlgorithm.FlagHideFromModeler
|
|
|
|
def defineCharacteristicsFromScript(self):
|
|
lines = self.script.split('\n')
|
|
self._name = '[Unnamed algorithm]'
|
|
self._display_name = self.tr('[Unnamed algorithm]', 'ScriptAlgorithm')
|
|
self._group = self.tr('User scripts', 'ScriptAlgorithm')
|
|
for line in lines:
|
|
if line.startswith('##'):
|
|
try:
|
|
self.processParameterLine(line.strip('\n'))
|
|
except:
|
|
pass
|
|
|
|
def canExecute(self):
|
|
return not self.error, self.error
|
|
|
|
def validateInputCrs(self, parameters, context):
|
|
if self.noCRSWarning:
|
|
return True
|
|
else:
|
|
return QgsProcessingAlgorithm.validateInputCrs(self, parameters, context)
|
|
|
|
def createDescriptiveName(self, s):
|
|
return s.replace('_', ' ')
|
|
|
|
def processParameterLine(self, line):
|
|
param = None
|
|
line = line.replace('#', '')
|
|
|
|
if line == "nomodeler":
|
|
self._flags = self._flags | QgsProcessingAlgorithm.FlagHideFromModeler
|
|
return
|
|
if line == "nocrswarning":
|
|
self.noCRSWarning = True
|
|
return
|
|
tokens = line.split('=', 1)
|
|
desc = self.createDescriptiveName(tokens[0])
|
|
if tokens[1].lower().strip() == 'group':
|
|
self._group = tokens[0]
|
|
return
|
|
if tokens[1].lower().strip() == 'name':
|
|
self._name = self._display_name = tokens[0]
|
|
return
|
|
|
|
out = getOutputFromString(line)
|
|
if out is None:
|
|
param = getParameterFromString(line)
|
|
|
|
if param is not None:
|
|
self.addParameter(param)
|
|
elif out is not None:
|
|
out.setName(tokens[0])
|
|
out.setDescription(desc)
|
|
self.addOutput(out)
|
|
else:
|
|
raise WrongScriptException(
|
|
self.tr('Could not load script: {0}.\n'
|
|
'Problem with line "{1}"', 'ScriptAlgorithm').format(self.descriptionFile or '', line))
|
|
|
|
def prepareAlgorithm(self, parameters, context, feedback):
|
|
for param in self.parameterDefinitions():
|
|
method = None
|
|
if param.type() == "boolean":
|
|
method = self.parameterAsBool
|
|
elif param.type() == "crs":
|
|
method = self.parameterAsCrs
|
|
elif param.type() == "layer":
|
|
method = self.parameterAsLayer
|
|
elif param.type() == "extent":
|
|
method = self.parameterAsExtent
|
|
elif param.type() == "point":
|
|
method = self.parameterAsPoint
|
|
elif param.type() == "file":
|
|
method = self.parameterAsFile
|
|
elif param.type() == "matrix":
|
|
method = self.parameterAsMatrix
|
|
elif param.type() == "multilayer":
|
|
method = self.parameterAsLayerList
|
|
elif param.type() == "number":
|
|
method = self.parameterAsDouble
|
|
elif param.type() == "range":
|
|
method = self.parameterAsRange
|
|
elif param.type() == "raster":
|
|
method = self.parameterAsRasterLayer
|
|
elif param.type() == "enum":
|
|
method = self.parameterAsEnum
|
|
elif param.type() == "string":
|
|
method = self.parameterAsString
|
|
elif param.type() == "expression":
|
|
method = self.parameterAsString
|
|
elif param.type() == "vector":
|
|
method = self.parameterAsVectorLayer
|
|
elif param.type() == "field":
|
|
if param.allowMultiple():
|
|
method = self.parameterAsFields
|
|
else:
|
|
method = self.parameterAsString
|
|
elif param.type() == "source":
|
|
method = self.parameterAsSource
|
|
|
|
if method:
|
|
self.ns[param.name()] = method(parameters, param.name(), context)
|
|
|
|
self.ns['scriptDescriptionFile'] = self.descriptionFile
|
|
for out in self.outputDefinitions():
|
|
self.ns[out.name()] = None
|
|
|
|
self.ns['self'] = self
|
|
self.ns['parameters'] = parameters
|
|
|
|
expression_context = self.createExpressionContext(parameters, context)
|
|
variables = re.findall('@[a-zA-Z0-9_]*', self.script)
|
|
script = 'import processing\n'
|
|
script += self.script
|
|
for var in variables:
|
|
varname = var[1:]
|
|
if context.hasVariable(varname):
|
|
script = script.replace(var, context.variable(varname))
|
|
else:
|
|
# messy - it's probably NOT a variable, and instead an email address or some other string containing '@'
|
|
QgsMessageLog.logMessage(self.tr('Cannot find variable: {0}').format(varname), self.tr('Processing'), QgsMessageLog.WARNING)
|
|
self.cleaned_script = script
|
|
|
|
return True
|
|
|
|
def processAlgorithm(self, parameters, context, feedback):
|
|
self.ns['feedback'] = feedback
|
|
self.ns['context'] = context
|
|
|
|
exec((self.cleaned_script), self.ns)
|
|
self.results = {}
|
|
for out in self.outputDefinitions():
|
|
self.results[out.name()] = self.ns[out.name()]
|
|
del self.ns
|
|
return self.results
|
|
|
|
def helpUrl(self):
|
|
if self.descriptionFile is None:
|
|
return None
|
|
helpfile = self.descriptionFile + '.help'
|
|
if os.path.exists(helpfile):
|
|
return getHtmlFromHelpFile(self, helpfile)
|
|
else:
|
|
return None
|
|
|
|
def shortHelpString(self):
|
|
if self.descriptionFile is None:
|
|
return None
|
|
helpFile = str(self.descriptionFile) + '.help'
|
|
if os.path.exists(helpFile):
|
|
with open(helpFile) as f:
|
|
try:
|
|
descriptions = json.load(f)
|
|
if 'ALG_DESC' in descriptions:
|
|
return str(descriptions['ALG_DESC'])
|
|
except:
|
|
return None
|
|
return None
|
|
|
|
def getParameterDescriptions(self):
|
|
descs = {}
|
|
if self.descriptionFile is None:
|
|
return descs
|
|
helpFile = str(self.descriptionFile) + '.help'
|
|
if os.path.exists(helpFile):
|
|
with open(helpFile) as f:
|
|
try:
|
|
descriptions = json.load(f)
|
|
for param in self.parameterDefinitions():
|
|
if param.name() in descriptions:
|
|
descs[param.name()] = str(descriptions[param.name()])
|
|
except:
|
|
return descs
|
|
return descs
|
|
|
|
def tr(self, string, context=''):
|
|
if context == '':
|
|
context = self.__class__.__name__
|
|
return QCoreApplication.translate(context, string)
|