QGIS/python/plugins/processing/gui/Postprocessing.py
2024-01-19 19:44:48 +10:00

294 lines
11 KiB
Python

"""
***************************************************************************
Postprocessing.py
---------------------
Date : August 2012
Copyright : (C) 2012 by Victor Olaya
Email : volayaf at gmail dot com
***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************
"""
__author__ = 'Victor Olaya'
__date__ = 'August 2012'
__copyright__ = '(C) 2012, Victor Olaya'
import traceback
from typing import (
Dict,
List,
Optional,
Tuple
)
from qgis.PyQt.QtCore import QCoreApplication
from qgis.core import (
Qgis,
QgsProcessingFeedback,
QgsProcessingUtils,
QgsMapLayer,
QgsWkbTypes,
QgsMessageLog,
QgsProcessingContext,
QgsProcessingAlgorithm,
QgsLayerTreeLayer,
QgsLayerTreeGroup
)
from processing.core.ProcessingConfig import ProcessingConfig
from processing.gui.RenderingStyles import RenderingStyles
SORT_ORDER_CUSTOM_PROPERTY = '_processing_sort_order'
def determine_output_name(dest_id: str,
                          details: QgsProcessingContext.LayerDetails,
                          alg: QgsProcessingAlgorithm,
                          context: QgsProcessingContext,
                          parameters: Dict) -> str:
    """
    Resolves the output name to associate with a loaded result layer.

    When a model is running, every child algorithm it executes ends up
    here. Each output definition of the algorithm is compared against the
    supplied parameters: if the value given for an output matches
    ``dest_id``, that output's name is returned. When no output matches,
    the name stored in ``details.outputName`` is returned instead.
    """
    for definition in alg.outputDefinitions():
        name = definition.name()
        if name in parameters:
            raw_value = parameters[name]
            if hasattr(raw_value, "sink"):
                # values exposing a "sink" are evaluated against the
                # context's expression context; only the first element of
                # the evaluation result is compared
                evaluated = raw_value.sink.valueAsString(
                    context.expressionContext()
                )
                destination = evaluated[0]
            else:
                destination = str(raw_value)

            if destination == dest_id:
                return name

    return details.outputName
def post_process_layer(output_name: str,
                       layer: QgsMapLayer,
                       alg: QgsProcessingAlgorithm):
    """
    Applies post-processing steps to a layer

    A style returned by ``RenderingStyles.getStyle`` for the algorithm and
    output name takes precedence. When there is none, the default style
    from the Processing settings is used: the raster style for raster
    layers, and the point, line or polygon style for vector layers (the
    polygon style is used for any geometry type which is neither point
    nor line). A non-empty style is then loaded into the layer.

    Point cloud layers which have no 3D renderer, and which report that
    their 3D renderer should be synced to the 2D renderer, are assigned a
    3D renderer converted from their 2D renderer.

    :param output_name: name of the algorithm output the layer belongs to,
        may be empty
    :param layer: layer to post-process
    :param alg: algorithm which generated the layer
    """
    style = None
    if output_name:
        style = RenderingStyles.getStyle(alg.id(), output_name)

    if style is None:
        if layer.type() == Qgis.LayerType.Raster:
            style = ProcessingConfig.getSetting(ProcessingConfig.RASTER_STYLE)
        elif layer.type() == Qgis.LayerType.Vector:
            if layer.geometryType() == QgsWkbTypes.GeometryType.PointGeometry:
                style = ProcessingConfig.getSetting(
                    ProcessingConfig.VECTOR_POINT_STYLE)
            elif layer.geometryType() == QgsWkbTypes.GeometryType.LineGeometry:
                style = ProcessingConfig.getSetting(
                    ProcessingConfig.VECTOR_LINE_STYLE)
            else:
                style = ProcessingConfig.getSetting(
                    ProcessingConfig.VECTOR_POLYGON_STYLE)
    if style:
        layer.loadNamedStyle(style)

    # Only point cloud layers need a 3D renderer. Checking the layer type
    # first avoids importing the 3D library, and logging that it is
    # missing, for layers which would never use it.
    if layer.type() != Qgis.LayerType.PointCloud:
        return

    try:
        from qgis._3d import QgsPointCloudLayer3DRenderer
    except ImportError:
        QgsMessageLog.logMessage(
            QCoreApplication.translate(
                "Postprocessing",
                "3D library is not available, "
                "can't assign a 3d renderer to a layer."
            )
        )
        return

    # If the layer has no 3D renderer and syncing 3D to 2D
    # renderer is enabled, we create a renderer and set it up
    # with the 2D renderer
    if layer.renderer3D() is None and layer.sync3DRendererTo2DRenderer():
        renderer_3d = QgsPointCloudLayer3DRenderer()
        renderer_3d.convertFrom2DRenderer(layer.renderer())
        layer.setRenderer3D(renderer_3d)
