mirror of
https://github.com/qgis/QGIS.git
synced 2025-10-11 00:04:09 -04:00
612 lines
22 KiB
Python
612 lines
22 KiB
Python
"""
|
|
***************************************************************************
    GdalUtils.py
    ---------------------
    Date                 : August 2012
    Copyright            : (C) 2012 by Victor Olaya
    Email                : volayaf at gmail dot com
***************************************************************************
*                                                                         *
*   This program is free software; you can redistribute it and/or modify  *
*   it under the terms of the GNU General Public License as published by  *
*   the Free Software Foundation; either version 2 of the License, or     *
*   (at your option) any later version.                                   *
*                                                                         *
***************************************************************************
|
|
"""
|
|
|
|
__author__ = "Victor Olaya"
|
|
__date__ = "August 2012"
|
|
__copyright__ = "(C) 2012, Victor Olaya"
|
|
|
|
from typing import Dict, List, Optional
|
|
import os
|
|
import subprocess
|
|
import platform
|
|
import re
|
|
import warnings
|
|
from dataclasses import dataclass
|
|
|
|
import psycopg2
|
|
|
|
from qgis.core import (
|
|
Qgis,
|
|
QgsBlockingProcess,
|
|
QgsRunProcess,
|
|
QgsApplication,
|
|
QgsVectorFileWriter,
|
|
QgsProcessingFeedback,
|
|
QgsProcessingUtils,
|
|
QgsMessageLog,
|
|
QgsSettings,
|
|
QgsCredentials,
|
|
QgsDataSourceUri,
|
|
QgsCoordinateReferenceSystem,
|
|
QgsProcessingException,
|
|
QgsProviderRegistry,
|
|
QgsMapLayer,
|
|
QgsProcessingContext,
|
|
)
|
|
|
|
from qgis.PyQt.QtCore import QCoreApplication, QProcess
|
|
|
|
|
|
@dataclass
class GdalConnectionDetails:
    """
    Encapsulates connection details for a layer
    """

    # String identifying the data source for GDAL (file path, "PG:...",
    # "MSSQL:...", "OCI:...", "WFS:..." etc.)
    connection_string: Optional[str] = None
    # Driver name, stored already wrapped in double quotes (e.g. '"SQLite"')
    format: Optional[str] = None
    # Driver open options; rendered as "-oo <option>" arguments
    open_options: Optional[list[str]] = None
    layer_name: Optional[str] = None
    # Credential settings; rendered as "--config <key> <value>" arguments
    credential_options: Optional[dict] = None
    geometry_column_name: Optional[str] = None

    def open_options_as_arguments(self) -> list[str]:
        """
        Returns any open options as a list of arguments

        An empty list is returned when no open options are set.
        """
        # open_options defaults to None, which must not be iterated
        return [f"-oo {option}" for option in self.open_options or []]

    def credential_options_as_arguments(self) -> list[str]:
        """
        Returns any credential options as a list of arguments

        An empty list is returned when no credential options are set.
        """
        # credential_options defaults to None, which has no items()
        return [
            f"--config {key} {value}"
            for key, value in (self.credential_options or {}).items()
        ]
|
|
|
|
|
|
# Probe for the GDAL Python bindings once at import time; the result is
# recorded in ``gdalAvailable`` so callers can degrade gracefully.
try:
    from osgeo import gdal, ogr

    gdal.UseExceptions()
    ogr.UseExceptions()

    gdalAvailable = True
except Exception:
    # Deliberately broad: any failure while loading or configuring the
    # bindings just marks GDAL as unavailable. Unlike a bare ``except:``
    # this lets SystemExit and KeyboardInterrupt propagate.
    gdalAvailable = False
