mirror of
				https://github.com/qgis/QGIS.git
				synced 2025-10-31 00:06:02 -04:00 
			
		
		
		
	
		
			
				
	
	
		
			250 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			250 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """
 | |
| ***************************************************************************
 | |
|     HypsometricCurves.py
 | |
|     ---------------------
 | |
|     Date                 : November 2014
 | |
|     Copyright            : (C) 2014 by Alexander Bruy
 | |
|     Email                : alexander dot bruy at gmail dot com
 | |
| ***************************************************************************
 | |
| *                                                                         *
 | |
| *   This program is free software; you can redistribute it and/or modify  *
 | |
| *   it under the terms of the GNU General Public License as published by  *
 | |
| *   the Free Software Foundation; either version 2 of the License, or     *
 | |
| *   (at your option) any later version.                                   *
 | |
| *                                                                         *
 | |
| ***************************************************************************
 | |
| """
 | |
| 
 | |
| __author__ = 'Alexander Bruy'
 | |
| __date__ = 'November 2014'
 | |
| __copyright__ = '(C) 2014, Alexander Bruy'
 | |
| 
 | |
| import os
 | |
| import csv
 | |
| 
 | |
| from osgeo import gdal, ogr, osr
 | |
| 
 | |
| gdal.UseExceptions()
 | |
| ogr.UseExceptions()
 | |
| osr.UseExceptions()
 | |
| 
 | |
| from qgis.core import (QgsRectangle,
 | |
|                        QgsGeometry,
 | |
|                        QgsFeatureRequest,
 | |
|                        QgsProcessingException,
 | |
|                        QgsProcessing,
 | |
|                        QgsProcessingParameterBoolean,
 | |
|                        QgsProcessingParameterNumber,
 | |
|                        QgsProcessingParameterRasterLayer,
 | |
|                        QgsProcessingParameterFeatureSource,
 | |
|                        QgsProcessingParameterFolderDestination)
 | |
| 
 | |
| from processing.algs.qgis.QgisAlgorithm import QgisAlgorithm
 | |
| from processing.tools import raster
 | |
| 
 | |
| from qgis.PyQt.QtCore import QCoreApplication
 | |
| 
 | |
| 
 | |
| class HypsometricCurves(QgisAlgorithm):
 | |
|     INPUT_DEM = 'INPUT_DEM'
 | |
|     BOUNDARY_LAYER = 'BOUNDARY_LAYER'
 | |
|     STEP = 'STEP'
 | |
|     USE_PERCENTAGE = 'USE_PERCENTAGE'
 | |
|     OUTPUT_DIRECTORY = 'OUTPUT_DIRECTORY'
 | |
| 
 | |
|     def group(self):
 | |
|         return self.tr('Raster terrain analysis')
 | |
| 
 | |
|     def groupId(self):
 | |
|         return 'rasterterrainanalysis'
 | |
| 
 | |
|     def __init__(self):
 | |
|         super().__init__()
 | |
| 
 | |
|     def initAlgorithm(self, config=None):
 | |
|         self.addParameter(QgsProcessingParameterRasterLayer(self.INPUT_DEM,
 | |
|                                                             self.tr('DEM to analyze')))
 | |
|         self.addParameter(QgsProcessingParameterFeatureSource(self.BOUNDARY_LAYER,
 | |
|                                                               self.tr('Boundary layer'), [QgsProcessing.SourceType.TypeVectorPolygon]))
 | |
|         self.addParameter(QgsProcessingParameterNumber(self.STEP,
 | |
|                                                        self.tr('Step'), type=QgsProcessingParameterNumber.Type.Double, minValue=0.0, defaultValue=100.0))
 | |
|         self.addParameter(QgsProcessingParameterBoolean(self.USE_PERCENTAGE,
 | |
|                                                         self.tr('Use % of area instead of absolute value'), defaultValue=False))
 | |
| 
 | |
|         self.addParameter(QgsProcessingParameterFolderDestination(self.OUTPUT_DIRECTORY,
 | |
|                                                                   self.tr('Hypsometric curves')))
 | |
| 
 | |
|     def name(self):
 | |
|         return 'hypsometriccurves'
 | |
| 
 | |
|     def displayName(self):
 | |
|         return self.tr('Hypsometric curves')
 | |
| 
 | |
