Add cambalache runtime module

Add CmbProject class with import/export functions

Move tests files to its own directory
This commit is contained in:
Juan Pablo Ugarte 2021-01-14 13:30:38 -03:00
parent 330491169c
commit 797c2bd197
7 changed files with 380 additions and 347 deletions

View File

@ -1,4 +1,7 @@
# cambalache-db
# Cambalache
UI Designer Library
## cambalache-db
Cambalache Data Model generation tool

View File

@ -1,5 +1,5 @@
#
# CambalacheDB - Data Model for Cambalache
# CambalacheDb - Data Model generation tool for Cambalache
#
# Copyright (C) 2020 Juan Pablo Ugarte - All Rights Reserved
#
@ -15,371 +15,154 @@ import gir
from lxml import etree
from lxml.builder import E
class CambalacheDb:
def __init__(self, filename):
# Create DB file
self.conn = sqlite3.connect(filename)
def db_create_history_table(c, table, group_msg=None):
# Create a history table to store data for INSERT and DELETE commands
c.executescript(f'''
CREATE TABLE history_{table} AS SELECT * FROM {table} WHERE 0;
ALTER TABLE history_{table} ADD COLUMN history_id INTERGER REFERENCES history ON DELETE CASCADE;
CREATE INDEX history_{table}_history_id_fk ON history_{table} (history_id);
''')
dirname = os.path.dirname(__file__) or '.'
# Get table columns
columns = None
old_values = None
new_values = None
# Create DB tables
with open(os.path.join(dirname, 'cambalache-db.sql'), 'r') as sql:
c = self.conn.cursor()
c.executescript(sql.read())
self.conn.commit()
sql.close()
all_columns = []
non_pk_columns = []
# Create history tables
self._add_history_for('ui')
self._add_history_for('ui_library')
self._add_history_for('object',
"printf('Delete object %s:%s', OLD.type_id, OLD.name)")
self._add_history_for('object_property')
self._add_history_for('object_child_property')
self._add_history_for('object_signal')
self.conn.commit()
for row in c.execute(f'PRAGMA table_info({table});'):
col = row[1]
col_type = row[2]
pk = row[5]
if columns == None:
columns = col
old_values = 'OLD.' + col
new_values = 'NEW.' + col
else:
columns += ', ' + col
old_values += ', OLD.' + col
new_values += ', NEW.' + col
def populate_from_gir(self, girfile):
lib = gir.GirData(girfile)
lib.populate_db(self.conn)
self.conn.commit()
all_columns.append(col)
if not pk:
non_pk_columns.append(col)
# INSERT Trigger
c.execute(f'''
CREATE TRIGGER on_{table}_insert AFTER INSERT ON {table}
BEGIN
INSERT INTO history (command, data) VALUES ('INSERT', '{table}');
INSERT INTO history_{table} (history_id, {columns})
VALUES (last_insert_rowid(), {new_values});
END;
def _add_history_for(self, table, group_msg=None):
c = self.conn.cursor()
# Create a history table to store data for INSERT and DELETE commands
c.executescript(f'''
CREATE TABLE history_{table} AS SELECT * FROM {table} WHERE 0;
ALTER TABLE history_{table} ADD COLUMN history_id INTERGER REFERENCES history ON DELETE CASCADE;
CREATE INDEX history_{table}_history_id_fk ON history_{table} (history_id);
''')
# DELETE Trigger
if group_msg:
# Get table columns
columns = None
old_values = None
new_values = None
all_columns = []
non_pk_columns = []
for row in c.execute(f'PRAGMA table_info({table});'):
col = row[1]
col_type = row[2]
pk = row[5]
if columns == None:
columns = col
old_values = 'OLD.' + col
new_values = 'NEW.' + col
else:
columns += ', ' + col
old_values += ', OLD.' + col
new_values += ', NEW.' + col
all_columns.append(col)
if not pk:
non_pk_columns.append(col)
# INSERT Trigger
c.execute(f'''
CREATE TRIGGER on_{table}_before_delete BEFORE DELETE ON {table}
BEGIN
INSERT INTO history (command, data) VALUES ('PUSH', {group_msg});
END;
''')
pop = f"INSERT INTO history (command) VALUES ('POP');"
else:
pop = ''
c.execute(f'''
CREATE TRIGGER on_{table}_delete AFTER DELETE ON {table}
BEGIN
INSERT INTO history (command, data) VALUES ('DELETE', '{table}');
INSERT INTO history_{table} (history_id, {columns})
VALUES (last_insert_rowid(), {old_values});
{pop}
END;
''')
last_history_id = "(SELECT seq FROM sqlite_sequence WHERE name='history')"
# UPDATE Trigger for each non PK column
for column in non_pk_columns:
all_but_column = None
all_but_column_values = None
for col in all_columns:
if col != column:
if all_but_column == None:
all_but_column = col
all_but_column_values = 'NEW.' + col
else:
all_but_column += ', ' + col
all_but_column_values += ', NEW.' + col
c.execute(f'''
CREATE TRIGGER on_{table}_update_{column} AFTER UPDATE OF {column} ON {table}
WHEN
(SELECT command, data FROM history WHERE history_id = {last_history_id})
IS NOT ('UPDATE', '{table}')
OR
(SELECT {all_but_column} FROM history_{table} WHERE history_id = {last_history_id})
IS NOT ({all_but_column_values})
BEGIN
INSERT INTO history (command, data) VALUES ('UPDATE', '{table}');
INSERT INTO history_{table} (history_id, {columns})
VALUES (last_insert_rowid(), {new_values});
END;
CREATE TRIGGER on_{table}_insert AFTER INSERT ON {table}
BEGIN
INSERT INTO history (command, data) VALUES ('INSERT', '{table}');
INSERT INTO history_{table} (history_id, {columns})
VALUES (last_insert_rowid(), {new_values});
END;
''')
# DELETE Trigger
if group_msg:
c.execute(f'''
CREATE TRIGGER on_{table}_before_delete BEFORE DELETE ON {table}
BEGIN
INSERT INTO history (command, data) VALUES ('PUSH', {group_msg});
END;
''')
pop = f"INSERT INTO history (command) VALUES ('POP');"
else:
pop = ''
c.execute(f'''
CREATE TRIGGER on_{table}_update_{column}_compress AFTER UPDATE OF {column} ON {table}
WHEN
(SELECT command, data FROM history WHERE history_id = {last_history_id})
IS ('UPDATE', '{table}')
AND
(SELECT {all_but_column} FROM history_{table} WHERE history_id = {last_history_id})
IS ({all_but_column_values})
BEGIN
UPDATE history_{table} SET {column}=NEW.{column} WHERE history_id = {last_history_id};
END;
CREATE TRIGGER on_{table}_delete AFTER DELETE ON {table}
BEGIN
INSERT INTO history (command, data) VALUES ('DELETE', '{table}');
INSERT INTO history_{table} (history_id, {columns})
VALUES (last_insert_rowid(), {old_values});
{pop}
END;
''')
last_history_id = "(SELECT seq FROM sqlite_sequence WHERE name='history')"
def db_create_history_tables(conn):
c = conn.cursor()
db_create_history_table(c, 'ui')
db_create_history_table(c, 'ui_library')
db_create_history_table(c, 'object',
"printf('Delete object %s:%s', OLD.type_id, OLD.name)")
db_create_history_table(c, 'object_property')
db_create_history_table(c, 'object_child_property')
db_create_history_table(c, 'object_signal')
conn.commit()
# UPDATE Trigger for each non PK column
for column in non_pk_columns:
all_but_column = None
all_but_column_values = None
for col in all_columns:
if col != column:
if all_but_column == None:
all_but_column = col
all_but_column_values = 'NEW.' + col
else:
all_but_column += ', ' + col
all_but_column_values += ', NEW.' + col
def row_diff_count(*args):
return len(args)
def db_create(filename):
# Create DB file
conn = sqlite3.connect(filename)
conn.create_function("row_diff_count", -1, row_diff_count, deterministic=True)
# Create DB tables
with open('cambalache-db.sql', 'r') as sql:
c = conn.cursor()
c.executescript(sql.read())
conn.commit()
sql.close()
db_create_history_tables(conn)
return conn
def db_import_object(c, builder_ver, ui_id, node, parent_id):
klass = node.get('class')
name = node.get('id')
# Insert object
c.execute("INSERT INTO object (type_id, name, parent_id, ui_id) VALUES (?, ?, ?, ?);",
(klass, name, parent_id, ui_id))
object_id = c.lastrowid
# Properties
for prop in node.iterfind('property'):
property_id = prop.get('name')
translatable = prop.get('translatable', None)
# Find owner type for property
c.execute("SELECT owner_id FROM property WHERE property_id=? AND owner_id IN (SELECT parent_id FROM type_tree WHERE type_id=? UNION SELECT ?);",
(property_id, klass, klass))
owner_id = c.fetchone()
# Insert property
if owner_id:
c.execute("INSERT INTO object_property (object_id, owner_id, property_id, value, translatable) VALUES (?, ?, ?, ?, ?);",
(object_id, owner_id[0], property_id, prop.text, translatable))
else:
print(f'Could not find owner type for {klass}:{property_id}')
# Signals
for signal in node.iterfind('signal'):
tokens = signal.get('name').split('::')
if len(tokens) > 1:
signal_id = tokens[0]
detail = tokens[1]
else:
signal_id = tokens[0]
detail = None
handler = signal.get('handler')
user_data = signal.get('object')
swap = signal.get('swapped')
after = signal.get('after')
# Find owner type for signal
c.execute("SELECT owner_id FROM signal WHERE signal_id=? AND owner_id IN (SELECT parent_id FROM type_tree WHERE type_id=? UNION SELECT ?);",
(signal_id, klass, klass))
owner_id = c.fetchone()
# Insert signal
c.execute("INSERT INTO object_signal (object_id, owner_id, signal_id, handler, detail, user_data, swap, after) VALUES (?, ?, ?, ?, ?, (SELECT object_id FROM object WHERE name=?), ?, ?);",
(object_id, owner_id[0] if owner_id else None, signal_id, handler, detail, user_data, swap, after))
# Children
for child in node.iterfind('child'):
obj = child.find('object')
if obj is not None:
db_import_object(c, builder_ver, None, obj, object_id)
# Packing properties
if builder_ver == 3:
# Gtk 3, packing props are sibling to <object>
packing = node.getnext()
if packing is not None and packing.tag != 'packing':
packing = None
else:
# Gtk 4, layout props are children of <object>
packing = node.find('layout')
if parent_id and packing is not None:
c.execute("SELECT type_id FROM object WHERE object_id=?;", (parent_id, ))
parent_type = c.fetchone()
if parent_type is None:
return
if builder_ver == 3:
# For Gtk 3 we fake a LayoutChild class for each GtkContainer
owner_id = f'Cambalache{parent_type[0]}LayoutChild'
else:
# FIXME: Need to get layout-manager-type from class
owner_id = f'{parent_type[0]}LayoutChild'
for prop in packing.iterfind('property'):
property_id = prop.get('name')
translatable = prop.get('translatable', None)
c.execute("INSERT INTO object_child_property (object_id, child_id, owner_id, property_id, value, translatable) VALUES (?, ?, ?, ?, ?, ?);",
(parent_id, object_id, owner_id, property_id, prop.text, translatable))
def db_import(conn, filename):
c = conn.cursor()
c.execute("INSERT INTO ui (name, filename) VALUES (?, ?);",
(os.path.basename(filename), filename))
ui_id = c.lastrowid
tree = etree.parse(filename)
root = tree.getroot()
# Requires
builder_ver = 4
for req in root.iterfind('requires'):
lib = req.get('lib')
version = req.get('version')
if lib == 'gtk+':
builder_ver = 3;
c.execute("INSERT INTO ui_library (ui_id, library_id, version) VALUES (last_insert_rowid(), ?, ?);",
(lib, version))
for child in root.iterfind('object'):
db_import_object(c, builder_ver, ui_id, child, None)
conn.commit()
def db_export_ui(conn, ui_id, filename):
def node_set(node, attr, val):
if val is not None:
node.set(attr, str(val))
def get_object(conn, builder_ver, object_id):
c = conn.cursor()
cc = conn.cursor()
obj = E.object()
c.execute('SELECT type_id, name FROM object WHERE object_id=?;', (object_id,))
row = c.fetchone()
node_set(obj, 'class', row[0])
node_set(obj, 'id', row[1])
# Properties
for row in c.execute('SELECT value, property_id FROM object_property WHERE object_id=?;',
(object_id,)):
obj.append(E.property(row[0], name=row[1]))
# Signals
for row in c.execute('SELECT signal_id, handler, detail, (SELECT name FROM object WHERE object_id=user_data), swap, after FROM object_signal WHERE object_id=?;',
(object_id,)):
node = E.signal(name=row[0], handler=row[1])
node_set(node, 'object', row[3])
node_set(node, 'swapped', row[4])
node_set(node, 'after', row[5])
obj.append(node)
# Children
for row in c.execute('SELECT object_id FROM object WHERE parent_id=?;', (object_id,)):
child_id = row[0]
child_obj = get_object(conn, builder_ver, child_id)
child = E.child(child_obj)
# Packing / Layout
layout = E('packing' if builder_ver == 3 else 'layout')
for prop in cc.execute('SELECT value, property_id FROM object_child_property WHERE object_id=? AND child_id=?;',
(object_id, child_id)):
layout.append(E.property(prop[0], name=prop[1]))
if len(layout) > 0:
if builder_ver == 3:
child.append(layout)
else:
child_obj.append(layout)
obj.append(child)
c.execute(f'''
CREATE TRIGGER on_{table}_update_{column} AFTER UPDATE OF {column} ON {table}
WHEN
(SELECT command, data FROM history WHERE history_id = {last_history_id})
IS NOT ('UPDATE', '{table}')
OR
(SELECT {all_but_column} FROM history_{table} WHERE history_id = {last_history_id})
IS NOT ({all_but_column_values})
BEGIN
INSERT INTO history (command, data) VALUES ('UPDATE', '{table}');
INSERT INTO history_{table} (history_id, {columns})
VALUES (last_insert_rowid(), {new_values});
END;
''')
c.execute(f'''
CREATE TRIGGER on_{table}_update_{column}_compress AFTER UPDATE OF {column} ON {table}
WHEN
(SELECT command, data FROM history WHERE history_id = {last_history_id})
IS ('UPDATE', '{table}')
AND
(SELECT {all_but_column} FROM history_{table} WHERE history_id = {last_history_id})
IS ({all_but_column_values})
BEGIN
UPDATE history_{table} SET {column}=NEW.{column} WHERE history_id = {last_history_id};
END;
''')
c.close()
cc.close()
return obj
c = conn.cursor()
node = E.interface()
# requires
builder_ver = 4
for row in c.execute('SELECT library_id, version FROM ui_library WHERE ui_id=?;', (ui_id,)):
node.append(E.requires(lib=row[0], version=row[1]))
if row[0] == 'gtk+':
builder_ver = 3;
# Iterate over toplovel objects
for row in c.execute('SELECT object_id FROM object WHERE ui_id=?;',
(ui_id,)):
child = get_object(conn, builder_ver, row[0])
node.append(child)
c.close()
# Dump xml to file
with open(filename, 'wb') as xml:
xml.write(etree.tostring(node,
pretty_print=True,
xml_declaration=True,
encoding='UTF-8'))
xml.close()
def db_export(conn):
c = conn.cursor()
for row in c.execute('SELECT ui_id, filename FROM ui;'):
db_export_ui(conn, row[0], row[1] + '.test.ui')
c.close()
if __name__ == "__main__":
if len(sys.argv) < 3:
print(f"Ussage: {sys.argv[0]} database.sqlite library.gir")
print(f"Ussage: {sys.argv[0]} library.gir database.sqlite")
exit()
conn = db_create(sys.argv[1])
db = CambalacheDb(sys.argv[2])
db.populate_from_gir(sys.argv[1])
lib = gir.GirData(sys.argv[2])
lib.populate_db(conn)
if len(sys.argv) >= 4:
db_import(conn, sys.argv[3])
conn.commit()
db_export(conn)
conn.close()

View File

@ -204,12 +204,12 @@ END;
CREATE TABLE ui (
ui_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
name TEXT UNIQUE,
filename TEXT UNIQUE,
description TEXT,
copyright TEXT,
authors TEXT,
license_id TEXT REFERENCES license,
filename TEXT,
translation_domain TEXT
);

247
cambalache/cmb_project.py Normal file
View File

@ -0,0 +1,247 @@
#
# CmbProject - Cambalache Project
#
# Copyright (C) 2020 Juan Pablo Ugarte - All Rights Reserved
#
# Unauthorized copying of this file, via any medium is strictly prohibited.
#
import os
import sys
import sqlite3
from lxml import etree
from lxml.builder import E
class CmbProject:
def __init__(self, filename):
self.conn = sqlite3.connect(filename)
self.conn.execute("PRAGMA foreign_keys = ON;")
def history_clear(self):
self.conn.execute("DELETE FROM history;")
self.conn.commit();
def _import_object(self, builder_ver, ui_id, node, parent_id):
c = self.conn.cursor()
klass = node.get('class')
name = node.get('id')
# Insert object
c.execute("INSERT INTO object (type_id, name, parent_id, ui_id) VALUES (?, ?, ?, ?);",
(klass, name, parent_id, ui_id))
object_id = c.lastrowid
# Properties
for prop in node.iterfind('property'):
property_id = prop.get('name')
translatable = prop.get('translatable', None)
# Find owner type for property
c.execute("SELECT owner_id FROM property WHERE property_id=? AND owner_id IN (SELECT parent_id FROM type_tree WHERE type_id=? UNION SELECT ?);",
(property_id, klass, klass))
owner_id = c.fetchone()
# Insert property
if owner_id:
c.execute("INSERT INTO object_property (object_id, owner_id, property_id, value, translatable) VALUES (?, ?, ?, ?, ?);",
(object_id, owner_id[0], property_id, prop.text, translatable))
else:
print(f'Could not find owner type for {klass}:{property_id}')
# Signals
for signal in node.iterfind('signal'):
tokens = signal.get('name').split('::')
if len(tokens) > 1:
signal_id = tokens[0]
detail = tokens[1]
else:
signal_id = tokens[0]
detail = None
handler = signal.get('handler')
user_data = signal.get('object')
swap = signal.get('swapped')
after = signal.get('after')
# Find owner type for signal
c.execute("SELECT owner_id FROM signal WHERE signal_id=? AND owner_id IN (SELECT parent_id FROM type_tree WHERE type_id=? UNION SELECT ?);",
(signal_id, klass, klass))
owner_id = c.fetchone()
# Insert signal
c.execute("INSERT INTO object_signal (object_id, owner_id, signal_id, handler, detail, user_data, swap, after) VALUES (?, ?, ?, ?, ?, (SELECT object_id FROM object WHERE name=?), ?, ?);",
(object_id, owner_id[0] if owner_id else None, signal_id, handler, detail, user_data, swap, after))
# Children
for child in node.iterfind('child'):
obj = child.find('object')
if obj is not None:
self._import_object(builder_ver, None, obj, object_id)
# Packing properties
if builder_ver == 3:
# Gtk 3, packing props are sibling to <object>
packing = node.getnext()
if packing is not None and packing.tag != 'packing':
packing = None
else:
# Gtk 4, layout props are children of <object>
packing = node.find('layout')
if parent_id and packing is not None:
c.execute("SELECT type_id FROM object WHERE object_id=?;", (parent_id, ))
parent_type = c.fetchone()
if parent_type is None:
return
if builder_ver == 3:
# For Gtk 3 we fake a LayoutChild class for each GtkContainer
owner_id = f'Cambalache{parent_type[0]}LayoutChild'
else:
# FIXME: Need to get layout-manager-type from class
owner_id = f'{parent_type[0]}LayoutChild'
for prop in packing.iterfind('property'):
property_id = prop.get('name')
translatable = prop.get('translatable', None)
c.execute("INSERT INTO object_child_property (object_id, child_id, owner_id, property_id, value, translatable) VALUES (?, ?, ?, ?, ?, ?);",
(parent_id, object_id, owner_id, property_id, prop.text, translatable))
c.close()
def import_file(self, filename, overwrite=False):
c = self.conn.cursor()
basename = os.path.basename(filename)
# Remove old UI
if overwrite:
c.execute("DELETE FROM ui WHERE name=?;", (basename, ))
c.execute("INSERT INTO ui (name, filename) VALUES (?, ?);",
(basename, filename))
ui_id = c.lastrowid
tree = etree.parse(filename)
root = tree.getroot()
# Requires
builder_ver = 4
for req in root.iterfind('requires'):
lib = req.get('lib')
version = req.get('version')
if lib == 'gtk+':
builder_ver = 3;
c.execute("INSERT INTO ui_library (ui_id, library_id, version) VALUES (last_insert_rowid(), ?, ?);",
(lib, version))
for child in root.iterfind('object'):
self._import_object(builder_ver, ui_id, child, None)
self.conn.commit()
c.close()
def _get_object(self, builder_ver, object_id):
def node_set(node, attr, val):
if val is not None:
node.set(attr, str(val))
c = self.conn.cursor()
cc = self.conn.cursor()
obj = E.object()
c.execute('SELECT type_id, name FROM object WHERE object_id=?;', (object_id,))
row = c.fetchone()
node_set(obj, 'class', row[0])
node_set(obj, 'id', row[1])
# Properties
for row in c.execute('SELECT value, property_id FROM object_property WHERE object_id=?;',
(object_id,)):
obj.append(E.property(row[0], name=row[1]))
# Signals
for row in c.execute('SELECT signal_id, handler, detail, (SELECT name FROM object WHERE object_id=user_data), swap, after FROM object_signal WHERE object_id=?;',
(object_id,)):
node = E.signal(name=row[0], handler=row[1])
node_set(node, 'object', row[3])
node_set(node, 'swapped', row[4])
node_set(node, 'after', row[5])
obj.append(node)
# Children
for row in c.execute('SELECT object_id FROM object WHERE parent_id=?;', (object_id,)):
child_id = row[0]
child_obj = self._get_object(builder_ver, child_id)
child = E.child(child_obj)
# Packing / Layout
layout = E('packing' if builder_ver == 3 else 'layout')
for prop in cc.execute('SELECT value, property_id FROM object_child_property WHERE object_id=? AND child_id=?;',
(object_id, child_id)):
layout.append(E.property(prop[0], name=prop[1]))
if len(layout) > 0:
if builder_ver == 3:
child.append(layout)
else:
child_obj.append(layout)
obj.append(child)
c.close()
cc.close()
return obj
def _export_ui(self, ui_id, filename):
c = self.conn.cursor()
node = E.interface()
# requires
builder_ver = 4
for row in c.execute('SELECT library_id, version FROM ui_library WHERE ui_id=?;', (ui_id,)):
node.append(E.requires(lib=row[0], version=row[1]))
if row[0] == 'gtk+':
builder_ver = 3;
# Iterate over toplovel objects
for row in c.execute('SELECT object_id FROM object WHERE ui_id=?;',
(ui_id,)):
child = self._get_object(builder_ver, row[0])
node.append(child)
c.close()
# Dump xml to file
with open(filename, 'wb') as xml:
xml.write(etree.tostring(node,
pretty_print=True,
xml_declaration=True,
encoding='UTF-8'))
xml.close()
def export(self):
c = self.conn.cursor()
for row in c.execute('SELECT ui_id, filename FROM ui;'):
self._export_ui(row[0], row[1] + '.test.ui')
c.close()
if __name__ == "__main__":
if len(sys.argv) < 3:
print(f"Ussage: {sys.argv[0]} db.sqlite import.ui")
exit()
project = CmbProject(sys.argv[1])
project.import_file(sys.argv[2], True)
project.history_clear()
project.export()