|
|
|
|
|
|
class GdalUtils:
|
|
GDAL_HELP_PATH = "GDAL_HELP_PATH"
|
|
|
|
supportedRasters = None
|
|
supportedOutputRasters = None
|
|
|
|
@staticmethod
def runGdal(commands, feedback=None):
    """
    Runs a GDAL command and relays its output to the feedback object

    :param commands: sequence of command parts. Every part is converted with
        ``str()``, the parts are joined with single spaces and the resulting
        string is split into executable and arguments again with
        ``QgsRunProcess.splitCommand``
    :param feedback: object receiving progress, console output and errors.
        A new ``QgsProcessingFeedback`` is created when ``None``
    :return: list of log lines: a header line followed by the flushed
        stdout/stderr output of the process
    :raises QgsProcessingException: if the process terminated unexpectedly
        without having been canceled, or if it failed to start
    """
    if feedback is None:
        feedback = QgsProcessingFeedback()

    # PATH may be missing from the environment; treat that as empty
    envval = os.getenv("PATH") or ""
    # We need to give some extra hints to get things picked up on OS X
    isDarwin = False
    try:
        isDarwin = platform.system() == "Darwin"
    except OSError:  # https://travis-ci.org/m-kuhn/QGIS#L1493-L1526
        pass
    if isDarwin and os.path.isfile(
        os.path.join(QgsApplication.prefixPath(), "Contents", "MacOS", "gdalinfo")
    ):
        # Looks like there's a bundled gdal. Let's use it.
        bundled_dir = os.path.join(QgsApplication.prefixPath(), "Contents", "MacOS")
        # Skip empty parts so no empty PATH entry is produced
        os.environ["PATH"] = os.pathsep.join(
            part for part in (bundled_dir, envval) if part
        )
    else:
        # Other platforms should use default gdal finder codepath
        settings = QgsSettings()
        path = settings.value("/GdalTools/gdalPath", "")
        # Only extend PATH when a GDAL path is actually configured: an empty
        # PATH entry is interpreted as the current directory on POSIX systems
        if path and path.lower() not in envval.lower().split(os.pathsep):
            envval = os.pathsep.join(part for part in (envval, path) if part)
            os.putenv("PATH", envval)

    fused_command = " ".join([str(c) for c in commands])
    QgsMessageLog.logMessage(fused_command, "Processing", Qgis.MessageLevel.Info)
    feedback.pushInfo(GdalUtils.tr("GDAL command:"))
    feedback.pushCommandInfo(fused_command)
    feedback.pushInfo(GdalUtils.tr("GDAL command output:"))

    loglines = [GdalUtils.tr("GDAL execution console output")]
    # create string list of number from 0 to 99
    progress_string_list = [str(a) for a in range(0, 100)]

    def on_stdout(ba):
        # State (progress, buffer) is kept as attributes on the function
        # itself so it survives between invocations of the handler
        val = ba.data().decode("UTF-8")
        # catch progress reports
        if val == "100 - done.":
            on_stdout.progress = 100
            feedback.setProgress(on_stdout.progress)
        else:
            # remove any number of trailing "." or ".." strings
            match = re.match(r".*?(\d+)\.+\s*$", val)
            found_number = False
            if match:
                int_val = match.group(1)
                if int_val in progress_string_list:
                    on_stdout.progress = int(int_val)
                    feedback.setProgress(on_stdout.progress)
                    found_number = True

            if not found_number and val == ".":
                # a lone "." chunk advances the progress by 2.5
                on_stdout.progress += 2.5
                feedback.setProgress(on_stdout.progress)

        on_stdout.buffer += val
        if on_stdout.buffer.endswith("\n") or on_stdout.buffer.endswith("\r"):
            # flush buffer
            feedback.pushConsoleInfo(on_stdout.buffer.rstrip())
            loglines.append(on_stdout.buffer.rstrip())
            on_stdout.buffer = ""

    on_stdout.progress = 0
    on_stdout.buffer = ""

    def on_stderr(ba):
        val = ba.data().decode("UTF-8")
        on_stderr.buffer += val

        if on_stderr.buffer.endswith("\n") or on_stderr.buffer.endswith("\r"):
            # flush buffer
            feedback.reportError(on_stderr.buffer.rstrip())
            loglines.append(on_stderr.buffer.rstrip())
            on_stderr.buffer = ""

    on_stderr.buffer = ""

    command, *arguments = QgsRunProcess.splitCommand(fused_command)
    proc = QgsBlockingProcess(command, arguments)
    proc.setStdOutHandler(on_stdout)
    proc.setStdErrHandler(on_stderr)

    res = proc.run(feedback)

    # Ensure to flush the buffers if they are not empty.
    # For example, this can happen on stdout if the
    # output did not end with a new line.
    if on_stdout.buffer:
        loglines.append(on_stdout.buffer.rstrip())
    if on_stderr.buffer:
        loglines.append(on_stderr.buffer.rstrip())

    if feedback.isCanceled() and res != 0:
        feedback.pushInfo(GdalUtils.tr("Process was canceled and did not complete"))
    elif (
        not feedback.isCanceled()
        and proc.exitStatus() == QProcess.ExitStatus.CrashExit
    ):
        raise QgsProcessingException(
            GdalUtils.tr("Process was unexpectedly terminated")
        )
    elif res == 0:
        feedback.pushInfo(GdalUtils.tr("Process completed successfully"))
    elif proc.processError() == QProcess.ProcessError.FailedToStart:
        raise QgsProcessingException(
            GdalUtils.tr(
                "Process {} failed to start. Either {} is missing, or you may have insufficient permissions to run the program."
            ).format(command, command)
        )
    else:
        feedback.reportError(
            GdalUtils.tr("Process returned error code {}").format(res)
        )

    return loglines