|     def processAlgorithm(self, parameters, context, feedback):
 | |
|         try:
 | |
|             import numpy
 | |
|         except ImportError:
 | |
|             raise QgsProcessingException(QCoreApplication.translate('HypsometricCurves', 'This algorithm requires the Python “numpy” library. Please install this library and try again.'))
 | |
| 
 | |
|         raster_layer = self.parameterAsRasterLayer(parameters, self.INPUT_DEM, context)
 | |
|         target_crs = raster_layer.crs()
 | |
|         rasterPath = raster_layer.source()
 | |
| 
 | |
|         source = self.parameterAsSource(parameters, self.BOUNDARY_LAYER, context)
 | |
|         if source is None:
 | |
|             raise QgsProcessingException(self.invalidSourceError(parameters, self.BOUNDARY_LAYER))
 | |
| 
 | |
|         step = self.parameterAsDouble(parameters, self.STEP, context)
 | |
|         percentage = self.parameterAsBoolean(parameters, self.USE_PERCENTAGE, context)
 | |
| 
 | |
|         outputPath = self.parameterAsString(parameters, self.OUTPUT_DIRECTORY, context)
 | |
|         if not os.path.exists(outputPath):
 | |
|             os.makedirs(outputPath)
 | |
| 
 | |
|         rasterDS = gdal.Open(rasterPath, gdal.GA_ReadOnly)
 | |
|         geoTransform = rasterDS.GetGeoTransform()
 | |
|         rasterBand = rasterDS.GetRasterBand(1)
 | |
|         noData = rasterBand.GetNoDataValue()
 | |
| 
 | |
|         cellXSize = abs(geoTransform[1])
 | |
|         cellYSize = abs(geoTransform[5])
 | |
|         rasterXSize = rasterDS.RasterXSize
 | |
|         rasterYSize = rasterDS.RasterYSize
 | |
| 
 | |
|         rasterBBox = QgsRectangle(geoTransform[0],
 | |
|                                   geoTransform[3] - cellYSize * rasterYSize,
 | |
|                                   geoTransform[0] + cellXSize * rasterXSize,
 | |
|                                   geoTransform[3])
 | |
|         rasterGeom = QgsGeometry.fromRect(rasterBBox)
 | |
| 
 | |
|         crs = osr.SpatialReference()
 | |
|         crs.ImportFromProj4(str(target_crs.toProj()))
 | |
| 
 | |
|         memVectorDriver = ogr.GetDriverByName('Memory')
 | |
|         memRasterDriver = gdal.GetDriverByName('MEM')
 | |
| 
 | |
|         features = source.getFeatures(QgsFeatureRequest().setDestinationCrs(target_crs, context.transformContext()))
 | |
|         total = 100.0 / source.featureCount() if source.featureCount() else 0
 | |
| 
 | |
|         for current, f in enumerate(features):
 | |
|             if not f.hasGeometry():
 | |
|                 continue
 | |
| 
 | |
|             if feedback.isCanceled():
 | |
|                 break
 | |
| 
 | |
|             geom = f.geometry()
 | |
|             intersectedGeom = rasterGeom.intersection(geom)
 | |
| 
 | |
|             if intersectedGeom.isEmpty():
 | |
|                 feedback.pushInfo(
 | |
|                     self.tr('Feature {0} does not intersect raster or '
 | |
|                             'entirely located in NODATA area').format(f.id()))
 | |
|                 continue
 | |
| 
 | |
|             fName = os.path.join(
 | |
|                 outputPath, f'histogram_{source.sourceName()}_{f.id()}.csv')
 | |
| 
 | |
|             ogrGeom = ogr.CreateGeometryFromWkt(intersectedGeom.asWkt())
 | |
|             bbox = intersectedGeom.boundingBox()
 | |
|             xMin = bbox.xMinimum()
 | |
|             xMax = bbox.xMaximum()
 | |
|             yMin = bbox.yMinimum()
 | |
|             yMax = bbox.yMaximum()
 | |
| 
 | |
|             (startColumn, startRow) = raster.mapToPixel(xMin, yMax, geoTransform)
 | |
|             (endColumn, endRow) = raster.mapToPixel(xMax, yMin, geoTransform)
 | |
| 
 | |
|             width = endColumn - startColumn
 | |
|             height = endRow - startRow
 | |
| 
 | |