def create_layer_tree_layer(layer: QgsMapLayer,
                            details: QgsProcessingContext.LayerDetails) \
        -> QgsLayerTreeLayer:
    """
    Builds the QgsLayerTreeLayer node for an algorithm's output layer and
    applies the post-processing custom properties to it.

    The "showFeatureCount" property is set for vector layers when the
    corresponding Processing setting is enabled, and a non-empty layer
    sort key from ``details`` is stored under the sort order custom
    property.
    """
    node = QgsLayerTreeLayer(layer)

    show_feature_count = ProcessingConfig.getSetting(
        ProcessingConfig.VECTOR_FEATURE_COUNT)
    if show_feature_count and layer.type() == Qgis.LayerType.Vector:
        node.setCustomProperty("showFeatureCount", True)

    sort_key = details.layerSortKey
    if sort_key:
        node.setCustomProperty(SORT_ORDER_CUSTOM_PROPERTY, sort_key)

    return node
def get_layer_tree_results_group(details: QgsProcessingContext.LayerDetails,
                                 context: QgsProcessingContext) \
        -> QgsLayerTreeGroup:
    """
    Returns the destination layer tree group to store results in

    The destination project is ``details.project`` when set, otherwise the
    project of ``context``. Groups which do not exist yet are created at
    the top of their parent and expanded.

    :param details: details of the layer being loaded; its ``project`` and
        ``groupName`` are read
    :param context: processing context, used as the fallback source of the
        destination project
    :return: the group named by ``details.groupName`` inside the results
        group when a group name is set, otherwise the results group itself
        (the layer tree root unless a results group name is configured in
        the Processing settings)
    """
    destination_project = details.project or context.project()
    tree_root = destination_project.layerTreeRoot()

    # default to placing results in the top level of the layer tree. The
    # root is taken from the resolved destination project (not directly
    # from details.project) so that the fallback to the context's project
    # is honoured.
    results_group = tree_root

    # if a specific results group is specified in Processing settings,
    # respect it (and create if necessary)
    results_group_name = ProcessingConfig.getSetting(
        ProcessingConfig.RESULTS_GROUP_NAME)
    if results_group_name:
        results_group = tree_root.findGroup(results_group_name)
        if not results_group:
            results_group = tree_root.insertGroup(0, results_group_name)
            results_group.setExpanded(True)

    if not details.groupName:
        return results_group

    # this particular output layer has a specific output group assigned,
    # find or create it now
    group = results_group.findGroup(details.groupName)
    if not group:
        group = results_group.insertGroup(0, details.groupName)
        group.setExpanded(True)
    return group