|
|
|
|
@staticmethod
def getSupportedRasters():
    """
    Returns a dict mapping GDAL raster driver short names to their lists of
    file extensions

    The result is computed once and cached on the class; the same pass also
    fills the cache of drivers usable for output. An empty dict is returned
    when the GDAL bindings are not available.
    """
    if not gdalAvailable:
        return {}

    cached = GdalUtils.supportedRasters
    if cached is not None:
        return cached

    if gdal.GetDriverCount() == 0:
        gdal.AllRegister()

    readable = {"GTiff": ["tif", "tiff"]}
    writable = {"GTiff": ["tif", "tiff"]}
    # Publish the caches before scanning the drivers, then fill them in place
    GdalUtils.supportedRasters = readable
    GdalUtils.supportedOutputRasters = writable

    for index in range(gdal.GetDriverCount()):
        driver = gdal.GetDriver(index)
        if driver is None:
            continue

        short_name = driver.ShortName
        metadata = driver.GetMetadata()
        # Skip drivers which are not flagged as raster capable
        if metadata.get(gdal.DCAP_RASTER) != "YES":
            continue
        if gdal.DMD_EXTENSIONS not in metadata:
            continue

        extensions = metadata[gdal.DMD_EXTENSIONS].split(" ")
        if not extensions:
            continue

        readable[short_name] = extensions
        # Only creatable rasters can be referenced in output rasters
        can_create = metadata.get(gdal.DCAP_CREATE) == "YES"
        can_create_copy = metadata.get(gdal.DCAP_CREATECOPY) == "YES"
        if can_create or can_create_copy:
            writable[short_name] = extensions

    return GdalUtils.supportedRasters
|
|
|
|
@staticmethod
def getSupportedOutputRasters():
    """
    Returns a dict mapping short names of GDAL raster drivers usable for
    output to their lists of file extensions

    An empty dict is returned when the GDAL bindings are not available.
    """
    if not gdalAvailable:
        return {}

    if GdalUtils.supportedOutputRasters is None:
        # The output cache is filled as a side effect of the driver scan
        GdalUtils.getSupportedRasters()

    return GdalUtils.supportedOutputRasters
