mirror of
				https://github.com/qgis/QGIS.git
				synced 2025-10-31 00:06:02 -04:00 
			
		
		
		
	
		
			
				
	
	
		
			221 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			221 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| """
 | |
| /***************************************************************************
 | |
| Name                 : QtSqlDB
 | |
| Description          : DB API 2.0 interface for QtSql
 | |
| Date                 : June 6, 2015
 | |
| Copyright            : (C) 2015 by Jürgen E. Fischer
 | |
| email                : jef at norbit dot de
 | |
| 
 | |
|  ***************************************************************************/
 | |
| 
 | |
| /***************************************************************************
 | |
|  *                                                                         *
 | |
|  *   This program is free software; you can redistribute it and/or modify  *
 | |
|  *   it under the terms of the GNU General Public License as published by  *
 | |
|  *   the Free Software Foundation; either version 2 of the License, or     *
 | |
|  *   (at your option) any later version.                                   *
 | |
|  *                                                                         *
 | |
|  ***************************************************************************/
 | |
| """
 | |
| from builtins import range
 | |
| from builtins import object
 | |
| 
 | |
| from qgis.PyQt.QtCore import QVariant, QDate, QTime, QDateTime, QByteArray
 | |
| from qgis.PyQt.QtSql import QSqlDatabase, QSqlQuery, QSqlField
 | |
| 
 | |
| paramstyle = "qmark"
 | |
| threadsafety = 1
 | |
| apilevel = "2.0"
 | |
| 
 | |
| import time
 | |
| import datetime
 | |
| 
 | |
| 
 | |
| def Date(year, month, day):
 | |
|     return datetime.date(year, month, day)
 | |
| 
 | |
| 
 | |
| def Time(hour, minute, second):
 | |
|     return datetime.time(hour, minute, second)
 | |
| 
 | |
| 
 | |
| def Timestamp(year, month, day, hour, minute, second):
 | |
|     return datetime.datetime(year, month, day, hour, minute, second)
 | |
| 
 | |
| 
 | |
| def DateFromTicks(ticks):
 | |
|     return Date(*time.localtime(ticks)[:3])
 | |
| 
 | |
| 
 | |
| def TimeFromTicks(ticks):
 | |
|     return Time(*time.localtime(ticks)[3:6])
 | |
| 
 | |
| 
 | |
| def TimestampFromTicks(ticks):
 | |
|     return Timestamp(*time.localtime(ticks)[:6])
 | |
| 
 | |
| 
 | |
| class ConnectionError(Exception):
 | |
| 
 | |
|     def __init__(self, *args, **kwargs):
 | |
|         super(ConnectionError, self).__init__(*args, **kwargs)
 | |
| 
 | |
| 
 | |
| class ExecError(Exception):
 | |
| 
 | |
|     def __init__(self, *args, **kwargs):
 | |
|         super(ExecError, self).__init__(*args, **kwargs)
 | |
| 
 | |
| 
 | |
| class QtSqlDBCursor(object):
 | |
| 
 | |
|     def __init__(self, conn):
 | |
|         self.qry = QSqlQuery(conn)
 | |
|         self.description = None
 | |
|         self.rowcount = -1
 | |
|         self.arraysize = 1
 | |
| 
 | |
|     def close(self):
 | |
|         self.qry.finish()
 | |
| 
 | |
|     def execute(self, operation, parameters=[]):
 | |
|         if len(parameters) == 0:
 | |
|             if not self.qry.exec_(operation):
 | |
|                 raise ExecError(self.qry.lastError().databaseText())
 | |
|         else:
 | |
|             if not self.qry.prepare(operation):
 | |
|                 raise ExecError(self.qry.lastError().databaseText())
 | |
| 
 | |
|             for i in range(len(parameters)):
 | |
|                 self.qry.bindValue(i, parameters[i])
 | |
| 
 | |
|             if not self.qry.exec_():
 | |
|                 raise ExecError(self.qry.lastError().databaseText())
 | |
| 
 | |
|         self.rowcount = self.qry.size()
 | |
|         self.description = []
 | |
|         for c in range(self.qry.record().count()):
 | |
|             f = self.qry.record().field(c)
 | |
| 
 | |
|             if f.type() == QVariant.Date:
 | |
|                 t = Date
 | |
|             elif f.type() == QVariant.Time:
 | |
|                 t = Time
 | |