def handleAlgorithmResults(alg: QgsProcessingAlgorithm,
                           context: QgsProcessingContext,
                           feedback: Optional[QgsProcessingFeedback] = None,
                           parameters: Optional[Dict] = None) -> bool:
    """
    Loads the layers which the context lists for loading on completion.

    Each layer is resolved from its destination id, named, styled and, when
    it can be taken from the context's temporary layer store, added to the
    destination project. Layer tree nodes are only inserted once all layers
    have been handled, ordered by their sort key. Layer post-processors
    from the layer details are called last.

    :param alg: algorithm which generated the results
    :param context: processing context holding the layers to load
    :param feedback: optional feedback object; a default
        QgsProcessingFeedback is created when None
    :param parameters: optional parameters the algorithm was run with, used
        when determining output names
    :return: True if every layer was loaded correctly; False if loading was
        canceled or at least one layer could not be loaded
    """
    if not parameters:
        parameters = {}
    if feedback is None:
        feedback = QgsProcessingFeedback()

    wrong_layers = []

    feedback.setProgressText(
        QCoreApplication.translate(
            'Postprocessing',
            'Loading resulting layers'
        )
    )

    added_layers: List[Tuple[QgsLayerTreeGroup, QgsLayerTreeLayer]] = []
    layers_to_post_process: List[Tuple[QgsMapLayer,
                                       QgsProcessingContext.LayerDetails]] = []

    layers_to_load = context.layersToLoadOnCompletion()
    layer_count = len(layers_to_load)
    # only show progress feedback if we're loading a bunch of layers
    report_progress = layer_count > 2

    for i, (dest_id, details) in enumerate(layers_to_load.items()):
        if feedback.isCanceled():
            return False

        if report_progress:
            feedback.setProgress(100 * i / float(layer_count))

        try:
            layer = QgsProcessingUtils.mapLayerFromString(
                dest_id,
                context,
                typeHint=details.layerTypeHint
            )
            if layer is None:
                wrong_layers.append(str(dest_id))
                continue

            details.setOutputLayerName(layer)

            output_name = determine_output_name(
                dest_id, details, alg, context, parameters
            )
            post_process_layer(output_name, layer, alg)

            # Load layer to layer tree root or to a specific group
            results_group = get_layer_tree_results_group(details, context)

            # note here that we may not retrieve an owned layer -- eg if the
            # output layer already exists in the destination project
            owned_map_layer = context.temporaryLayerStore().takeMapLayer(layer)
            if owned_map_layer:
                # fall back to the context's project when the layer details
                # do not carry a project of their own
                destination_project = details.project or context.project()
                destination_project.addMapLayer(owned_map_layer, False)

                # we don't add the layer to the tree yet -- that's done
                # later, after we've sorted all added layers
                layer_tree_layer = create_layer_tree_layer(
                    owned_map_layer, details)
                added_layers.append((results_group, layer_tree_layer))

            if details.postProcessor():
                # we defer calling the postProcessor set in the context
                # until the layer has been added to the project's layer
                # tree, just in case the postProcessor contains logic
                # relating to layer tree handling
                layers_to_post_process.append((layer, details))
        except Exception:
            # a failure for one layer must not prevent the remaining layers
            # from loading: log it and report the layer as wrong
            QgsMessageLog.logMessage(
                QCoreApplication.translate(
                    'Postprocessing',
                    "Error loading result layer:"
                ) + "\n" + traceback.format_exc(),
                'Processing',
                Qgis.MessageLevel.Critical)
            wrong_layers.append(str(dest_id))

    # sort added layer tree layers
    sorted_layer_tree_layers = sorted(
        added_layers,
        key=lambda x: x[1].customProperty(SORT_ORDER_CUSTOM_PROPERTY, 0)
    )

    for group, layer_node in sorted_layer_tree_layers:
        # the sort key is only needed for ordering, don't leave it on the node
        layer_node.removeCustomProperty(SORT_ORDER_CUSTOM_PROPERTY)
        group.insertChildNode(0, layer_node)

    # all layers have been added to the layer tree, so safe to call
    # postProcessors now
    for layer, details in layers_to_post_process:
        details.postProcessor().postProcessLayer(
            layer,
            context,
            feedback)

    feedback.setProgress(100)

    if wrong_layers:
        msg = QCoreApplication.translate(
            'Postprocessing',
            "The following layers were not correctly generated."
        )
        msg += "\n" + "\n".join(wrong_layers) + '\n'
        msg += QCoreApplication.translate(
            'Postprocessing',
            "You can check the 'Log Messages Panel' in QGIS main window "
            "to find more information about the execution of the algorithm.")
        feedback.reportError(msg)

    return not wrong_layers