|
|
|
|
@staticmethod
def getSupportedRasterExtensions():
    """
    Returns the list of all supported raster file extensions

    "tif" and "tiff" always come first, followed by the remaining unique,
    non-empty extensions in sorted order.
    """
    leading = ["tif", "tiff"]
    ignored = {"", "tif", "tiff"}
    others = set()
    for extensions in GdalUtils.getSupportedRasters().values():
        others.update(ext for ext in extensions if ext not in ignored)
    return leading + sorted(others)
|
|
|
|
@staticmethod
def getSupportedOutputRasterExtensions():
    """
    Returns the list of all raster file extensions supported for output

    "tif" and "tiff" always come first, followed by the remaining unique,
    non-empty extensions in sorted order.
    """
    leading = ["tif", "tiff"]
    ignored = {"", "tif", "tiff"}
    others = set()
    for extensions in GdalUtils.getSupportedOutputRasters().values():
        others.update(ext for ext in extensions if ext not in ignored)
    return leading + sorted(others)
|
|
|
|
@staticmethod
def getVectorDriverFromFileName(filename):
    """
    Returns the name of the vector driver matching the extension of a file name

    :param filename: file name or path
    :return: name of the first driver whose filter string contains the
        extension (including the leading dot), or "ESRI Shapefile" when the
        file name has no extension or no driver matches
    """
    fallback = "ESRI Shapefile"
    _, extension = os.path.splitext(filename)
    if not extension:
        return fallback

    for candidate in QgsVectorFileWriter.supportedFiltersAndFormats():
        # plain substring test against the filter string of the driver
        if extension in candidate.filterString:
            return candidate.driverName
    return fallback
|
|
|
|
@staticmethod
def getFormatShortNameFromFilename(filename):
    """
    Returns the short name of the raster driver matching the extension of a
    file name

    :param filename: file name or path
    :return: short name of the first supported driver listing the text after
        the last "." as one of its extensions, or "GTiff" if none does
    """
    # Without any "." in the name the whole name is used as the extension
    extension = filename.rpartition(".")[2]
    for short_name, extensions in GdalUtils.getSupportedRasters().items():
        if extension in extensions:
            return short_name
    return "GTiff"
|
|
|
|
@staticmethod
def escapeAndJoin(strList):
    """
    Joins a list of command parts into a single space separated string,
    quoting the parts which need it

    A part is wrapped in double quotes (with backslashes doubled and double
    quotes tripled) when it contains a space or one of ``& ( ) " ;`` and
    does not look like an option, i.e. it does not start with "-" unless the
    "-" is directly followed by a digit.

    :param strList: iterable of parts; parts which are not strings are
        converted with ``str()``
    :return: the joined string, stripped of leading and trailing whitespace
    """
    special_chars = (" ", "&", "(", ")", '"', ";")
    # don't escape if command starts with - and isn't a negative number, e.g. -9999
    escapable_start = re.compile(r"^([^-]|-\d)")

    def _escape(item):
        text = item if isinstance(item, str) else str(item)
        if not text or not escapable_start.match(text):
            return text
        if not any(char in text for char in special_chars):
            return text
        return '"' + text.replace("\\", "\\\\").replace('"', '"""') + '"'

    return " ".join(_escape(item) for item in strList).strip()
|
|
|
|
@staticmethod
def version():
    """
    Returns the value GDAL reports for "VERSION_NUM", converted to an integer
    """
    version_number = gdal.VersionInfo("VERSION_NUM")
    return int(version_number)
|
|
|
|
@staticmethod
def readableVersion():
    """
    Returns the value GDAL reports for "RELEASE_NAME"
    """
    release_name = gdal.VersionInfo("RELEASE_NAME")
    return release_name
|
|
|
|
@staticmethod
def gdal_connection_details_from_uri(
    uri: str, context: QgsProcessingContext
) -> GdalConnectionDetails:
    """
    Generates GDAL connection details from layer source

    :param uri: layer source string
    :param context: processing context used to resolve the string to a layer
    :return: the details derived from the resolved layer. When the string
        cannot be resolved to a layer, the string itself is used as connection
        string and the format is looked up from its file extension
    """
    layer = QgsProcessingUtils.mapLayerFromString(uri, context, False)
    if layer is not None:
        return GdalUtils.gdal_connection_details_from_layer(layer)

    # Not a known layer: fall back to guessing the driver from the extension
    extension = os.path.splitext(uri)[1]
    driver_name = QgsVectorFileWriter.driverForExtension(extension)
    return GdalConnectionDetails(connection_string=uri, format=f'"{driver_name}"')
