Sometimes in my job I have to manage data from many different sources on the fly, without any real need to build a model around them. While using web2py for some of my projects, I decided to develop a plugin to solve this problem, so that I would have a solution I can reuse in different projects whenever I need it. This is how I did it, and it is what you need if you have the same purpose.
The modules (modules/plugin_lookout.py)
Some useful functions and a custom validator for strings that are meant to be used as SQL table names.
#!/usr/bin/env python
# coding: utf8
from gluon import *
from dal import regex_python_keywords
from validators import Validator, translate
import re

class IS_VALID_SQL_TABLE_NAME(Validator):
    '''
    Checks if the field value is a valid SQL table name.

    Arguments:

    db: an instance of the dal class that represents the database that will
        contain the table name
    check_reserved: list of adapters to check tablenames and column names
        against sql reserved keywords. (Default ("common", ))
        * "common" List of sql keywords that are common to all database types
          such as "SELECT, INSERT". (recommended)
        * "all" Checks against all known SQL keywords. (not recommended)
        <adaptername> Checks against the specific adapters list of keywords
          (recommended)

    Examples:

        #Check if text string is a good sql table name:
        INPUT(_type="text", _name="name", requires=IS_VALID_SQL_TABLE_NAME(db))

        #Check if text string is a good sql table name specific for postgres dialect:
        INPUT(_type="text", _name="name", requires=IS_VALID_SQL_TABLE_NAME(db, check_reserved=("postgres", )))

        >>> IS_VALID_SQL_TABLE_NAME(db)("")
        ('', 'invalid table name: ')
        >>> IS_VALID_SQL_TABLE_NAME(db)("foo")
        ('foo', None)
        >>> IS_VALID_SQL_TABLE_NAME(db)("test")
        ('test', 'table/attribute name already defined: test')
        >>> IS_VALID_SQL_TABLE_NAME(db)("select")
        ('select', 'invalid table/column name "select" is a "COMMON" reserved SQL keyword')
    '''

    def __init__(self, db, check_reserved=('common', )):
        self.db = db
        self.check_reserved = check_reserved
        if self.check_reserved:
            from reserved_sql_keywords import ADAPTERS as RSK
            self.RSK = RSK

    def __call__(self, value):
        if re.compile('[^0-9a-zA-Z_]').findall(value):
            return (value, translate('only [0-9a-zA-Z_] allowed in table and field names, received %s' % value))
        elif value.startswith('_') or regex_python_keywords.match(value) or not value:
            return (value, translate('invalid table name: %s' % value))
        elif value.lower() in self.db.tables or hasattr(self.db, value.lower()):
            return (value, translate('table/attribute name already defined: %s' % value))
        elif self.check_reserved:
            # Validates ``name`` against SQL keywords.
            # Uses self.check_reserved, which is a list of adapters to use, e.g.
            #   self.check_reserved = ['common', 'postgres', 'mysql']
            #   self.check_reserved = ['all']
            for backend in self.check_reserved:
                if value.upper() in self.RSK[backend]:
                    return (value, translate('invalid table/column name "%s" is a "%s" reserved SQL keyword' % (value, backend.upper())))
        return (value, None)

def db_got_table(db, table):
    '''
    db: the database connection where to look for
    table: the table name to look for
    '''
    msg = ''
    # WHERE 1=2 never matches, so the query only probes for the table
    sql_src = "SELECT 1 FROM %s WHERE 1=2" % table
    try:
        db.executesql(sql_src)
    except db._adapter.driver.OperationalError as error:
        msg = str(error)
        if msg == 'no such table: %s' % table:
            answer = False
        else:
            raise db._adapter.driver.OperationalError(msg)
    else:
        answer = True
    return answer, msg
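The probing idea behind db_got_table can be tried outside web2py. What follows is only a minimal sketch using the standard sqlite3 module, not the plugin's code: the function name table_exists is hypothetical, and the check on the error text assumes SQLite's "no such table" message, just as the original function does.

```python
import sqlite3

def table_exists(conn, table):
    """Return True if `table` can be queried through `conn` (hypothetical helper)."""
    # Quote the identifier so that odd names do not break the probe query
    quoted = '"%s"' % table.replace('"', '""')
    try:
        # WHERE 1=2 never returns rows, so the probe is cheap
        conn.execute('SELECT 1 FROM %s WHERE 1=2' % quoted)
    except sqlite3.OperationalError as error:
        # Assumption: SQLite reports a missing table with this message
        if str(error).startswith('no such table'):
            return False
        raise
    return True

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE test (id INTEGER PRIMARY KEY)')
print(table_exists(conn, 'test'))
print(table_exists(conn, 'missing'))
```

Other database drivers raise different exception classes and messages for a missing table, so a check like this would have to be adapted for each backend.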
The meta model (models/plugin_lookout.py)
I call it a meta model because it is a model for a database that hosts information about database connections and their content.
plugin_lookout_connections
from random import randint

db.define_table('plugin_lookout_connections',
    Field('alias', readable=False, required=True, writable=False),
    Field('dsn'), # requires=IS_EXPR('value.count("%s")==1')
    Field('pwd', 'password', readable=False),
    Field('is_active', 'boolean', default=True),
    auth.signature.created_by,
    format = '%(alias)s' # lambda r: r.dsn.replace('%s', randint(8,16)*'*')
)

db.plugin_lookout_connections.alias.requires = IS_NOT_IN_DB(db, 'plugin_lookout_connections.alias')

f_max = db.plugin_lookout_connections.id.max()
id_max = db().select(f_max).first()[f_max] or 0
db.plugin_lookout_connections.alias.default = 'db_%s' % (int(id_max) + 1)
At this point you can instantiate the db connections you have defined in this way:
def define_dbs():
    plugin_lookout_dbs = Storage()
    for conn in db(db.plugin_lookout_connections.is_active==True).select():
        # the dsn may hold a single '%s' placeholder for the password
        if conn.dsn.count('%s')==1:
            dsn = conn.dsn % conn.pwd
        else:
            dsn = conn.dsn
        try:
            plugin_lookout_dbs[conn.alias] = DAL(dsn)
        except Exception as error:
            # connections that cannot be opened are silently skipped
            pass
    return plugin_lookout_dbs

# expose each connection as a global named after its alias
for k,v in define_dbs().items():
    if k not in globals().keys() + ['db']:
        exec('%s=v' % k)
        del v
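The password placeholder convention used above (a single '%s' inside the stored dsn) can be shown in isolation. This is a minimal sketch, not the plugin's code: the helper names build_uri and mask_uri are hypothetical, and it deliberately uses str.replace instead of the % operator, an assumption made so that a dsn containing other percent signs (for example URL-encoded characters) does not raise an error.

```python
def build_uri(dsn, pwd=None):
    """Fill the single '%s' placeholder of a dsn template with the password."""
    if dsn.count('%s') == 1 and pwd is not None:
        return dsn.replace('%s', pwd, 1)
    return dsn

def mask_uri(dsn):
    """Return a displayable version of the dsn that never shows the password."""
    return dsn.replace('%s', '<password>')

template = 'postgres://user:%s@localhost/mydb'
print(build_uri(template, 'secret'))
print(build_uri('sqlite://storage.sqlite'))
print(mask_uri(template))
```

Keeping the password in a separate field means that the dsn can be shown in grids and select boxes without revealing it.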
plugin_lookout_tables
db.define_table('plugin_lookout_tables',
    Field('table_name', label=T('Table name'), required=True, ondelete='CASCADE'),
    Field('table_migrate', 'boolean', default=False, readable=False, writable=False,
        label='Migrate', comment=T('Create the table?')),
    Field('table_singular', label=T('Singular')),
    Field('table_plural', label=T('Plural')),
    Field('restricted', 'boolean', default=False),
    Field('connection', db.plugin_lookout_connections, required=True,
        requires = IS_IN_DB(db(db.plugin_lookout_connections.created_by==auth.user_id),
            'plugin_lookout_connections.id', '%(dsn)s')
    ),
    Field('is_active', 'boolean', default=True, label=T('Active'),
        comment=T('Let the table be recognized from db?')),
    Field('is_view', 'boolean', label=T('View'), default=False, writable=False, readable=False),
    auth.signature.created_by,
    format='%(table_name)s',
    singular="Table", plural="Tables"
)
plugin_lookout_fields
field_types = [('string', 'String'), ('integer', 'Integer'), ('double', 'Double'),
    ('boolean', 'Boolean (i.e. only True or False)'), ('date', 'Date'),
    ('datetime', 'Date with time'), ('time', 'Time'), ('id', 'Id'),
    ('geometry', 'Geometry'), ('geography', 'Geography')]

db.define_table('plugin_lookout_fields',
    Field('tables', 'list:reference db.plugin_lookout_tables', label=T('Table name'),
        required=True,
        requires=IS_IN_DB(db, 'plugin_lookout_tables.id', '%(table_name)s', multiple=True)),
    Field('field_name', label=T('Field name'), required=True,
        requires=IS_MATCH('^[a-z0-9_]*$', error_message='Invalid field name.')),
    Field('field_type', label=T('Field type'), comment=T('default: "string"'),
        requires=IS_EMPTY_OR(IS_IN_SET(field_types))),
    Field('field_format', length=25, label=T('Format'),
        comment = T('Date/time format (Optional. Only for date and datetime field type)')),
    Field('field_length', 'integer', label=T('Length'), comment=T('Field length')),
    Field('field_label', label=T('Label')),
    Field('field_comment', label=T('Comment'))
)
Now you can define each table in the db connection associated with it, in more or less the same way used for the db objects before.
def define_tables(fake_migrate=False):
    # ppygis is optional: without it geometries are shown as raw values
    try:
        import ppygis
    except:
        geom_representation = lambda value: value
    else:
        geom_representation = lambda value: str(ppygis.Geometry.read_ewkb(value))

    join = db.plugin_lookout_tables.connection == db.plugin_lookout_connections.id
    where = (db.plugin_lookout_tables.is_active==True)&(db.plugin_lookout_connections.is_active==True)
    res_tables = db(join&where).select(db.plugin_lookout_connections.ALL, db.plugin_lookout_tables.ALL)
    res_fields = db(db.plugin_lookout_fields).select(orderby=db.plugin_lookout_fields.tables)

    # maps the meta model column names to the Field keyword arguments
    translate = dict(
        field_type = 'type',
        field_label = 'label',
        field_length = 'length',
        field_comment = 'comment'
    )

    validators = dict(
        date = IS_DATE,
        datetime = IS_DATETIME,
        time = IS_TIME
    )

    for rec_table in res_tables:
        field_list = list()
        geoms = dict(geometry=[], geography=[])
        for rec_field in res_fields.find(lambda row: rec_table.plugin_lookout_tables.id in row.tables):
            # n_r_f: Not Required Fields, i.e. the optional ones
            n_r_f = [i for i in rec_field.as_dict() if not db.plugin_lookout_fields[i].required]
            kwargs = dict([(translate[k],rec_field[k]) for k in n_r_f if rec_field[k] not in ('', None, ) and translate.get(k)])
            if rec_field.field_type in ('date', 'datetime', 'time', ) and rec_field.field_format:
                kwargs['requires'] = IS_EMPTY_OR(validators.get(rec_field.field_type)(format=rec_field.field_format))
            elif rec_field.field_type in ('geometry', 'geography', ):
                if not hasattr(Field, 'st_asgeojson'):
                    # unsupported geometries are handled as read-only text
                    geoms[rec_field.field_type].append(rec_field.field_name)
                    kwargs['type'] = 'text'
                    kwargs['writable'] = False
                    kwargs['represent'] = lambda value,row: '%s ...' % geom_representation(value)[:50]
            field_list.append(Field(rec_field.field_name, **kwargs))

        mydb = globals()[rec_table.plugin_lookout_connections.alias]
        table = mydb.Table(mydb, rec_table.plugin_lookout_tables.table_name, *field_list)
        if not hasattr(mydb, rec_table.plugin_lookout_tables.table_name.lower()):
            t_kwargs = dict([(k,rec_table.plugin_lookout_tables['table_%s' %k]) for k in ('singular', 'plural', ) if rec_table.plugin_lookout_tables['table_%s' %k] not in ('', None, )])
            mydb.define_table(rec_table.plugin_lookout_tables.table_name,
                migrate=rec_table.plugin_lookout_tables.table_migrate,
                fake_migrate=fake_migrate,
                *field_list,
                **t_kwargs
            )
            if rec_table.plugin_lookout_tables.table_migrate:
                for k,v in geoms.items():
                    for i in v:
                        r = mydb.executesql("select data_type from information_schema.columns where table_name='%s' AND column_name='%s'" % (rec_table.plugin_lookout_tables.table_name, i))
                        if r[0][0] != 'text':
                            mydb.executesql('ALTER TABLE %s ALTER COLUMN %s TYPE %s;' % (rec_table.plugin_lookout_tables.table_name, i, k))

define_tables()
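The core step of define_tables is turning a row of plugin_lookout_fields into the keyword arguments of a Field, skipping whatever was left empty. The following is a minimal sketch of that mapping with plain dictionaries, so that it runs without web2py; the function name field_kwargs and the sample record are hypothetical.

```python
# Maps the meta model column names to the Field keyword arguments
TRANSLATE = {
    'field_type': 'type',
    'field_label': 'label',
    'field_length': 'length',
    'field_comment': 'comment',
}

def field_kwargs(record):
    """Build keyword arguments from a stored field description, skipping empty values."""
    return dict(
        (TRANSLATE[key], value)
        for key, value in record.items()
        if key in TRANSLATE and value not in ('', None)
    )

# Hypothetical row, shaped like a plugin_lookout_fields record
record = {
    'field_name': 'born_on',
    'field_type': 'date',
    'field_format': '%d/%m/%Y',
    'field_length': None,
    'field_label': 'Birth date',
    'field_comment': '',
}
kwargs = field_kwargs(record)
print(kwargs)
```

Columns that are not in the mapping, such as field_name and field_format, are ignored here because the model handles them separately: the name is passed positionally and the format feeds the date/time validator.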
The controllers (controllers/plugin_lookout.py)
def conn_onvalidation(form):
    ''' The callback function for the connection form '''
    if form.vars.pwd:
        uri = form.vars.dsn % form.vars.pwd
    else:
        uri = form.vars.dsn
    # reuse the application db object when the connection is the same
    if db._uri == uri:
        form.vars.alias = 'db'
    res = db(db.plugin_lookout_connections.dsn==form.vars.dsn).select()
    if res:
        form.vars.alias = res.first().alias

@auth.requires_login()
def plugin_lookout_connections():
    db.plugin_lookout_connections.dsn.represent = lambda v, r: '%s: %s' % (r.alias, v.replace('%s', '<password>'))
    form = SQLFORM.smartgrid(db.plugin_lookout_connections,
        onvalidation = conn_onvalidation,
        linked_tables = []
    )
    return dict(form=form)

def tab_onvalidation(form):
    ''' The callback function for the tables form '''
    from plugin_lookout import db_got_table, IS_VALID_SQL_TABLE_NAME
    # on create
    if 'new' in request.args:
        db_alias = db(db.plugin_lookout_connections.id==form.vars.connection).select(db.plugin_lookout_connections.alias).first().alias
        mydb = globals()[db_alias]
        tab_in_sqldb, msg = db_got_table(mydb, form.vars.table_name)
        if tab_in_sqldb:
            form.vars.table_migrate = False
        else:
            form.vars.table_migrate = True
            form.errors.table_name = IS_VALID_SQL_TABLE_NAME(
                globals()[db(db.plugin_lookout_connections.id==request.vars.connection)\
                    .select(db.plugin_lookout_connections.alias).first().alias],
                check_reserved=('common', 'postgres', )
            )(form.vars.table_name)[1]

@auth.requires_login()
def plugin_lookout_tables():
    ''' The create/edit tables controllers '''
    if 'edit' in request.args:
        db.plugin_lookout_tables.table_name.writable = False
        db.plugin_lookout_tables.connection.writable = False
    db.plugin_lookout_tables.table_name.represent = lambda val,row: A(row.table_name, _href=URL('plugin_lookout_external_tables', vars=dict(id=row.id)))
    form = SQLFORM.smartgrid(db.plugin_lookout_tables,
        onvalidation=tab_onvalidation,
        deletable=False,
        linked_tables=[]
    )
    return dict(form=form)

@auth.requires_login()
def plugin_lookout_table_remove():
    ''' Controller for removing configured tables '''
    message = T('This controller is for table deletion. Warning! Only active tables can be deleted.')
    form = SQLFORM.factory(
        Field('table_id', label=T('Table name'), comment=T('Choose the table to delete.'),
            requires=IS_IN_DB(db, 'plugin_lookout_tables.id', '%(table_name)s')
        )
    )
    if form.accepts(request, session, formname='form_one'):
        tab = db(db.plugin_lookout_tables.id==form.vars.table_id).select().first()
        key = '%s_%s' % (globals()[tab.connection.alias]._uri_hash, tab.table_name)
        db(db.auth_group.role.contains(key)).delete()
        # detach the table from the fields that reference it
        for row in db(db.plugin_lookout_fields.tables.contains(form.vars.table_id)).select():
            ids = row.tables
            ids.remove(int(form.vars.table_id))
            row.update_record(tables=ids)
        if tab.table_migrate:
            try:
                globals()[tab.connection.alias][tab.table_name].drop()
            except Exception as error:
                session.flash = T('Table not removed: %s') % str(error)
            else:
                db(db.plugin_lookout_tables.id==form.vars.table_id).delete()
        elif tab.is_view:
            db.executesql('DROP VIEW %s CASCADE;' % tab.table_name)
            db(db.plugin_lookout_tables.id==form.vars.table_id).delete()
        else:
            db(db.plugin_lookout_tables.id==form.vars.table_id).delete()
        # remove the fields that no longer belong to any table
        unused_fields_set = db(db.plugin_lookout_fields.tables==[])
        if unused_fields_set.count() > 0:
            unused_fields_set.delete()
        redirect(URL('plugin_lookout_tables'))
    if request.extension == 'load':
        return dict(form=form)
    else:
        return dict(form=form, message=message)

db.plugin_lookout_fields.tables.represent = lambda id,row: CAT(*[CAT(A('%s ' %i.table_name,
        _href = URL('plugin_lookout_tables',
            args = ['plugin_lookout_tables', 'view','plugin_lookout_tables', i.id],
            user_signature = True)),
        BR()
    ) for i in db(db.plugin_lookout_tables.id.belongs(id))\
        .select(db.plugin_lookout_tables.id, db.plugin_lookout_tables.table_name)])

@auth.requires_login()
def plugin_lookout_fields():
    ''' The fields controllers '''
    form = SQLFORM.smartgrid(db.plugin_lookout_fields,
        linked_tables=['plugin_lookout_tables'],
        editable=False, deletable=False)
    return locals()

@auth.requires_login()
def plugin_lookout_external_tables():
    ''' Controller for managing data inside tables that are not part of the model '''
    message = 'Here you can see the data inside the tables you have configured'
    table_id = request.vars.id or session.plugin_lookout_external_tables_id or redirect(URL('plugin_lookout_tables'))
    session.plugin_lookout_external_tables_id = table_id
    check_message = IS_IN_DB(db, 'plugin_lookout_tables.id')(table_id)[1]
    if check_message:
        session.flash = T('You cannot have access to the table "%s" through this resource. %s' %(table_id, check_message))
        redirect(URL('plugin_lookout_tables'))
    rec_table = db(db.plugin_lookout_tables.id==table_id).select().first()
    mydb = globals()[rec_table.connection.alias]
    if rec_table.table_name not in mydb.tables:
        session.flash = T('Table "%s" is not recognized from db model. Maybe it\'s not active') % rec_table.table_name
        redirect(URL('plugin_lookout_tables'))
    # assumption: placeholder value, the real one comes from the permission
    # handling that is left out of this recipe
    writable = True
    grid=SQLFORM.smartgrid(mydb[rec_table.table_name], deletable=writable, editable=writable, create=writable)
    if request.extension == 'load':
        return dict(grid=grid)
    else:
        return dict(grid=grid, message=message)
I reduced this recipe to the minimum concepts, leaving out how I decided to manage user permissions. At the moment you can find the whole code hosted on SourceForge at this URL: https://sourceforge.net/p/pluginlookout/wiki/Home/ .
Maybe it will soon be moved to GitHub. I will be happy if someone would like to contribute in any way, with suggestions, bug reports and so on. If you want to try it you can download and install the package file.
Once you have done that, you can try exploring the plugin_lookout menu. You can:
- create your own db connection. If it is identical to the one of your model it will be recognized and the same db object will be used.
- set up the table you want to explore through this plugin. If it already exists you have to know its exact name, otherwise a new table will be created.
- define the fields for the table through the dedicated controller, which is required in order to access the data.
- explore the table data by following the linked table name in the tables controller.
For the near future I'm thinking about useful additions, such as the possibility to automatically inspect the db content (table names and field features), some procedures to import data from common file formats such as CSV or ESRI shapefiles, or a tool for moving tables between databases (in this way sqlite db files could be adopted for sharing data in a more robust way than the CSV format).
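As a rough idea of what automatic inspection could look like for a sqlite file, here is a minimal sketch based on the standard sqlite3 module. It is not part of the plugin: the function name inspect_sqlite is hypothetical, and other backends would need their own catalog queries (for example information_schema on PostgreSQL).

```python
import sqlite3

def inspect_sqlite(conn):
    """Return {table_name: [(column_name, declared_type), ...]} for a sqlite connection."""
    tables = [row[0] for row in conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name")]
    schema = {}
    for name in tables:
        quoted = '"%s"' % name.replace('"', '""')
        # PRAGMA table_info rows are: cid, name, type, notnull, dflt_value, pk
        info = conn.execute('PRAGMA table_info(%s)' % quoted).fetchall()
        schema[name] = [(col[1], col[2]) for col in info]
    return schema

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE people (id INTEGER PRIMARY KEY, name TEXT)')
schema = inspect_sqlite(conn)
print(schema)
```

A result like this could be used to pre-fill plugin_lookout_tables and plugin_lookout_fields instead of typing the table and field names by hand.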
Enjoy
Comments (5)
manuele 12 years ago
Some new features added so far:
I hope to receive some comments about other useful implementations and better coding solutions! ;)
manuele 10 years ago
just a test
manuele 10 years ago
Note: for an easier solution you can consider replacing the plugin_lookout_fields table with a JSON field in plugin_lookout_tables in which to list the table field configurations.
andrew-willimott-10653 12 years ago
Hi Manuele,
Thanks for sharing. Could you please explain what you mean by "manage data from many different sources on the fly"? Does that mean you just want to query the other sources, or are you trying to update or alter the other sources?
I need something similar, just to extract the table definitions and to select the data, but I do not want to alter the other sources.
Thanks,
Andrew
manuele 12 years ago
OK, from now on you can find the updated code on GitHub at this link: https://github.com/manuelep/plugin_lookout