dibbler/sqlalchemy/engine/reflection.py

371 lines
13 KiB
Python

"""Provides an abstraction for obtaining database schema information.
Usage Notes:
Here are some general conventions when accessing the low level inspector
methods such as get_table_names, get_columns, etc.
1. Inspector methods return lists of dicts in most cases for the following
reasons:
* They're both standard types that can be serialized.
* Using a dict instead of a tuple allows easy expansion of attributes.
* Using a list for the outer structure maintains order and is easy to work
with (e.g. list comprehension [d['name'] for d in cols]).
2. Records that contain a name, such as the column name in a column record
use the key 'name'. So for most return values, each record will have a
'name' attribute..
"""
import sqlalchemy
from sqlalchemy import exc, sql
from sqlalchemy import util
from sqlalchemy.types import TypeEngine
from sqlalchemy import schema as sa_schema
@util.decorator
def cache(fn, self, con, *args, **kw):
info_cache = kw.get('info_cache', None)
if info_cache is None:
return fn(self, con, *args, **kw)
key = (
fn.__name__,
tuple(a for a in args if isinstance(a, basestring)),
tuple((k, v) for k, v in kw.iteritems() if isinstance(v, (basestring, int, float)))
)
ret = info_cache.get(key)
if ret is None:
ret = fn(self, con, *args, **kw)
info_cache[key] = ret
return ret
class Inspector(object):
"""Performs database schema inspection.
The Inspector acts as a proxy to the dialects' reflection methods and
provides higher level functions for accessing database schema information.
"""
def __init__(self, conn):
"""Initialize the instance.
:param conn: a :class:`~sqlalchemy.engine.base.Connectable`
"""
self.conn = conn
# set the engine
if hasattr(conn, 'engine'):
self.engine = conn.engine
else:
self.engine = conn
self.dialect = self.engine.dialect
self.info_cache = {}
@classmethod
def from_engine(cls, engine):
if hasattr(engine.dialect, 'inspector'):
return engine.dialect.inspector(engine)
return Inspector(engine)
@property
def default_schema_name(self):
return self.dialect.default_schema_name
def get_schema_names(self):
"""Return all schema names.
"""
if hasattr(self.dialect, 'get_schema_names'):
return self.dialect.get_schema_names(self.conn,
info_cache=self.info_cache)
return []
def get_table_names(self, schema=None, order_by=None):
"""Return all table names in `schema`.
:param schema: Optional, retrieve names from a non-default schema.
:param order_by: Optional, may be the string "foreign_key" to sort
the result on foreign key dependencies.
This should probably not return view names or maybe it should return
them with an indicator t or v.
"""
if hasattr(self.dialect, 'get_table_names'):
tnames = self.dialect.get_table_names(self.conn,
schema,
info_cache=self.info_cache)
else:
tnames = self.engine.table_names(schema)
if order_by == 'foreign_key':
ordered_tnames = tnames[:]
# Order based on foreign key dependencies.
for tname in tnames:
table_pos = tnames.index(tname)
fkeys = self.get_foreign_keys(tname, schema)
for fkey in fkeys:
rtable = fkey['referred_table']
if rtable in ordered_tnames:
ref_pos = ordered_tnames.index(rtable)
# Make sure it's lower in the list than anything it
# references.
if table_pos > ref_pos:
ordered_tnames.pop(table_pos) # rtable moves up 1
# insert just below rtable
ordered_tnames.index(ref_pos, tname)
tnames = ordered_tnames
return tnames
def get_table_options(self, table_name, schema=None, **kw):
if hasattr(self.dialect, 'get_table_options'):
return self.dialect.get_table_options(self.conn, table_name, schema,
info_cache=self.info_cache,
**kw)
return {}
def get_view_names(self, schema=None):
"""Return all view names in `schema`.
:param schema: Optional, retrieve names from a non-default schema.
"""
return self.dialect.get_view_names(self.conn, schema,
info_cache=self.info_cache)
def get_view_definition(self, view_name, schema=None):
"""Return definition for `view_name`.
:param schema: Optional, retrieve names from a non-default schema.
"""
return self.dialect.get_view_definition(
self.conn, view_name, schema, info_cache=self.info_cache)
def get_columns(self, table_name, schema=None, **kw):
"""Return information about columns in `table_name`.
Given a string `table_name` and an optional string `schema`, return
column information as a list of dicts with these keys:
name
the column's name
type
:class:`~sqlalchemy.types.TypeEngine`
nullable
boolean
default
the column's default value
attrs
dict containing optional column attributes
"""
col_defs = self.dialect.get_columns(self.conn, table_name, schema,
info_cache=self.info_cache,
**kw)
for col_def in col_defs:
# make this easy and only return instances for coltype
coltype = col_def['type']
if not isinstance(coltype, TypeEngine):
col_def['type'] = coltype()
return col_defs
def get_primary_keys(self, table_name, schema=None, **kw):
"""Return information about primary keys in `table_name`.
Given a string `table_name`, and an optional string `schema`, return
primary key information as a list of column names.
"""
pkeys = self.dialect.get_primary_keys(self.conn, table_name, schema,
info_cache=self.info_cache,
**kw)
return pkeys
def get_foreign_keys(self, table_name, schema=None, **kw):
"""Return information about foreign_keys in `table_name`.
Given a string `table_name`, and an optional string `schema`, return
foreign key information as a list of dicts with these keys:
constrained_columns
a list of column names that make up the foreign key
referred_schema
the name of the referred schema
referred_table
the name of the referred table
referred_columns
a list of column names in the referred table that correspond to
constrained_columns
\**kw
other options passed to the dialect's get_foreign_keys() method.
"""
fk_defs = self.dialect.get_foreign_keys(self.conn, table_name, schema,
info_cache=self.info_cache,
**kw)
return fk_defs
def get_indexes(self, table_name, schema=None, **kw):
"""Return information about indexes in `table_name`.
Given a string `table_name` and an optional string `schema`, return
index information as a list of dicts with these keys:
name
the index's name
column_names
list of column names in order
unique
boolean
\**kw
other options passed to the dialect's get_indexes() method.
"""
indexes = self.dialect.get_indexes(self.conn, table_name,
schema,
info_cache=self.info_cache, **kw)
return indexes
def reflecttable(self, table, include_columns):
dialect = self.conn.dialect
# MySQL dialect does this. Applicable with other dialects?
if hasattr(dialect, '_connection_charset') \
and hasattr(dialect, '_adjust_casing'):
charset = dialect._connection_charset
dialect._adjust_casing(table)
# table attributes we might need.
reflection_options = dict(
(k, table.kwargs.get(k)) for k in dialect.reflection_options if k in table.kwargs)
schema = table.schema
table_name = table.name
# apply table options
tbl_opts = self.get_table_options(table_name, schema, **table.kwargs)
if tbl_opts:
table.kwargs.update(tbl_opts)
# table.kwargs will need to be passed to each reflection method. Make
# sure keywords are strings.
tblkw = table.kwargs.copy()
for (k, v) in tblkw.items():
del tblkw[k]
tblkw[str(k)] = v
# Py2K
if isinstance(schema, str):
schema = schema.decode(dialect.encoding)
if isinstance(table_name, str):
table_name = table_name.decode(dialect.encoding)
# end Py2K
# columns
found_table = False
for col_d in self.get_columns(table_name, schema, **tblkw):
found_table = True
name = col_d['name']
if include_columns and name not in include_columns:
continue
coltype = col_d['type']
col_kw = {
'nullable':col_d['nullable'],
}
if 'autoincrement' in col_d:
col_kw['autoincrement'] = col_d['autoincrement']
if 'quote' in col_d:
col_kw['quote'] = col_d['quote']
colargs = []
if col_d.get('default') is not None:
# the "default" value is assumed to be a literal SQL expression,
# so is wrapped in text() so that no quoting occurs on re-issuance.
colargs.append(sa_schema.DefaultClause(sql.text(col_d['default'])))
if 'sequence' in col_d:
# TODO: mssql, maxdb and sybase are using this.
seq = col_d['sequence']
sequence = sa_schema.Sequence(seq['name'], 1, 1)
if 'start' in seq:
sequence.start = seq['start']
if 'increment' in seq:
sequence.increment = seq['increment']
colargs.append(sequence)
col = sa_schema.Column(name, coltype, *colargs, **col_kw)
table.append_column(col)
if not found_table:
raise exc.NoSuchTableError(table.name)
# Primary keys
primary_key_constraint = sa_schema.PrimaryKeyConstraint(*[
table.c[pk] for pk in self.get_primary_keys(table_name, schema, **tblkw)
if pk in table.c
])
table.append_constraint(primary_key_constraint)
# Foreign keys
fkeys = self.get_foreign_keys(table_name, schema, **tblkw)
for fkey_d in fkeys:
conname = fkey_d['name']
constrained_columns = fkey_d['constrained_columns']
referred_schema = fkey_d['referred_schema']
referred_table = fkey_d['referred_table']
referred_columns = fkey_d['referred_columns']
refspec = []
if referred_schema is not None:
sa_schema.Table(referred_table, table.metadata,
autoload=True, schema=referred_schema,
autoload_with=self.conn,
**reflection_options
)
for column in referred_columns:
refspec.append(".".join(
[referred_schema, referred_table, column]))
else:
sa_schema.Table(referred_table, table.metadata, autoload=True,
autoload_with=self.conn,
**reflection_options
)
for column in referred_columns:
refspec.append(".".join([referred_table, column]))
table.append_constraint(
sa_schema.ForeignKeyConstraint(constrained_columns, refspec,
conname, link_to_name=True))
# Indexes
indexes = self.get_indexes(table_name, schema)
for index_d in indexes:
name = index_d['name']
columns = index_d['column_names']
unique = index_d['unique']
flavor = index_d.get('type', 'unknown type')
if include_columns and \
not set(columns).issubset(include_columns):
util.warn(
"Omitting %s KEY for (%s), key covers omitted columns." %
(flavor, ', '.join(columns)))
continue
sa_schema.Index(name, *[table.columns[c] for c in columns],
**dict(unique=unique))