|
|
|
|
@staticmethod
def gdal_connection_details_from_layer(layer: QgsMapLayer) -> GdalConnectionDetails:
    """
    Builds GDAL connection details from a QGIS map layer

    The connection string is assembled according to the provider type of the
    layer (spatialite, postgres, mssql, oracle, wfs, ogr, gdal or
    postgresraster). Any other provider, and ogr/gdal sources without a
    decodable path, fall back to the part of the layer source before the
    first "|", with the format looked up from its file extension.

    :param layer: layer to build the connection details for
    :return: connection details of the layer
    :raises RuntimeError: if the spatialite source contains no database name,
        if no connection to the PostgreSQL database could be established, or
        if an oracle source yields no connection information
    """
    provider = layer.providerType()
    if provider == "spatialite":
        # dbname='/geodata/osm_ch.sqlite' table="places" (Geometry) sql=
        match = re.search("dbname='(.+)'", str(layer.source()))
        if match is None:
            raise RuntimeError(
                "Invalid spatialite data source - check connection info"
            )
        return GdalConnectionDetails(
            connection_string=match.group(1), format='"SQLite"'
        )
    elif provider == "postgres":
        # dbname='ktryjh_iuuqef' host=spacialdb.com port=9999
        # user='ktryjh_iuuqef' password='xyqwer' sslmode=disable
        # key='gid' estimatedmetadata=true srid=4326 type=MULTIPOLYGON
        # table="t4" (geom) sql=
        dsUri = QgsDataSourceUri(layer.dataProvider().dataSourceUri())
        conninfo = dsUri.connectionInfo()
        conn = None
        ok = False
        # Keep asking for credentials until a connection succeeds or the
        # credentials request is declined
        while conn is None:
            try:
                conn = psycopg2.connect(dsUri.connectionInfo())
            except psycopg2.OperationalError:
                (ok, user, passwd) = QgsCredentials.instance().get(
                    conninfo, dsUri.username(), dsUri.password()
                )
                if not ok:
                    break

                dsUri.setUsername(user)
                dsUri.setPassword(passwd)

        if conn is None:
            raise RuntimeError(
                "Could not connect to PostgreSQL database - check connection info"
            )

        # The connection only served to validate the credentials; close it
        # instead of leaking it
        conn.close()

        if ok:
            # Remember the credentials which were entered and worked
            QgsCredentials.instance().put(conninfo, user, passwd)

        return GdalConnectionDetails(
            connection_string=f"PG:{dsUri.connectionInfo()}", format='"PostgreSQL"'
        )
    elif provider == "mssql":
        # 'dbname=\'db_name\' host=myHost estimatedmetadata=true
        # srid=27700 type=MultiPolygon table="dbo"."my_table"
        # #(Shape) sql='
        dsUri = layer.dataProvider().uri()
        ogrstr = "MSSQL:"
        ogrstr += f"database={dsUri.database()};"
        ogrstr += f"server={dsUri.host()};"
        if dsUri.username() != "":
            ogrstr += f"uid={dsUri.username()};"
        else:
            ogrstr += "trusted_connection=yes;"
        if dsUri.password() != "":
            ogrstr += f"pwd={dsUri.password()};"
        ogrstr += f"tables={dsUri.table()}"
        return GdalConnectionDetails(connection_string=ogrstr, format='"MSSQL"')
    elif provider == "oracle":
        # OCI:user/password@host:port/service:table
        dsUri = QgsDataSourceUri(layer.dataProvider().dataSourceUri())
        ogrstr = "OCI:"
        # Separator between the user part and the host/database part; it has
        # to be defined even when no user name is set
        delim = ""
        if dsUri.username() != "":
            ogrstr += dsUri.username()
            if dsUri.password() != "":
                ogrstr += "/" + dsUri.password()
            delim = "@"

        if dsUri.host() != "":
            ogrstr += delim + dsUri.host()
            delim = ""
            if dsUri.port() not in ["", "1521"]:
                ogrstr += ":" + dsUri.port()
            ogrstr += "/"
            if dsUri.database() != "":
                ogrstr += dsUri.database()
        elif dsUri.database() != "":
            ogrstr += delim + dsUri.database()

        if ogrstr == "OCI:":
            raise RuntimeError("Invalid oracle data source - check connection info")

        ogrstr += ":"
        if dsUri.schema() != "":
            ogrstr += dsUri.schema() + "."

        ogrstr += dsUri.table()
        return GdalConnectionDetails(connection_string=ogrstr, format='"OCI"')
    elif provider.lower() == "wfs":
        uri = QgsDataSourceUri(layer.source())
        # Drop any query string from the service url
        baseUrl = uri.param("url").split("?")[0]
        return GdalConnectionDetails(
            connection_string=f"WFS:{baseUrl}", format='"WFS"'
        )
    elif provider.lower() == "ogr":
        parts = QgsProviderRegistry.instance().decodeUri("ogr", layer.source())
        if "path" in parts:
            path = parts["path"]
            if "vsiPrefix" in parts:
                path = parts["vsiPrefix"] + path

            # The driver is looked up from the plain path, without vsi prefix
            _, ext = os.path.splitext(parts["path"])
            format = QgsVectorFileWriter.driverForExtension(ext)

            return GdalConnectionDetails(
                connection_string=path,
                format=f'"{format}"',
                open_options=parts.get("openOptions", None),
                credential_options=parts.get("credentialOptions", None),
            )
    elif provider.lower() == "gdal":
        parts = QgsProviderRegistry.instance().decodeUri("gdal", layer.source())
        if "path" in parts:
            path = parts["path"]
            if "vsiPrefix" in parts:
                path = parts["vsiPrefix"] + path

            return GdalConnectionDetails(
                connection_string=path,
                open_options=parts.get("openOptions", None),
                credential_options=parts.get("credentialOptions", None),
            )
    elif provider == "postgresraster":
        uri = layer.dataProvider().uri()
        gdal_source = f"PG: {uri.connectionInfo()}"
        schema = uri.schema()
        if schema:
            gdal_source += f" schema='{schema}'"
        table = uri.table()
        gdal_source += f" table='{table}'"
        column = uri.param("column") or uri.geometryColumn()
        if column:
            gdal_source += f" column='{column}'"
        # mode=2 is used when the block size differs from the raster size
        # in either direction, mode=1 otherwise
        is_tiled = any(
            [
                layer.dataProvider().xSize() != layer.dataProvider().xBlockSize(),
                layer.dataProvider().ySize() != layer.dataProvider().yBlockSize(),
            ]
        )
        gdal_source += f" mode={2 if is_tiled else 1}"
        where = layer.dataProvider().subsetString()
        if where:
            gdal_source += f" where='{where}'"

        return GdalConnectionDetails(
            connection_string=gdal_source, format='"PostGISRaster"'
        )

    # Generic fallback: use the source up to the first "|" as connection string
    ogrstr = str(layer.source()).split("|")[0]
    path, ext = os.path.splitext(ogrstr)
    format = QgsVectorFileWriter.driverForExtension(ext)
    return GdalConnectionDetails(connection_string=ogrstr, format=f'"{format}"')
