mirror of
				https://github.com/qgis/QGIS.git
				synced 2025-11-04 00:04:25 -05:00 
			
		
		
		
	
		
			
				
	
	
		
			238 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			238 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
 | 
						|
"""
 | 
						|
/***************************************************************************
 | 
						|
Name                 : DB Manager
 | 
						|
Description          : Database manager plugin for QGIS
 | 
						|
Date                 : May 23, 2011
 | 
						|
copyright            : (C) 2011 by Giuseppe Sucameli
 | 
						|
email                : brush.tyler@gmail.com
 | 
						|
 | 
						|
 ***************************************************************************/
 | 
						|
 | 
						|
/***************************************************************************
 | 
						|
 *                                                                         *
 | 
						|
 *   This program is free software; you can redistribute it and/or modify  *
 | 
						|
 *   it under the terms of the GNU General Public License as published by  *
 | 
						|
 *   the Free Software Foundation; either version 2 of the License, or     *
 | 
						|
 *   (at your option) any later version.                                   *
 | 
						|
 *                                                                         *
 | 
						|
 ***************************************************************************/
 | 
						|
"""
 | 
						|
from builtins import str
 | 
						|
from builtins import object
 | 
						|
 | 
						|
from qgis.core import QgsDataSourceUri
 | 
						|
 | 
						|
from .plugin import DbError, ConnectionError
 | 
						|
 | 
						|
 | 
						|