|             elif f.type() == QVariant.DateTime:
 | |
|                 t = Timestamp
 | |
|             elif f.type() == QVariant.Double:
 | |
|                 t = float
 | |
|             elif f.type() == QVariant.Int:
 | |
|                 t = int
 | |
|             elif f.type() == QVariant.String:
 | |
|                 t = str
 | |
|             elif f.type() == QVariant.ByteArray:
 | |
|                 t = str
 | |
|             else:
 | |
|                 continue
 | |
| 
 | |
|             self.description.append([
 | |
|                 f.name(),  # name
 | |
|                 t,  # type_code
 | |
|                 f.length(),  # display_size
 | |
|                 f.length(),  # internal_size
 | |
|                 f.precision(),  # precision
 | |
|                 None,  # scale
 | |
|                 f.requiredStatus() != QSqlField.Required  # null_ok
 | |
|             ])
 | |
| 
 | |
|     def executemany(self, operation, seq_of_parameters):
 | |
|         if len(seq_of_parameters) == 0:
 | |
|             return
 | |
| 
 | |
|         if not self.qry.prepare(operation):
 | |
|             raise ExecError(self.qry.lastError().databaseText())
 | |
| 
 | |
|         for r in seq_of_parameters:
 | |
|             for i in range(len(r)):
 | |
|                 self.qry.bindValue(i, r[i])
 | |
| 
 | |
|             if not self.qry.exec_():
 | |
|                 raise ExecError(self.qry.lastError().databaseText())
 | |
| 
 | |
|     def scroll(self, row):
 | |
|         return self.qry.seek(row)
 | |
| 
 | |
|     def fetchone(self):
 | |
|         if not self.qry.next():
 | |
|             return None
 | |
| 
 | |
|         row = []
 | |
|         for i in range(len(self.description)):
 | |
|             value = self.qry.value(i)
 | |
|             if (isinstance(value, QDate) or
 | |
|                     isinstance(value, QTime) or
 | |
|                     isinstance(value, QDateTime)):
 | |
|                 value = value.toString()
 | |
|             elif isinstance(value, QByteArray):
 | |
|                 value = u"GEOMETRY"
 | |
|                 # value = value.toHex()
 | |
| 
 | |
|             row.append(value)
 | |
| 
 | |
|         return row
 | |
| 
 | |
|     def fetchmany(self, size=10):
 | |
|         rows = []
 | |
|         while len(rows) < size:
 | |
|             row = self.fetchone()
 | |
|             if row is None:
 | |
|                 break
 | |
|             rows.append(row)
 | |
| 
 | |
|         return rows
 | |
| 
 | |
|     def fetchall(self):
 | |
|         rows = []
 | |
|         while True:
 | |
|             row = self.fetchone()
 | |
|             if row is None:
 | |
|                 break
 | |
|             rows.append(row)
 | |
| 
 | |
|         return rows
 | |
| 
 | |
|     def setinputsize(self, sizes):
 | |
|         raise ExecError("nyi")
 | |
| 
 | |
|     def setoutputsize(self, size, column=None):
 | |
|         raise ExecError("nyi")
 | |
| 
 | |
| 
 | |
| class QtSqlDBConnection(object):
 | |
|     connections = 0
 | |
| 
 | |
|     def __init__(self, driver, dbname, user, passwd):
 | |
|         self.conn = QSqlDatabase.addDatabase(
 | |
|             driver, "qtsql_%d" % QtSqlDBConnection.connections)
 | |
|         QtSqlDBConnection.connections += 1
 | |
|         self.conn.setDatabaseName(dbname)
 | |
|         self.conn.setUserName(user)
 | |
|         self.conn.setPassword(passwd)
 | |
| 
 | |
|         if not self.conn.open():
 | |
|             raise ConnectionError(self.conn.lastError().databaseText())
 | |
| 
 | |
|     def close(self):
 | |
|         self.conn.close()
 | |
| 
 | |
|     def commit(self):
 | |
|         self.conn.commit()
 | |
| 
 | |
|     def rollback(self):
 | |
|         self.conn.rollback()
 | |
| 
 | |
|     def cursor(self):
 | |
|         return QtSqlDBCursor(self.conn)
 | |
| 
 | |
| 
 | |
| def connect(driver, dbname, user, passwd):
 | |
|     return QtSqlDBConnection(driver, dbname, user, passwd)
 |