|
|
|
|
@staticmethod
def ogrLayerName(uri):
    """
    Extracts the OGR layer name from a layer source string

    The name is taken, in this order, from a ``table="schema"."table"`` or
    ``table="table"`` part, from a ``layername=`` part, or by opening the
    data source with GDAL and asking the layer selected by ``layerid=``
    (the first layer by default) for its name.

    :param uri: layer source string, optionally wrapped in double quotes
    :return: the layer name, or None if the data source cannot be opened or
        does not contain the requested layer
    """
    if uri.startswith('"') and uri.endswith('"'):
        uri = uri.strip('"')

    if " table=" in uri:
        # table="schema"."table"
        match = re.search(' table="([^"]*)"\\."([^"]*)"', uri)
        if match:
            return match.group(1) + "." + match.group(2)
        # table="table"
        match = re.search(' table="([^"]*)"', uri)
        if match:
            return match.group(1)
    elif "layername" in uri:
        # "layername" may also occur without "=", e.g. inside a file name,
        # in which case there is nothing to extract here
        match = re.search("(layername=)([^|]*)", uri)
        if match:
            return match.group(2)

    basePath, *fields = uri.split("|")
    layerid = 0
    for field in fields:
        if field.startswith("layername="):
            # split only once so names containing "=" are kept intact
            return field.split("=", 1)[1]
        if field.startswith("layerid="):
            layerid = int(field.split("=", 1)[1])

    try:
        ds = gdal.OpenEx(basePath, gdal.OF_VECTOR)
    except Exception:
        return None

    # OpenEx reports failure by returning None when GDAL exceptions are off
    if ds is None:
        return None

    ly = ds.GetLayer(layerid)
    if not ly:
        ds = None
        return None

    name = ly.GetName()
    # drop the reference to the dataset
    ds = None
    return name
