# -*- coding: utf-8 -*- """ /*************************************************************************** Name : Virtual layers plugin for DB Manager Date : December 2015 copyright : (C) 2015 by Hugo Mercier email : hugo dot mercier at oslandia dot com ***************************************************************************/ /*************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * ***************************************************************************/ """ from __future__ import print_function from builtins import object from qgis.PyQt.QtCore import QUrl, QTemporaryFile from ..connector import DBConnector from ..plugin import Table from qgis.core import QgsDataSourceUri, QgsVirtualLayerDefinition, QgsProject, QgsMapLayer, QgsVectorLayer, QgsCoordinateReferenceSystem, QgsWkbTypes import sqlite3 class sqlite3_connection(object): def __init__(self, sqlite_file): self.conn = sqlite3.connect(sqlite_file) def __enter__(self): return self.conn def __exit__(self, type, value, traceback): self.conn.close() def getQueryGeometryName(sqlite_file): # introspect the file with sqlite3_connection(sqlite_file) as conn: c = conn.cursor() for r in c.execute("SELECT url FROM _meta"): d = QgsVirtualLayerDefinition.fromUrl(QUrl(r[0])) if d.hasDefinedGeometry(): return d.geometryField() return None def classFactory(): return VLayerConnector # Tables in DB Manager are identified by their display names # This global registry maps a display name with a layer id # It is filled when getVectorTables is called class VLayerRegistry(object): _instance = None @classmethod def instance(cls): if cls._instance is None: cls._instance = VLayerRegistry() return cls._instance def __init__(self): self.layers = {} def reset(self): self.layers = {} def has(self, k): return k in self.layers def get(self, k): return self.layers.get(k) def __getitem__(self, k): return self.get(k) def set(self, k, l): self.layers[k] = l def __setitem__(self, k, l): self.set(k, l) def items(self): return list(self.layers.items()) def getLayer(self, l): lid = self.layers.get(l) if lid is None: return lid return QgsProject.instance().mapLayer(lid) class VLayerConnector(DBConnector): def __init__(self, uri): pass def _execute(self, cursor, sql): # This is only used to get list of fields class DummyCursor(object): def __init__(self, sql): self.sql = sql def close(self): pass return DummyCursor(sql) def _get_cursor(self, name=None): # fix_print_with_import print(("_get_cursor_", name)) def _get_cursor_columns(self, c): tf = QTemporaryFile() tf.open() tmp = tf.fileName() tf.close() df = QgsVirtualLayerDefinition() df.setFilePath(tmp) df.setQuery(c.sql) p = QgsVectorLayer(df.toString(), "vv", "virtual") if not p.isValid(): return [] f = [f.name() for f in p.fields()] if p.geometryType() != QgsWkbTypes.NullGeometry: gn = getQueryGeometryName(tmp) if gn: f += [gn] return f def uri(self): return QgsDataSourceUri("qgis") def getInfo(self): return "info" def getSpatialInfo(self): return None def hasSpatialSupport(self): return True def hasRasterSupport(self): return False def hasCustomQuerySupport(self): return True def hasTableColumnEditingSupport(self): return False def fieldTypes(self): return [ "integer", "bigint", "smallint", # integers "real", "double", "float", "numeric", # floats "varchar", "varchar(255)", "character(20)", "text", # strings "date", "datetime" # date/time ] def getSchemas(self): return None def getTables(self, schema=None, add_sys_tables=False): """ get list of tables """ return self.getVectorTables() def getVectorTables(self, schema=None): """ get list of table with a geometry column it returns: name (table name) is_system_table type = 'view' (is a view?) geometry_column: f_table_name (the table name in geometry_columns may be in a wrong case, use this to load the layer) f_geometry_column type coord_dimension srid """ reg = VLayerRegistry.instance() VLayerRegistry.instance().reset() lst = [] for _, l in list(QgsProject.instance().mapLayers().items()): if l.type() == QgsMapLayer.VectorLayer: lname = l.name() # if there is already a layer with this name, use the layer id # as name if reg.has(lname): lname = l.id() VLayerRegistry.instance().set(lname, l.id()) geomType = None dim = None g = l.dataProvider().wkbType() if g == QgsWkbTypes.Point: geomType = 'POINT' dim = 'XY' elif g == QgsWkbTypes.LineString: geomType = 'LINESTRING' dim = 'XY' elif g == QgsWkbTypes.Polygon: geomType = 'POLYGON' dim = 'XY' elif g == QgsWkbTypes.MultiPoint: geomType = 'MULTIPOINT' dim = 'XY' elif g == QgsWkbTypes.MultiLineString: geomType = 'MULTILINESTRING' dim = 'XY' elif g == QgsWkbTypes.MultiPolygon: geomType = 'MULTIPOLYGON' dim = 'XY' elif g == QgsWkbTypes.Point25D: geomType = 'POINT' dim = 'XYZ' elif g == QgsWkbTypes.LineString25D: geomType = 'LINESTRING' dim = 'XYZ' elif g == QgsWkbTypes.Polygon25D: geomType = 'POLYGON' dim = 'XYZ' elif g == QgsWkbTypes.MultiPoint25D: geomType = 'MULTIPOINT' dim = 'XYZ' elif g == QgsWkbTypes.MultiLineString25D: geomType = 'MULTILINESTRING' dim = 'XYZ' elif g == QgsWkbTypes.MultiPolygon25D: geomType = 'MULTIPOLYGON' dim = 'XYZ' lst.append( (Table.VectorType, lname, False, False, l.id(), 'geometry', geomType, dim, l.crs().postgisSrid())) return lst def getRasterTables(self, schema=None): return [] def getTableRowCount(self, table): t = table[1] l = VLayerRegistry.instance().getLayer(t) return l.featureCount() def getTableFields(self, table): """ return list of columns in table """ t = table[1] l = VLayerRegistry.instance().getLayer(t) # id, name, type, nonnull, default, pk n = l.dataProvider().fields().size() f = [(i, f.name(), f.typeName(), False, None, False) for i, f in enumerate(l.dataProvider().fields())] f += [(n, "geometry", "geometry", False, None, False)] return f def getTableIndexes(self, table): return [] def getTableConstraints(self, table): return None def getTableTriggers(self, table): return [] def deleteTableTrigger(self, trigger, table=None): return def getTableExtent(self, table, geom): is_id, t = table if is_id: l = QgsProject.instance().mapLayer(t) else: l = VLayerRegistry.instance().getLayer(t) e = l.extent() r = (e.xMinimum(), e.yMinimum(), e.xMaximum(), e.yMaximum()) return r def getViewDefinition(self, view): print("**unimplemented** getViewDefinition") def getSpatialRefInfo(self, srid): crs = QgsCoordinateReferenceSystem(srid) return crs.description() def isVectorTable(self, table): return True def isRasterTable(self, table): return False def createTable(self, table, field_defs, pkey): print("**unimplemented** createTable") return False def deleteTable(self, table): print("**unimplemented** deleteTable") return False def emptyTable(self, table): print("**unimplemented** emptyTable") return False def renameTable(self, table, new_table): print("**unimplemented** renameTable") return False def moveTable(self, table, new_table, new_schema=None): print("**unimplemented** moveTable") return False def createView(self, view, query): print("**unimplemented** createView") return False def deleteView(self, view): print("**unimplemented** deleteView") return False def renameView(self, view, new_name): print("**unimplemented** renameView") return False def runVacuum(self): print("**unimplemented** runVacuum") return False def addTableColumn(self, table, field_def): print("**unimplemented** addTableColumn") return False def deleteTableColumn(self, table, column): print("**unimplemented** deleteTableColumn") def updateTableColumn(self, table, column, new_name, new_data_type=None, new_not_null=None, new_default=None): print("**unimplemented** updateTableColumn") def renameTableColumn(self, table, column, new_name): print("**unimplemented** renameTableColumn") return False def setColumnType(self, table, column, data_type): print("**unimplemented** setColumnType") return False def setColumnDefault(self, table, column, default): print("**unimplemented** setColumnDefault") return False def setColumnNull(self, table, column, is_null): print("**unimplemented** setColumnNull") return False def isGeometryColumn(self, table, column): print("**unimplemented** isGeometryColumn") return False def addGeometryColumn(self, table, geom_column='geometry', geom_type='POINT', srid=-1, dim=2): print("**unimplemented** addGeometryColumn") return False def deleteGeometryColumn(self, table, geom_column): print("**unimplemented** deleteGeometryColumn") return False def addTableUniqueConstraint(self, table, column): print("**unimplemented** addTableUniqueConstraint") return False def deleteTableConstraint(self, table, constraint): print("**unimplemented** deleteTableConstraint") return False def addTablePrimaryKey(self, table, column): print("**unimplemented** addTablePrimaryKey") return False def createTableIndex(self, table, name, column, unique=False): print("**unimplemented** createTableIndex") return False def deleteTableIndex(self, table, name): print("**unimplemented** deleteTableIndex") return False def createSpatialIndex(self, table, geom_column='geometry'): print("**unimplemented** createSpatialIndex") return False def deleteSpatialIndex(self, table, geom_column='geometry'): print("**unimplemented** deleteSpatialIndex") return False def hasSpatialIndex(self, table, geom_column='geometry'): print("**unimplemented** hasSpatialIndex") return False def execution_error_types(self): print("**unimplemented** execution_error_types") return False def connection_error_types(self): print("**unimplemented** connection_error_types") return False def getSqlDictionary(self): from .sql_dictionary import getSqlDictionary sql_dict = getSqlDictionary() items = [] for tbl in self.getTables(): items.append(tbl[1]) # table name for fld in self.getTableFields((None, tbl[1])): items.append(fld[1]) # field name sql_dict["identifier"] = items return sql_dict def getQueryBuilderDictionary(self): from .sql_dictionary import getQueryBuilderDictionary return getQueryBuilderDictionary()