class DBConnector(object):
 | 
						|
 | 
						|
    def __init__(self, uri):
 | 
						|
        self.connection = None
 | 
						|
        self._uri = uri
 | 
						|
 | 
						|
    def __del__(self):
 | 
						|
        pass  # print "DBConnector.__del__", self._uri.connectionInfo()
 | 
						|
        if self.connection is not None:
 | 
						|
            self.connection.close()
 | 
						|
        self.connection = None
 | 
						|
 | 
						|
    def uri(self):
 | 
						|
        return QgsDataSourceUri(self._uri.uri(False))
 | 
						|
 | 
						|
    def cancel(self):
 | 
						|
        pass
 | 
						|
 | 
						|
    def publicUri(self):
 | 
						|
        publicUri = QgsDataSourceUri.removePassword(self._uri.uri(False))
 | 
						|
        return QgsDataSourceUri(publicUri)
 | 
						|
 | 
						|
    def hasSpatialSupport(self):
 | 
						|
        return False
 | 
						|
 | 
						|
    def canAddGeometryColumn(self, table):
 | 
						|
        return self.hasSpatialSupport()
 | 
						|
 | 
						|
    def canAddSpatialIndex(self, table):
 | 
						|
        return self.hasSpatialSupport()
 | 
						|
 | 
						|
    def hasRasterSupport(self):
 | 
						|
        return False
 | 
						|
 | 
						|
    def hasCustomQuerySupport(self):
 | 
						|
        return False
 | 
						|
 | 
						|
    def hasTableColumnEditingSupport(self):
 | 
						|
        return False
 | 
						|
 | 
						|
    def hasCreateSpatialViewSupport(self):
 | 
						|
        return False
 | 
						|
 | 
						|
    def execution_error_types(self):
 | 
						|
        raise Exception("DBConnector.execution_error_types() is an abstract method")
 | 
						|
 | 
						|
    def connection_error_types(self):
 | 
						|
        raise Exception("DBConnector.connection_error_types() is an abstract method")
 | 
						|
 | 
						|
    def error_types(self):
 | 
						|
        return self.connection_error_types() + self.execution_error_types()
 | 
						|
 | 
						|
    def _execute(self, cursor, sql):
 | 
						|
        if cursor is None:
 | 
						|
            cursor = self._get_cursor()
 | 
						|
        try:
 | 
						|
            cursor.execute(sql)
 | 
						|
 | 
						|
        except self.connection_error_types() as e:
 | 
						|
            raise ConnectionError(e)
 | 
						|
 | 
						|
        except self.execution_error_types() as e:
 | 
						|
            # do the rollback to avoid a "current transaction aborted, commands ignored" errors
 | 
						|
            self._rollback()
 | 
						|
            raise DbError(e, sql)
 | 
						|
 | 
						|
        return cursor
 | 
						|
 | 
						|
    def _execute_and_commit(self, sql):
 | 
						|
        """ tries to execute and commit some action, on error it rolls back the change """
 | 
						|
        self._execute(None, sql)
 | 
						|
        self._commit()
 | 
						|
 | 
						|
    def _get_cursor(self, name=None):
 | 
						|
        try:
 | 
						|
            if name is not None:
 | 
						|
                name = str(name).encode('ascii', 'replace').replace('?', "_")
 | 
						|
                self._last_cursor_named_id = 0 if not hasattr(self,
 | 
						|
                                                              '_last_cursor_named_id') else self._last_cursor_named_id + 1
 | 
						|
                return self.connection.cursor("%s_%d" % (name, self._last_cursor_named_id))
 | 
						|
 | 
						|
            return self.connection.cursor()
 | 
						|
 | 
						|
        except self.connection_error_types() as e:
 | 
						|
            raise ConnectionError(e)
 | 
						|
 | 
						|
        except self.execution_error_types() as e:
 | 
						|
            # do the rollback to avoid a "current transaction aborted, commands ignored" errors
 | 
						|
            self._rollback()
 | 
						|
            raise DbError(e)
 | 
						|
 | 
						|
    def _close_cursor(self, c):
 | 
						|
        try:
 | 
						|
            if c and not c.closed:
 | 
						|
                c.close()
 | 
						|
 | 
						|
        except self.error_types():
 | 
						|
            pass
 | 
						|
 | 
						|
        return
 | 
						|
 | 
						|
    def _fetchall(self, c):
 | 
						|
        try:
 | 
						|
            return c.fetchall()
 | 
						|
 | 
						|
        except self.connection_error_types() as e:
 | 
						|
            raise ConnectionError(e)
 | 
						|
 | 
						|
        except self.execution_error_types() as e:
 | 
						|
            # do the rollback to avoid a "current transaction aborted, commands ignored" errors
 | 
						|
            self._rollback()
 | 
						|
            raise DbError(e)
 | 
						|
 | 
						|
    def _fetchone(self, c):
 | 
						|
        try:
 | 
						|
            return c.fetchone()
 | 
						|
 | 
						|
        except self.connection_error_types() as e:
 | 
						|
            raise ConnectionError(e)
 | 
						|
 | 
						|
        except self.execution_error_types() as e:
 | 
						|
            # do the rollback to avoid a "current transaction aborted, commands ignored" errors
 | 
						|
            self._rollback()
 | 
						|
            raise DbError(e)
 | 
						|
 | 
						|
    def _commit(self):
 | 
						|
        try:
 | 
						|
            self.connection.commit()
 | 
						|
 | 
						|
        except self.connection_error_types() as e:
 | 
						|
            raise ConnectionError(e)
 | 
						|
 | 
						|
        except self.execution_error_types() as e:
 | 
						|
            # do the rollback to avoid a "current transaction aborted, commands ignored" errors
 | 
						|
            self._rollback()
 | 
						|
            raise DbError(e)
 | 
						|
 | 
						|
    def _rollback(self):
 | 
						|
        try:
 | 
						|
            self.connection.rollback()
 | 
						|
 | 
						|
        except self.connection_error_types() as e:
 | 
						|
            raise ConnectionError(e)
 | 
						|
 | 
						|
        except self.execution_error_types() as e:
 | 
						|
            # do the rollback to avoid a "current transaction aborted, commands ignored" errors
 | 
						|
            self._rollback()
 | 
						|
            raise DbError(e)
 | 
						|
 | 
						|
    def _get_cursor_columns(self, c):
 | 
						|
        try:
 | 
						|
            if c.description:
 | 
						|
                return [x[0] for x in c.description]
 | 
						|
 | 
						|
        except self.connection_error_types() + self.execution_error_types():
 | 
						|
            return []
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def quoteId(self, identifier):
 | 
						|
        if hasattr(identifier, '__iter__') and not isinstance(identifier, str):
 | 
						|
            ids = list()
 | 
						|
            for i in identifier:
 | 
						|
                if i is None or i == "":
 | 
						|
                    continue
 | 
						|
                ids.append(self.quoteId(i))
 | 
						|
            return u'.'.join(ids)
 | 
						|
 | 
						|
        identifier = str(
 | 
						|
            identifier) if identifier is not None else str()  # make sure it's python unicode string
 | 
						|
        return u'"%s"' % identifier.replace('"', '""')
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def quoteString(self, txt):
 | 
						|
        """ make the string safe - replace ' with '' """
 | 
						|
        if hasattr(txt, '__iter__') and not isinstance(txt, str):
 | 
						|
            txts = list()
 | 
						|
            for i in txt:
 | 
						|
                if i is None:
 | 
						|
                    continue
 | 
						|
                txts.append(self.quoteString(i))
 | 
						|
            return u'.'.join(txts)
 | 
						|
 | 
						|
        txt = str(txt) if txt is not None else str()  # make sure it's python unicode string
 | 
						|
        return u"'%s'" % txt.replace("'", "''")
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def getSchemaTableName(self, table):
 | 
						|
        if not hasattr(table, '__iter__') and not isinstance(table, str):
 | 
						|
            return (None, table)
 | 
						|
        if isinstance(table, str):
 | 
						|
            table = table.split('.')
 | 
						|
        if len(table) < 2:
 | 
						|
            return (None, table[0])
 | 
						|
        else:
 | 
						|
            return (table[0], table[1])
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def getSqlDictionary(self):
 | 
						|
        """ return a generic SQL dictionary """
 | 
						|
        try:
 | 
						|
            from ..sql_dictionary import getSqlDictionary
 | 
						|
 | 
						|
            return getSqlDictionary()
 | 
						|
        except ImportError:
 | 
						|
            return []
 | 
						|
 | 
						|
    def getQueryBuilderDictionary(self):
 | 
						|
        return {}
 |