|
|
|
|
@staticmethod
def parseCreationOptions(value):
    """
    Converts a "|" separated string of creation options to a list of
    command line arguments

    :param value: creation options separated by "|"
    :return: flat list with a "-co" entry in front of every option
    """
    return [
        argument
        for creation_option in value.split("|")
        for argument in ("-co", creation_option)
    ]
|
|
|
|
@staticmethod
def writeLayerParameterToTextFile(
    filename, alg, parameters, parameter_name, context, quote=True, executing=False
):
    """
    Generates a temporary file name for a list of layer sources and, when
    executing, writes the sources of a layer list parameter to that file

    :param filename: base name used to generate the temporary file name
    :param alg: algorithm used to evaluate the parameter as a list of layers
    :param parameters: parameter values of the algorithm
    :param parameter_name: name of the layer list parameter
    :param context: processing context
    :param quote: whether every source is wrapped in double quotes
    :param executing: the file is only written when this is True
    :return: the generated temporary file name
    """
    listFile = QgsProcessingUtils.generateTempFilename(filename, context)
    if not executing:
        return listFile

    sources = [
        GdalUtils.gdal_connection_details_from_layer(layer).connection_string
        for layer in alg.parameterAsLayerList(parameters, parameter_name, context)
    ]
    if quote:
        sources = ['"' + source + '"' for source in sources]

    # one source per line
    with open(listFile, "w") as list_file:
        list_file.write("\n".join(sources))

    return listFile
|
|
|
|
@staticmethod
def gdal_crs_string(crs):
    """
    Converts a QgsCoordinateReferenceSystem to a string understandable
    by GDAL

    :param crs: crs to convert
    :return: gdal friendly string: the authority id when it starts with
        EPSG:, IGNF: or ESRI: (compared case insensitively), otherwise the
        WKT definition in the variant preferred by GDAL
    """
    authid = crs.authid()
    if authid.upper().startswith(("EPSG:", "IGNF:", "ESRI:")):
        return authid

    return crs.toWkt(QgsCoordinateReferenceSystem.WktVariant.WKT_PREFERRED_GDAL)
|
|
|
|
@classmethod
def tr(cls, string, context=""):
    """
    Translates a string using the Qt translation system

    :param string: text to translate
    :param context: translation context; the name of the class is used when
        it is empty
    :return: result of ``QCoreApplication.translate`` for the string
    """
    effective_context = cls.__name__ if context == "" else context
    return QCoreApplication.translate(effective_context, string)
|