""" *************************************************************************** AlgorithmDialog.py --------------------- Date : August 2012 Copyright : (C) 2012 by Victor Olaya Email : volayaf at gmail dot com *************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * *************************************************************************** """ __author__ = "Victor Olaya" __date__ = "August 2012" __copyright__ = "(C) 2012, Victor Olaya" import datetime import time from qgis.PyQt.QtCore import QCoreApplication from qgis.PyQt.QtWidgets import QMessageBox, QPushButton, QDialogButtonBox from qgis.PyQt.QtGui import QColor, QPalette from qgis.core import ( Qgis, QgsApplication, QgsProcessingAlgRunnerTask, QgsProcessingOutputHtml, QgsProcessingAlgorithm, QgsProxyProgressTask, QgsProcessingFeatureSourceDefinition, ) from qgis.gui import ( QgsGui, QgsProcessingAlgorithmDialogBase, QgsProcessingParametersGenerator, QgsProcessingContextGenerator, ) from qgis.utils import iface from processing.core.ProcessingConfig import ProcessingConfig from processing.core.ProcessingResults import resultsList from processing.gui.ParametersPanel import ParametersPanel from processing.gui.BatchAlgorithmDialog import BatchAlgorithmDialog from processing.gui.AlgorithmDialogBase import AlgorithmDialogBase from processing.gui.AlgorithmExecutor import executeIterating, execute, execute_in_place from processing.gui.Postprocessing import handleAlgorithmResults from processing.tools import dataobjects class AlgorithmDialog(QgsProcessingAlgorithmDialogBase): def __init__(self, alg, in_place=False, parent=None): super().__init__(parent) self.feedback_dialog = None self.in_place = in_place self.active_layer = iface.activeLayer() if self.in_place else None self.context = None self.feedback = None self.history_log_id = None self.history_details = {} self.setAlgorithm(alg) self.setMainWidget(self.getParametersPanel(alg, self)) if not self.in_place: self.runAsBatchButton = QPushButton( QCoreApplication.translate("AlgorithmDialog", "Run as Batch Process…") ) self.runAsBatchButton.clicked.connect(self.runAsBatch) self.buttonBox().addButton( self.runAsBatchButton, QDialogButtonBox.ButtonRole.ResetRole ) # reset role to ensure left alignment else: in_place_input_parameter_name = "INPUT" if hasattr(alg, "inputParameterName"): in_place_input_parameter_name = alg.inputParameterName() self.mainWidget().setParameters( {in_place_input_parameter_name: self.active_layer} ) self.runAsBatchButton = None has_selection = self.active_layer and ( self.active_layer.selectedFeatureCount() > 0 ) self.buttonBox().button(QDialogButtonBox.StandardButton.Ok).setText( QCoreApplication.translate( "AlgorithmDialog", "Modify Selected Features" ) if has_selection else QCoreApplication.translate( "AlgorithmDialog", "Modify All Features" ) ) self.setWindowTitle(self.windowTitle() + " | " + self.active_layer.name()) self.updateRunButtonVisibility() def getParametersPanel(self, alg, parent): panel = ParametersPanel(parent, alg, self.in_place, self.active_layer) return panel def runAsBatch(self): self.close() dlg = BatchAlgorithmDialog(self.algorithm().create(), parent=iface.mainWindow()) dlg.show() dlg.exec() def resetAdditionalGui(self): if not self.in_place: self.runAsBatchButton.setEnabled(True) def blockAdditionalControlsWhileRunning(self): if not self.in_place: self.runAsBatchButton.setEnabled(False) def setParameters(self, parameters): self.mainWidget().setParameters(parameters) def flag_invalid_parameter_value(self, message: str, widget): """ Highlights a parameter with an invalid value """ try: self.buttonBox().accepted.connect(lambda w=widget: w.setPalette(QPalette())) palette = widget.palette() palette.setColor(QPalette.ColorRole.Base, QColor(255, 255, 0)) widget.setPalette(palette) except: pass self.messageBar().clearWidgets() self.messageBar().pushMessage( "", self.tr("Wrong or missing parameter value: {0}").format(message), level=Qgis.MessageLevel.Warning, duration=5, ) def flag_invalid_output_extension(self, message: str, widget): """ Highlights a parameter with an invalid output extension """ try: self.buttonBox().accepted.connect(lambda w=widget: w.setPalette(QPalette())) palette = widget.palette() palette.setColor(QPalette.ColorRole.Base, QColor(255, 255, 0)) widget.setPalette(palette) except: pass self.messageBar().clearWidgets() self.messageBar().pushMessage( "", message, level=Qgis.MessageLevel.Warning, duration=5 ) def createProcessingParameters( self, flags=QgsProcessingParametersGenerator.Flags() ): if self.mainWidget() is None: return {} try: return self.mainWidget().createProcessingParameters(flags) except AlgorithmDialogBase.InvalidParameterValue as e: self.flag_invalid_parameter_value(e.parameter.description(), e.widget) except AlgorithmDialogBase.InvalidOutputExtension as e: self.flag_invalid_output_extension(e.message, e.widget) return {} def processingContext(self): if self.context is None: self.feedback = self.createFeedback() self.context = dataobjects.createContext(self.feedback) self.applyContextOverrides(self.context) return self.context def runAlgorithm(self): self.feedback = self.createFeedback() self.context = dataobjects.createContext(self.feedback) self.applyContextOverrides(self.context) self.algorithmAboutToRun.emit(self.context) checkCRS = ProcessingConfig.getSetting(ProcessingConfig.WARN_UNMATCHING_CRS) try: # messy as all heck, but we don't want to call the dialog's implementation of # createProcessingParameters as we want to catch the exceptions raised by the # parameter panel instead... parameters = ( {} if self.mainWidget() is None else self.mainWidget().createProcessingParameters() ) if checkCRS and not self.algorithm().validateInputCrs( parameters, self.context ): reply = QMessageBox.question( self, self.tr("Unmatching CRS's"), self.tr( "Parameters do not all use the same CRS. This can " "cause unexpected results.\nDo you want to " "continue?" ), QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No, ) if reply == QMessageBox.StandardButton.No: return ok, msg = self.algorithm().checkParameterValues(parameters, self.context) if not ok: QMessageBox.warning(self, self.tr("Unable to execute algorithm"), msg) return self.blockControlsWhileRunning() self.setExecutedAnyResult(True) self.cancelButton().setEnabled(False) self.iterateParam = None for param in self.algorithm().parameterDefinitions(): if ( isinstance( parameters.get(param.name(), None), QgsProcessingFeatureSourceDefinition, ) and parameters[param.name()].flags & QgsProcessingFeatureSourceDefinition.Flag.FlagCreateIndividualOutputPerInputFeature ): self.iterateParam = param.name() break self.clearProgress() self.feedback.pushVersionInfo(self.algorithm().provider()) if ( self.algorithm().provider() and self.algorithm().provider().warningMessage() ): self.feedback.reportError(self.algorithm().provider().warningMessage()) self.feedback.pushInfo( QCoreApplication.translate( "AlgorithmDialog", "Algorithm started at: {}" ).format(datetime.datetime.now().replace(microsecond=0).isoformat()) ) self.setInfo( QCoreApplication.translate( "AlgorithmDialog", "Algorithm '{0}' starting…" ).format(self.algorithm().displayName()), escapeHtml=False, ) self.feedback.pushInfo(self.tr("Input parameters:")) display_params = [] for k, v in parameters.items(): display_params.append( "'" + k + "' : " + self.algorithm() .parameterDefinition(k) .valueAsPythonString(v, self.context) ) self.feedback.pushCommandInfo("{ " + ", ".join(display_params) + " }") self.feedback.pushInfo("") start_time = time.time() def elapsed_time(start_time) -> str: delta_t = time.time() - start_time hours = int(delta_t / 3600) minutes = int((delta_t % 3600) / 60) seconds = delta_t - hours * 3600 - minutes * 60 str_hours = [self.tr("hour"), self.tr("hours")][hours > 1] str_minutes = [self.tr("minute"), self.tr("minutes")][minutes > 1] str_seconds = [self.tr("second"), self.tr("seconds")][seconds != 1] if hours > 0: elapsed = "{0:0.2f} {1} ({2} {3} {4} {5} {6:0.0f} {1})".format( delta_t, str_seconds, hours, str_hours, minutes, str_minutes, seconds, ) elif minutes > 0: elapsed = "{0:0.2f} {1} ({2} {3} {4:0.0f} {1})".format( delta_t, str_seconds, minutes, str_minutes, seconds ) else: elapsed = f"{delta_t:0.2f} {str_seconds}" return elapsed if self.iterateParam: # Make sure the Log tab is visible before executing the algorithm try: self.showLog() self.repaint() except: pass self.cancelButton().setEnabled( self.algorithm().flags() & QgsProcessingAlgorithm.Flag.FlagCanCancel ) if executeIterating( self.algorithm(), parameters, self.iterateParam, self.context, self.feedback, ): self.feedback.pushInfo( self.tr("Execution completed in {}").format( elapsed_time(start_time) ) ) self.cancelButton().setEnabled(False) self.finish(True, parameters, self.context, self.feedback) else: self.cancelButton().setEnabled(False) self.resetGui() else: self.history_details = { "python_command": self.algorithm().asPythonCommand( parameters, self.context ), "algorithm_id": self.algorithm().id(), "parameters": self.algorithm().asMap(parameters, self.context), } process_command, command_ok = self.algorithm().asQgisProcessCommand( parameters, self.context ) if command_ok: self.history_details["process_command"] = process_command self.history_log_id, _ = QgsGui.historyProviderRegistry().addEntry( "processing", self.history_details ) QgsGui.instance().processingRecentAlgorithmLog().push( self.algorithm().id() ) self.cancelButton().setEnabled( self.algorithm().flags() & QgsProcessingAlgorithm.Flag.FlagCanCancel ) def on_complete(ok, results): if ok: self.feedback.pushInfo( self.tr("Execution completed in {}").format( elapsed_time(start_time) ) ) self.feedback.pushFormattedResults( self.algorithm(), self.context, results ) else: self.feedback.reportError( self.tr("Execution failed after {}").format( elapsed_time(start_time) ) ) self.feedback.pushInfo("") if self.history_log_id is not None: # can't deepcopy this! self.history_details["results"] = { k: v for k, v in results.items() if k != "CHILD_INPUTS" } self.history_details["log"] = self.feedback.htmlLog() QgsGui.historyProviderRegistry().updateEntry( self.history_log_id, self.history_details ) if self.feedback_dialog is not None: self.feedback_dialog.close() self.feedback_dialog.deleteLater() self.feedback_dialog = None self.cancelButton().setEnabled(False) self.finish( ok, results, self.context, self.feedback, in_place=self.in_place ) self.feedback = None self.context = None if not self.in_place and not ( self.algorithm().flags() & QgsProcessingAlgorithm.Flag.FlagNoThreading ): # Make sure the Log tab is visible before executing the algorithm self.showLog() task = QgsProcessingAlgRunnerTask( self.algorithm(), parameters, self.context, self.feedback ) if task.isCanceled(): on_complete(False, {}) else: task.executed.connect(on_complete) self.setCurrentTask(task) else: self.proxy_progress = QgsProxyProgressTask( QCoreApplication.translate( "AlgorithmDialog", "Executing “{}”" ).format(self.algorithm().displayName()) ) QgsApplication.taskManager().addTask(self.proxy_progress) self.feedback.progressChanged.connect( self.proxy_progress.setProxyProgress ) self.feedback_dialog = self.createProgressDialog() self.feedback_dialog.show() if self.in_place: ok, results = execute_in_place( self.algorithm(), parameters, self.context, self.feedback ) else: ok, results = execute( self.algorithm(), parameters, self.context, self.feedback ) self.feedback.progressChanged.disconnect() self.proxy_progress.finalize(ok) on_complete(ok, results) except AlgorithmDialogBase.InvalidParameterValue as e: self.flag_invalid_parameter_value(e.parameter.description(), e.widget) except AlgorithmDialogBase.InvalidOutputExtension as e: self.flag_invalid_output_extension(e.message, e.widget) def finish(self, successful, result, context, feedback, in_place=False): keepOpen = not successful or ProcessingConfig.getSetting( ProcessingConfig.KEEP_DIALOG_OPEN ) generated_html_outputs = False if not in_place and self.iterateParam is None: # add html results to results dock for out in self.algorithm().outputDefinitions(): if ( isinstance(out, QgsProcessingOutputHtml) and out.name() in result and result[out.name()] ): resultsList.addResult( icon=self.algorithm().icon(), name=out.description(), timestamp=time.localtime(), result=result[out.name()], ) generated_html_outputs = True if not handleAlgorithmResults(self.algorithm(), context, feedback, result): self.resetGui() return self.setExecuted(True) self.setResults(result) self.setInfo( self.tr("Algorithm '{0}' finished").format(self.algorithm().displayName()), escapeHtml=False, ) self.algorithmFinished.emit(successful, result) if not in_place and not keepOpen: self.close() else: self.resetGui() if generated_html_outputs: self.setInfo( self.tr( "HTML output has been generated by this algorithm." "\nOpen the results dialog to check it." ), escapeHtml=False, )