|             srcOffset = (startColumn, startRow, width, height)
 | |
|             srcArray = rasterBand.ReadAsArray(*srcOffset)
 | |
| 
 | |
|             if srcOffset[2] == 0 or srcOffset[3] == 0:
 | |
|                 feedback.pushInfo(
 | |
|                     self.tr('Feature {0} is smaller than raster '
 | |
|                             'cell size').format(f.id()))
 | |
|                 continue
 | |
| 
 | |
|             newGeoTransform = (
 | |
|                 geoTransform[0] + srcOffset[0] * geoTransform[1],
 | |
|                 geoTransform[1],
 | |
|                 0.0,
 | |
|                 geoTransform[3] + srcOffset[1] * geoTransform[5],
 | |
|                 0.0,
 | |
|                 geoTransform[5]
 | |
|             )
 | |
| 
 | |
|             memVDS = memVectorDriver.CreateDataSource('out')
 | |
|             memLayer = memVDS.CreateLayer('poly', crs, ogr.wkbPolygon)
 | |
| 
 | |
|             ft = ogr.Feature(memLayer.GetLayerDefn())
 | |
|             ft.SetGeometry(ogrGeom)
 | |
|             memLayer.CreateFeature(ft)
 | |
|             ft.Destroy()
 | |
| 
 | |
|             rasterizedDS = memRasterDriver.Create('', srcOffset[2],
 | |
|                                                   srcOffset[3], 1, gdal.GDT_Byte)
 | |
|             rasterizedDS.SetGeoTransform(newGeoTransform)
 | |
|             gdal.RasterizeLayer(rasterizedDS, [1], memLayer, burn_values=[1])
 | |
|             rasterizedArray = rasterizedDS.ReadAsArray()
 | |
| 
 | |
|             srcArray = numpy.nan_to_num(srcArray)
 | |
|             masked = numpy.ma.MaskedArray(srcArray,
 | |
|                                           mask=numpy.logical_or(srcArray == noData,
 | |
|                                                                 numpy.logical_not(rasterizedArray)))
 | |
| 
 | |
|             self.calculateHypsometry(f.id(), fName, feedback, masked,
 | |
|                                      cellXSize, cellYSize, percentage, step)
 | |
| 
 | |
|             memVDS = None
 | |
|             rasterizedDS = None
 | |
|             feedback.setProgress(int(current * total))
 | |
| 
 | |
|         rasterDS = None
 | |
| 
 | |
|         return {self.OUTPUT_DIRECTORY: outputPath}
 | |
| 
 | |
|     def calculateHypsometry(self, fid, fName, feedback, data, pX, pY,
 | |
|                             percentage, step):
 | |
|         out = dict()
 | |
|         d = data.compressed()
 | |
|         if d.size == 0:
 | |
|             feedback.pushInfo(
 | |
|                 self.tr('Feature {0} does not intersect raster or '
 | |
|                         'entirely located in NODATA area').format(fid))
 | |
|             return
 | |
| 
 | |
|         minValue = d.min()
 | |
|         maxValue = d.max()
 | |
|         startValue = minValue
 | |
|         tmpValue = minValue + step
 | |
|         while startValue < maxValue:
 | |
|             out[tmpValue] = ((startValue <= d) & (d < tmpValue)).sum()
 | |
|             startValue = tmpValue
 | |
|             tmpValue += step
 | |
| 
 | |
|         if percentage:
 | |
|             multiplier = 100.0 / len(d.flat)
 | |
|         else:
 | |
|             multiplier = pX * pY
 | |
| 
 | |
|         for k, v in out.items():
 | |
|             out[k] = v * multiplier
 | |
| 
 | |
|         prev = None
 | |
|         for i in sorted(out.items()):
 | |
|             if prev is None:
 | |
|                 out[i[0]] = i[1]
 | |
|             else:
 | |
|                 out[i[0]] = i[1] + out[prev]
 | |
|             prev = i[0]
 | |
| 
 | |
|         with open(fName, 'w', newline='', encoding='utf-8') as out_file:
 | |
|             writer = csv.writer(out_file)
 | |
|             writer.writerow([self.tr('Area'), self.tr('Elevation')])
 | |
| 
 | |
|             for i in sorted(out.items()):
 | |
|                 writer.writerow([i[1], i[0]])
 |