1230 lines
47 KiB
Python
1230 lines
47 KiB
Python
# strategies.py
|
|
# Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Michael Bayer mike_mp@zzzcomputing.com
|
|
#
|
|
# This module is part of SQLAlchemy and is released under
|
|
# the MIT License: http://www.opensource.org/licenses/mit-license.php
|
|
|
|
"""sqlalchemy.orm.interfaces.LoaderStrategy
|
|
implementations, and related MapperOptions."""
|
|
|
|
from sqlalchemy import exc as sa_exc
|
|
from sqlalchemy import sql, util, log
|
|
from sqlalchemy.sql import util as sql_util
|
|
from sqlalchemy.sql import visitors, expression, operators
|
|
from sqlalchemy.orm import mapper, attributes, interfaces, exc as orm_exc
|
|
from sqlalchemy.orm.interfaces import (
|
|
LoaderStrategy, StrategizedOption, MapperOption, PropertyOption,
|
|
serialize_path, deserialize_path, StrategizedProperty
|
|
)
|
|
from sqlalchemy.orm import session as sessionlib
|
|
from sqlalchemy.orm import util as mapperutil
|
|
import itertools
|
|
|
|
def _register_attribute(strategy, mapper, useobject,
|
|
compare_function=None,
|
|
typecallable=None,
|
|
copy_function=None,
|
|
mutable_scalars=False,
|
|
uselist=False,
|
|
callable_=None,
|
|
proxy_property=None,
|
|
active_history=False,
|
|
impl_class=None,
|
|
**kw
|
|
):
|
|
|
|
prop = strategy.parent_property
|
|
attribute_ext = list(util.to_list(prop.extension, default=[]))
|
|
|
|
if useobject and prop.single_parent:
|
|
attribute_ext.insert(0, _SingleParentValidator(prop))
|
|
|
|
if prop.key in prop.parent._validators:
|
|
attribute_ext.insert(0,
|
|
mapperutil.Validator(prop.key, prop.parent._validators[prop.key])
|
|
)
|
|
|
|
if useobject:
|
|
attribute_ext.append(sessionlib.UOWEventHandler(prop.key))
|
|
|
|
|
|
for m in mapper.polymorphic_iterator():
|
|
if prop is m._props.get(prop.key):
|
|
|
|
attributes.register_attribute_impl(
|
|
m.class_,
|
|
prop.key,
|
|
parent_token=prop,
|
|
mutable_scalars=mutable_scalars,
|
|
uselist=uselist,
|
|
copy_function=copy_function,
|
|
compare_function=compare_function,
|
|
useobject=useobject,
|
|
extension=attribute_ext,
|
|
trackparent=useobject,
|
|
typecallable=typecallable,
|
|
callable_=callable_,
|
|
active_history=active_history,
|
|
impl_class=impl_class,
|
|
**kw
|
|
)
|
|
|
|
class UninstrumentedColumnLoader(LoaderStrategy):
|
|
"""Represent the a non-instrumented MapperProperty.
|
|
|
|
The polymorphic_on argument of mapper() often results in this,
|
|
if the argument is against the with_polymorphic selectable.
|
|
|
|
"""
|
|
def init(self):
|
|
self.columns = self.parent_property.columns
|
|
|
|
def setup_query(self, context, entity, path, adapter,
|
|
column_collection=None, **kwargs):
|
|
for c in self.columns:
|
|
if adapter:
|
|
c = adapter.columns[c]
|
|
column_collection.append(c)
|
|
|
|
def create_row_processor(self, selectcontext, path, mapper, row, adapter):
|
|
return None, None
|
|
|
|
class ColumnLoader(LoaderStrategy):
|
|
"""Strategize the loading of a plain column-based MapperProperty."""
|
|
|
|
def init(self):
|
|
self.columns = self.parent_property.columns
|
|
self.is_composite = hasattr(self.parent_property, 'composite_class')
|
|
|
|
def setup_query(self, context, entity, path, adapter,
|
|
column_collection=None, **kwargs):
|
|
for c in self.columns:
|
|
if adapter:
|
|
c = adapter.columns[c]
|
|
column_collection.append(c)
|
|
|
|
def init_class_attribute(self, mapper):
|
|
self.is_class_level = True
|
|
coltype = self.columns[0].type
|
|
# TODO: check all columns ? check for foreign key as well?
|
|
active_history = self.columns[0].primary_key
|
|
|
|
_register_attribute(self, mapper, useobject=False,
|
|
compare_function=coltype.compare_values,
|
|
copy_function=coltype.copy_value,
|
|
mutable_scalars=self.columns[0].type.is_mutable(),
|
|
active_history = active_history
|
|
)
|
|
|
|
def create_row_processor(self, selectcontext, path, mapper, row, adapter):
|
|
key, col = self.key, self.columns[0]
|
|
if adapter:
|
|
col = adapter.columns[col]
|
|
|
|
if col is not None and col in row:
|
|
def new_execute(state, dict_, row):
|
|
dict_[key] = row[col]
|
|
else:
|
|
def new_execute(state, dict_, row):
|
|
state.expire_attribute_pre_commit(dict_, key)
|
|
return new_execute, None
|
|
|
|
log.class_logger(ColumnLoader)
|
|
|
|
class CompositeColumnLoader(ColumnLoader):
|
|
"""Strategize the loading of a composite column-based MapperProperty."""
|
|
|
|
def init_class_attribute(self, mapper):
|
|
self.is_class_level = True
|
|
self.logger.info("%s register managed composite attribute", self)
|
|
|
|
def copy(obj):
|
|
if obj is None:
|
|
return None
|
|
return self.parent_property.\
|
|
composite_class(*obj.__composite_values__())
|
|
|
|
def compare(a, b):
|
|
if a is None or b is None:
|
|
return a is b
|
|
|
|
for col, aprop, bprop in zip(self.columns,
|
|
a.__composite_values__(),
|
|
b.__composite_values__()):
|
|
if not col.type.compare_values(aprop, bprop):
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
_register_attribute(self, mapper, useobject=False,
|
|
compare_function=compare,
|
|
copy_function=copy,
|
|
mutable_scalars=True
|
|
#active_history ?
|
|
)
|
|
|
|
def create_row_processor(self, selectcontext, path, mapper,
|
|
row, adapter):
|
|
key = self.key
|
|
columns = self.columns
|
|
composite_class = self.parent_property.composite_class
|
|
if adapter:
|
|
columns = [adapter.columns[c] for c in columns]
|
|
|
|
for c in columns:
|
|
if c not in row:
|
|
def new_execute(state, dict_, row):
|
|
state.expire_attribute_pre_commit(dict_, key)
|
|
break
|
|
else:
|
|
def new_execute(state, dict_, row):
|
|
dict_[key] = composite_class(*[row[c] for c in columns])
|
|
|
|
return new_execute, None
|
|
|
|
log.class_logger(CompositeColumnLoader)
|
|
|
|
class DeferredColumnLoader(LoaderStrategy):
|
|
"""Strategize the loading of a deferred column-based MapperProperty."""
|
|
|
|
def create_row_processor(self, selectcontext, path, mapper, row, adapter):
|
|
col = self.columns[0]
|
|
if adapter:
|
|
col = adapter.columns[col]
|
|
|
|
key = self.key
|
|
if col in row:
|
|
return self.parent_property._get_strategy(ColumnLoader).\
|
|
create_row_processor(
|
|
selectcontext, path, mapper, row, adapter)
|
|
|
|
elif not self.is_class_level:
|
|
def new_execute(state, dict_, row):
|
|
state.set_callable(dict_, key, LoadDeferredColumns(state, key))
|
|
else:
|
|
def new_execute(state, dict_, row):
|
|
# reset state on the key so that deferred callables
|
|
# fire off on next access.
|
|
state.reset(dict_, key)
|
|
|
|
return new_execute, None
|
|
|
|
def init(self):
|
|
if hasattr(self.parent_property, 'composite_class'):
|
|
raise NotImplementedError("Deferred loading for composite "
|
|
"types not implemented yet")
|
|
self.columns = self.parent_property.columns
|
|
self.group = self.parent_property.group
|
|
|
|
def init_class_attribute(self, mapper):
|
|
self.is_class_level = True
|
|
|
|
_register_attribute(self, mapper, useobject=False,
|
|
compare_function=self.columns[0].type.compare_values,
|
|
copy_function=self.columns[0].type.copy_value,
|
|
mutable_scalars=self.columns[0].type.is_mutable(),
|
|
callable_=self._class_level_loader,
|
|
expire_missing=False
|
|
)
|
|
|
|
def setup_query(self, context, entity, path, adapter,
|
|
only_load_props=None, **kwargs):
|
|
if (
|
|
self.group is not None and
|
|
context.attributes.get(('undefer', self.group), False)
|
|
) or (only_load_props and self.key in only_load_props):
|
|
self.parent_property._get_strategy(ColumnLoader).\
|
|
setup_query(context, entity,
|
|
path, adapter, **kwargs)
|
|
|
|
def _class_level_loader(self, state):
|
|
if not mapperutil._state_has_identity(state):
|
|
return None
|
|
|
|
return LoadDeferredColumns(state, self.key)
|
|
|
|
|
|
log.class_logger(DeferredColumnLoader)
|
|
|
|
class LoadDeferredColumns(object):
|
|
"""serializable loader object used by DeferredColumnLoader"""
|
|
|
|
def __init__(self, state, key):
|
|
self.state, self.key = state, key
|
|
|
|
def __call__(self, **kw):
|
|
if kw.get('passive') is attributes.PASSIVE_NO_FETCH:
|
|
return attributes.PASSIVE_NO_RESULT
|
|
|
|
state = self.state
|
|
|
|
localparent = mapper._state_mapper(state)
|
|
|
|
prop = localparent.get_property(self.key)
|
|
strategy = prop._get_strategy(DeferredColumnLoader)
|
|
|
|
if strategy.group:
|
|
toload = [
|
|
p.key for p in
|
|
localparent.iterate_properties
|
|
if isinstance(p, StrategizedProperty) and
|
|
isinstance(p.strategy, DeferredColumnLoader) and
|
|
p.group==strategy.group
|
|
]
|
|
else:
|
|
toload = [self.key]
|
|
|
|
# narrow the keys down to just those which have no history
|
|
group = [k for k in toload if k in state.unmodified]
|
|
|
|
if strategy._should_log_debug():
|
|
strategy.logger.debug(
|
|
"deferred load %s group %s",
|
|
(mapperutil.state_attribute_str(state, self.key),
|
|
group and ','.join(group) or 'None')
|
|
)
|
|
|
|
session = sessionlib._state_session(state)
|
|
if session is None:
|
|
raise orm_exc.DetachedInstanceError(
|
|
"Parent instance %s is not bound to a Session; "
|
|
"deferred load operation of attribute '%s' cannot proceed" %
|
|
(mapperutil.state_str(state), self.key)
|
|
)
|
|
|
|
query = session.query(localparent)
|
|
ident = state.key[1]
|
|
query._get(None, ident=ident,
|
|
only_load_props=group, refresh_state=state)
|
|
return attributes.ATTR_WAS_SET
|
|
|
|
class DeferredOption(StrategizedOption):
|
|
propagate_to_loaders = True
|
|
|
|
def __init__(self, key, defer=False):
|
|
super(DeferredOption, self).__init__(key)
|
|
self.defer = defer
|
|
|
|
def get_strategy_class(self):
|
|
if self.defer:
|
|
return DeferredColumnLoader
|
|
else:
|
|
return ColumnLoader
|
|
|
|
class UndeferGroupOption(MapperOption):
|
|
propagate_to_loaders = True
|
|
|
|
def __init__(self, group):
|
|
self.group = group
|
|
|
|
def process_query(self, query):
|
|
query._attributes[('undefer', self.group)] = True
|
|
|
|
class AbstractRelationshipLoader(LoaderStrategy):
|
|
"""LoaderStratgies which deal with related objects."""
|
|
|
|
def init(self):
|
|
self.mapper = self.parent_property.mapper
|
|
self.target = self.parent_property.target
|
|
self.table = self.parent_property.table
|
|
self.uselist = self.parent_property.uselist
|
|
|
|
class NoLoader(AbstractRelationshipLoader):
|
|
"""Strategize a relationship() that doesn't load data automatically."""
|
|
|
|
def init_class_attribute(self, mapper):
|
|
self.is_class_level = True
|
|
|
|
_register_attribute(self, mapper,
|
|
useobject=True,
|
|
uselist=self.parent_property.uselist,
|
|
typecallable = self.parent_property.collection_class,
|
|
)
|
|
|
|
def create_row_processor(self, selectcontext, path, mapper, row, adapter):
|
|
def new_execute(state, dict_, row):
|
|
state.initialize(self.key)
|
|
return new_execute, None
|
|
|
|
log.class_logger(NoLoader)
|
|
|
|
class LazyLoader(AbstractRelationshipLoader):
|
|
"""Strategize a relationship() that loads when first accessed."""
|
|
|
|
def init(self):
|
|
super(LazyLoader, self).init()
|
|
self.__lazywhere, \
|
|
self.__bind_to_col, \
|
|
self._equated_columns = self._create_lazy_clause(self.parent_property)
|
|
|
|
self.logger.info("%s lazy loading clause %s", self, self.__lazywhere)
|
|
|
|
# determine if our "lazywhere" clause is the same as the mapper's
|
|
# get() clause. then we can just use mapper.get()
|
|
#from sqlalchemy.orm import query
|
|
self.use_get = not self.uselist and \
|
|
self.mapper._get_clause[0].compare(
|
|
self.__lazywhere,
|
|
use_proxies=True,
|
|
equivalents=self.mapper._equivalent_columns
|
|
)
|
|
|
|
if self.use_get:
|
|
for col in self._equated_columns.keys():
|
|
if col in self.mapper._equivalent_columns:
|
|
for c in self.mapper._equivalent_columns[col]:
|
|
self._equated_columns[c] = self._equated_columns[col]
|
|
|
|
self.logger.info("%s will use query.get() to "
|
|
"optimize instance loads" % self)
|
|
|
|
def init_class_attribute(self, mapper):
|
|
self.is_class_level = True
|
|
|
|
# MANYTOONE currently only needs the
|
|
# "old" value for delete-orphan
|
|
# cascades. the required _SingleParentValidator
|
|
# will enable active_history
|
|
# in that case. otherwise we don't need the
|
|
# "old" value during backref operations.
|
|
_register_attribute(self,
|
|
mapper,
|
|
useobject=True,
|
|
callable_=self._class_level_loader,
|
|
uselist = self.parent_property.uselist,
|
|
typecallable = self.parent_property.collection_class,
|
|
active_history = \
|
|
self.parent_property.direction is not \
|
|
interfaces.MANYTOONE or \
|
|
not self.use_get,
|
|
)
|
|
|
|
def lazy_clause(self, state, reverse_direction=False,
|
|
alias_secondary=False, adapt_source=None):
|
|
if state is None:
|
|
return self._lazy_none_clause(
|
|
reverse_direction,
|
|
adapt_source=adapt_source)
|
|
|
|
if not reverse_direction:
|
|
criterion, bind_to_col, rev = \
|
|
self.__lazywhere, \
|
|
self.__bind_to_col, \
|
|
self._equated_columns
|
|
else:
|
|
criterion, bind_to_col, rev = \
|
|
LazyLoader._create_lazy_clause(
|
|
self.parent_property,
|
|
reverse_direction=reverse_direction)
|
|
|
|
if reverse_direction:
|
|
mapper = self.parent_property.mapper
|
|
else:
|
|
mapper = self.parent_property.parent
|
|
|
|
def visit_bindparam(bindparam):
|
|
if bindparam.key in bind_to_col:
|
|
# use the "committed" (database) version to get
|
|
# query column values
|
|
# also its a deferred value; so that when used
|
|
# by Query, the committed value is used
|
|
# after an autoflush occurs
|
|
o = state.obj() # strong ref
|
|
bindparam.value = \
|
|
lambda: mapper._get_committed_attr_by_column(
|
|
o, bind_to_col[bindparam.key])
|
|
|
|
if self.parent_property.secondary is not None and alias_secondary:
|
|
criterion = sql_util.ClauseAdapter(
|
|
self.parent_property.secondary.alias()).\
|
|
traverse(criterion)
|
|
|
|
criterion = visitors.cloned_traverse(
|
|
criterion, {}, {'bindparam':visit_bindparam})
|
|
if adapt_source:
|
|
criterion = adapt_source(criterion)
|
|
return criterion
|
|
|
|
def _lazy_none_clause(self, reverse_direction=False, adapt_source=None):
|
|
if not reverse_direction:
|
|
criterion, bind_to_col, rev = \
|
|
self.__lazywhere, \
|
|
self.__bind_to_col,\
|
|
self._equated_columns
|
|
else:
|
|
criterion, bind_to_col, rev = \
|
|
LazyLoader._create_lazy_clause(
|
|
self.parent_property,
|
|
reverse_direction=reverse_direction)
|
|
|
|
criterion = sql_util.adapt_criterion_to_null(criterion, bind_to_col)
|
|
|
|
if adapt_source:
|
|
criterion = adapt_source(criterion)
|
|
return criterion
|
|
|
|
def _class_level_loader(self, state):
|
|
if not mapperutil._state_has_identity(state):
|
|
return None
|
|
|
|
return LoadLazyAttribute(state, self.key)
|
|
|
|
def create_row_processor(self, selectcontext, path, mapper, row, adapter):
|
|
key = self.key
|
|
if not self.is_class_level:
|
|
def new_execute(state, dict_, row):
|
|
# we are not the primary manager for this attribute
|
|
# on this class - set up a
|
|
# per-instance lazyloader, which will override the
|
|
# class-level behavior.
|
|
# this currently only happens when using a
|
|
# "lazyload" option on a "no load"
|
|
# attribute - "eager" attributes always have a
|
|
# class-level lazyloader installed.
|
|
state.set_callable(dict_, key, LoadLazyAttribute(state, key))
|
|
else:
|
|
def new_execute(state, dict_, row):
|
|
# we are the primary manager for this attribute on
|
|
# this class - reset its
|
|
# per-instance attribute state, so that the class-level
|
|
# lazy loader is
|
|
# executed when next referenced on this instance.
|
|
# this is needed in
|
|
# populate_existing() types of scenarios to reset
|
|
# any existing state.
|
|
state.reset(dict_, key)
|
|
|
|
return new_execute, None
|
|
|
|
@classmethod
|
|
def _create_lazy_clause(cls, prop, reverse_direction=False):
|
|
binds = util.column_dict()
|
|
lookup = util.column_dict()
|
|
equated_columns = util.column_dict()
|
|
|
|
if reverse_direction and prop.secondaryjoin is None:
|
|
for l, r in prop.local_remote_pairs:
|
|
_list = lookup.setdefault(r, [])
|
|
_list.append((r, l))
|
|
equated_columns[l] = r
|
|
else:
|
|
for l, r in prop.local_remote_pairs:
|
|
_list = lookup.setdefault(l, [])
|
|
_list.append((l, r))
|
|
equated_columns[r] = l
|
|
|
|
def col_to_bind(col):
|
|
if col in lookup:
|
|
for tobind, equated in lookup[col]:
|
|
if equated in binds:
|
|
return None
|
|
if col not in binds:
|
|
binds[col] = sql.bindparam(None, None, type_=col.type)
|
|
return binds[col]
|
|
return None
|
|
|
|
lazywhere = prop.primaryjoin
|
|
|
|
if prop.secondaryjoin is None or not reverse_direction:
|
|
lazywhere = visitors.replacement_traverse(
|
|
lazywhere, {}, col_to_bind)
|
|
|
|
if prop.secondaryjoin is not None:
|
|
secondaryjoin = prop.secondaryjoin
|
|
if reverse_direction:
|
|
secondaryjoin = visitors.replacement_traverse(
|
|
secondaryjoin, {}, col_to_bind)
|
|
lazywhere = sql.and_(lazywhere, secondaryjoin)
|
|
|
|
bind_to_col = dict((binds[col].key, col) for col in binds)
|
|
|
|
return lazywhere, bind_to_col, equated_columns
|
|
|
|
log.class_logger(LazyLoader)
|
|
|
|
class LoadLazyAttribute(object):
|
|
"""serializable loader object used by LazyLoader"""
|
|
|
|
def __init__(self, state, key):
|
|
self.state, self.key = state, key
|
|
|
|
def __getstate__(self):
|
|
return (self.state, self.key)
|
|
|
|
def __setstate__(self, state):
|
|
self.state, self.key = state
|
|
|
|
def __call__(self, **kw):
|
|
state = self.state
|
|
instance_mapper = mapper._state_mapper(state)
|
|
prop = instance_mapper.get_property(self.key)
|
|
strategy = prop._get_strategy(LazyLoader)
|
|
|
|
if kw.get('passive') is attributes.PASSIVE_NO_FETCH and \
|
|
not strategy.use_get:
|
|
return attributes.PASSIVE_NO_RESULT
|
|
|
|
if strategy._should_log_debug():
|
|
strategy.logger.debug("loading %s",
|
|
mapperutil.state_attribute_str(
|
|
state, self.key))
|
|
|
|
session = sessionlib._state_session(state)
|
|
if session is None:
|
|
raise orm_exc.DetachedInstanceError(
|
|
"Parent instance %s is not bound to a Session; "
|
|
"lazy load operation of attribute '%s' cannot proceed" %
|
|
(mapperutil.state_str(state), self.key)
|
|
)
|
|
|
|
q = session.query(prop.mapper)._adapt_all_clauses()
|
|
|
|
if state.load_path:
|
|
q = q._with_current_path(state.load_path + (self.key,))
|
|
|
|
# if we have a simple primary key load, use mapper.get()
|
|
# to possibly save a DB round trip
|
|
if strategy.use_get:
|
|
ident = []
|
|
allnulls = True
|
|
for primary_key in prop.mapper.primary_key:
|
|
val = instance_mapper.\
|
|
_get_committed_state_attr_by_column(
|
|
state,
|
|
strategy._equated_columns[primary_key],
|
|
**kw)
|
|
if val is attributes.PASSIVE_NO_RESULT:
|
|
return val
|
|
allnulls = allnulls and val is None
|
|
ident.append(val)
|
|
|
|
if allnulls:
|
|
return None
|
|
|
|
if state.load_options:
|
|
q = q._conditional_options(*state.load_options)
|
|
|
|
key = prop.mapper.identity_key_from_primary_key(ident)
|
|
return q._get(key, ident, **kw)
|
|
|
|
|
|
if prop.order_by:
|
|
q = q.order_by(*util.to_list(prop.order_by))
|
|
|
|
if state.load_options:
|
|
q = q._conditional_options(*state.load_options)
|
|
q = q.filter(strategy.lazy_clause(state))
|
|
|
|
result = q.all()
|
|
if strategy.uselist:
|
|
return result
|
|
else:
|
|
l = len(result)
|
|
if l:
|
|
if l > 1:
|
|
util.warn(
|
|
"Multiple rows returned with "
|
|
"uselist=False for lazily-loaded attribute '%s' "
|
|
% prop)
|
|
|
|
return result[0]
|
|
else:
|
|
return None
|
|
|
|
class SubqueryLoader(AbstractRelationshipLoader):
|
|
def init(self):
|
|
super(SubqueryLoader, self).init()
|
|
self.join_depth = self.parent_property.join_depth
|
|
|
|
def init_class_attribute(self, mapper):
|
|
self.parent_property.\
|
|
_get_strategy(LazyLoader).\
|
|
init_class_attribute(mapper)
|
|
|
|
def setup_query(self, context, entity,
|
|
path, adapter, column_collection=None,
|
|
parentmapper=None, **kwargs):
|
|
|
|
if not context.query._enable_eagerloads:
|
|
return
|
|
|
|
path = path + (self.key, )
|
|
|
|
# build up a path indicating the path from the leftmost
|
|
# entity to the thing we're subquery loading.
|
|
subq_path = context.attributes.get(('subquery_path', None), ())
|
|
|
|
subq_path = subq_path + path
|
|
|
|
reduced_path = interfaces._reduce_path(path)
|
|
|
|
# join-depth / recursion check
|
|
if ("loaderstrategy", reduced_path) not in context.attributes:
|
|
if self.join_depth:
|
|
if len(path) / 2 > self.join_depth:
|
|
return
|
|
else:
|
|
if self.mapper.base_mapper in interfaces._reduce_path(subq_path):
|
|
return
|
|
|
|
orig_query = context.attributes.get(
|
|
("orig_query", SubqueryLoader),
|
|
context.query)
|
|
|
|
# determine attributes of the leftmost mapper
|
|
if self.parent.isa(subq_path[0]) and self.key==subq_path[1]:
|
|
leftmost_mapper, leftmost_prop = \
|
|
self.parent, self.parent_property
|
|
else:
|
|
leftmost_mapper, leftmost_prop = \
|
|
subq_path[0], \
|
|
subq_path[0].get_property(subq_path[1])
|
|
leftmost_cols, remote_cols = self._local_remote_columns(leftmost_prop)
|
|
|
|
leftmost_attr = [
|
|
leftmost_mapper._get_col_to_prop(c).class_attribute
|
|
for c in leftmost_cols
|
|
]
|
|
|
|
# reformat the original query
|
|
# to look only for significant columns
|
|
q = orig_query._clone()
|
|
# TODO: why does polymporphic etc. require hardcoding
|
|
# into _adapt_col_list ? Does query.add_columns(...) work
|
|
# with polymorphic loading ?
|
|
q._set_entities(q._adapt_col_list(leftmost_attr))
|
|
|
|
# don't need ORDER BY if no limit/offset
|
|
if q._limit is None and q._offset is None:
|
|
q._order_by = None
|
|
|
|
# the original query now becomes a subquery
|
|
# which we'll join onto.
|
|
embed_q = q.with_labels().subquery()
|
|
left_alias = mapperutil.AliasedClass(leftmost_mapper, embed_q)
|
|
|
|
# q becomes a new query. basically doing a longhand
|
|
# "from_self()". (from_self() itself not quite industrial
|
|
# strength enough for all contingencies...but very close)
|
|
|
|
q = q.session.query(self.mapper)
|
|
q._attributes = {
|
|
("orig_query", SubqueryLoader): orig_query,
|
|
('subquery_path', None) : subq_path
|
|
}
|
|
|
|
# figure out what's being joined. a.k.a. the fun part
|
|
to_join = [
|
|
(subq_path[i], subq_path[i+1])
|
|
for i in xrange(0, len(subq_path), 2)
|
|
]
|
|
|
|
if len(to_join) < 2:
|
|
parent_alias = left_alias
|
|
else:
|
|
parent_alias = mapperutil.AliasedClass(self.parent)
|
|
|
|
local_cols, remote_cols = \
|
|
self._local_remote_columns(self.parent_property)
|
|
|
|
local_attr = [
|
|
getattr(parent_alias, self.parent._get_col_to_prop(c).key)
|
|
for c in local_cols
|
|
]
|
|
q = q.order_by(*local_attr)
|
|
q = q.add_columns(*local_attr)
|
|
|
|
for i, (mapper, key) in enumerate(to_join):
|
|
|
|
# we need to use query.join() as opposed to
|
|
# orm.join() here because of the
|
|
# rich behavior it brings when dealing with
|
|
# "with_polymorphic" mappers. "aliased"
|
|
# and "from_joinpoint" take care of most of
|
|
# the chaining and aliasing for us.
|
|
|
|
first = i == 0
|
|
middle = i < len(to_join) - 1
|
|
second_to_last = i == len(to_join) - 2
|
|
|
|
if first:
|
|
attr = getattr(left_alias, key)
|
|
else:
|
|
attr = key
|
|
|
|
if second_to_last:
|
|
q = q.join((parent_alias, attr), from_joinpoint=True)
|
|
else:
|
|
q = q.join(attr, aliased=middle, from_joinpoint=True)
|
|
|
|
# propagate loader options etc. to the new query.
|
|
# these will fire relative to subq_path.
|
|
q = q._with_current_path(subq_path)
|
|
q = q._conditional_options(*orig_query._with_options)
|
|
|
|
if self.parent_property.order_by:
|
|
# if there's an ORDER BY, alias it the same
|
|
# way joinedloader does, but we have to pull out
|
|
# the "eagerjoin" from the query.
|
|
# this really only picks up the "secondary" table
|
|
# right now.
|
|
eagerjoin = q._from_obj[0]
|
|
eager_order_by = \
|
|
eagerjoin._target_adapter.\
|
|
copy_and_process(
|
|
util.to_list(
|
|
self.parent_property.order_by
|
|
)
|
|
)
|
|
q = q.order_by(*eager_order_by)
|
|
|
|
# add new query to attributes to be picked up
|
|
# by create_row_processor
|
|
context.attributes[('subquery', reduced_path)] = q
|
|
|
|
def _local_remote_columns(self, prop):
|
|
if prop.secondary is None:
|
|
return zip(*prop.local_remote_pairs)
|
|
else:
|
|
return \
|
|
[p[0] for p in prop.synchronize_pairs],\
|
|
[
|
|
p[0] for p in prop.
|
|
secondary_synchronize_pairs
|
|
]
|
|
|
|
def create_row_processor(self, context, path, mapper, row, adapter):
|
|
path = path + (self.key,)
|
|
|
|
path = interfaces._reduce_path(path)
|
|
|
|
if ('subquery', path) not in context.attributes:
|
|
return None, None
|
|
|
|
local_cols, remote_cols = self._local_remote_columns(self.parent_property)
|
|
|
|
remote_attr = [
|
|
self.mapper._get_col_to_prop(c).key
|
|
for c in remote_cols]
|
|
|
|
q = context.attributes[('subquery', path)]
|
|
|
|
collections = dict(
|
|
(k, [v[0] for v in v])
|
|
for k, v in itertools.groupby(
|
|
q,
|
|
lambda x:x[1:]
|
|
))
|
|
|
|
if adapter:
|
|
local_cols = [adapter.columns[c] for c in local_cols]
|
|
|
|
if self.uselist:
|
|
def execute(state, dict_, row):
|
|
collection = collections.get(
|
|
tuple([row[col] for col in local_cols]),
|
|
()
|
|
)
|
|
state.get_impl(self.key).\
|
|
set_committed_value(state, dict_, collection)
|
|
else:
|
|
def execute(state, dict_, row):
|
|
collection = collections.get(
|
|
tuple([row[col] for col in local_cols]),
|
|
(None,)
|
|
)
|
|
if len(collection) > 1:
|
|
util.warn(
|
|
"Multiple rows returned with "
|
|
"uselist=False for eagerly-loaded attribute '%s' "
|
|
% self)
|
|
|
|
scalar = collection[0]
|
|
state.get_impl(self.key).\
|
|
set_committed_value(state, dict_, scalar)
|
|
|
|
return execute, None
|
|
|
|
log.class_logger(SubqueryLoader)
|
|
|
|
class EagerLoader(AbstractRelationshipLoader):
|
|
"""Strategize a relationship() that loads within the process
|
|
of the parent object being selected."""
|
|
|
|
def init(self):
|
|
super(EagerLoader, self).init()
|
|
self.join_depth = self.parent_property.join_depth
|
|
|
|
def init_class_attribute(self, mapper):
|
|
self.parent_property.\
|
|
_get_strategy(LazyLoader).init_class_attribute(mapper)
|
|
|
|
def setup_query(self, context, entity, path, adapter, \
|
|
column_collection=None, parentmapper=None,
|
|
**kwargs):
|
|
"""Add a left outer join to the statement thats being constructed."""
|
|
|
|
if not context.query._enable_eagerloads:
|
|
return
|
|
|
|
path = path + (self.key,)
|
|
|
|
reduced_path = interfaces._reduce_path(path)
|
|
|
|
# check for user-defined eager alias
|
|
if ("user_defined_eager_row_processor", reduced_path) in\
|
|
context.attributes:
|
|
clauses = context.attributes[
|
|
("user_defined_eager_row_processor",
|
|
reduced_path)]
|
|
|
|
adapter = entity._get_entity_clauses(context.query, context)
|
|
if adapter and clauses:
|
|
context.attributes[
|
|
("user_defined_eager_row_processor",
|
|
reduced_path)] = clauses = clauses.wrap(adapter)
|
|
elif adapter:
|
|
context.attributes[
|
|
("user_defined_eager_row_processor",
|
|
reduced_path)] = clauses = adapter
|
|
|
|
add_to_collection = context.primary_columns
|
|
|
|
else:
|
|
# check for join_depth or basic recursion,
|
|
# if the current path was not explicitly stated as
|
|
# a desired "loaderstrategy" (i.e. via query.options())
|
|
if ("loaderstrategy", reduced_path) not in context.attributes:
|
|
if self.join_depth:
|
|
if len(path) / 2 > self.join_depth:
|
|
return
|
|
else:
|
|
if self.mapper.base_mapper in reduced_path:
|
|
return
|
|
|
|
clauses = mapperutil.ORMAdapter(
|
|
mapperutil.AliasedClass(self.mapper),
|
|
equivalents=self.mapper._equivalent_columns,
|
|
adapt_required=True)
|
|
|
|
if self.parent_property.direction != interfaces.MANYTOONE:
|
|
context.multi_row_eager_loaders = True
|
|
|
|
context.create_eager_joins.append(
|
|
(self._create_eager_join, context,
|
|
entity, path, adapter,
|
|
parentmapper, clauses)
|
|
)
|
|
|
|
add_to_collection = context.secondary_columns
|
|
context.attributes[
|
|
("eager_row_processor", reduced_path)
|
|
] = clauses
|
|
|
|
for value in self.mapper._iterate_polymorphic_properties():
|
|
value.setup(
|
|
context,
|
|
entity,
|
|
path + (self.mapper,),
|
|
clauses,
|
|
parentmapper=self.mapper,
|
|
column_collection=add_to_collection)
|
|
|
|
def _create_eager_join(self, context, entity,
|
|
path, adapter, parentmapper, clauses):
|
|
|
|
if parentmapper is None:
|
|
localparent = entity.mapper
|
|
else:
|
|
localparent = parentmapper
|
|
|
|
# whether or not the Query will wrap the selectable in a subquery,
|
|
# and then attach eager load joins to that (i.e., in the case of
|
|
# LIMIT/OFFSET etc.)
|
|
should_nest_selectable = context.multi_row_eager_loaders and \
|
|
context.query._should_nest_selectable
|
|
|
|
entity_key = None
|
|
if entity not in context.eager_joins and \
|
|
not should_nest_selectable and \
|
|
context.from_clause:
|
|
index, clause = \
|
|
sql_util.find_join_source(
|
|
context.from_clause, entity.selectable)
|
|
if clause is not None:
|
|
# join to an existing FROM clause on the query.
|
|
# key it to its list index in the eager_joins dict.
|
|
# Query._compile_context will adapt as needed and
|
|
# append to the FROM clause of the select().
|
|
entity_key, default_towrap = index, clause
|
|
|
|
if entity_key is None:
|
|
entity_key, default_towrap = entity, entity.selectable
|
|
|
|
towrap = context.eager_joins.setdefault(entity_key, default_towrap)
|
|
|
|
join_to_left = False
|
|
if adapter:
|
|
if getattr(adapter, 'aliased_class', None):
|
|
onclause = getattr(
|
|
adapter.aliased_class, self.key,
|
|
self.parent_property)
|
|
else:
|
|
onclause = getattr(
|
|
mapperutil.AliasedClass(
|
|
self.parent,
|
|
adapter.selectable
|
|
),
|
|
self.key, self.parent_property
|
|
)
|
|
|
|
if onclause is self.parent_property:
|
|
# TODO: this is a temporary hack to
|
|
# account for polymorphic eager loads where
|
|
# the eagerload is referencing via of_type().
|
|
join_to_left = True
|
|
else:
|
|
onclause = self.parent_property
|
|
|
|
innerjoin = context.attributes.get(
|
|
("eager_join_type", path),
|
|
self.parent_property.innerjoin)
|
|
|
|
context.eager_joins[entity_key] = eagerjoin = \
|
|
mapperutil.join(
|
|
towrap,
|
|
clauses.aliased_class,
|
|
onclause,
|
|
join_to_left=join_to_left,
|
|
isouter=not innerjoin
|
|
)
|
|
|
|
# send a hint to the Query as to where it may "splice" this join
|
|
eagerjoin.stop_on = entity.selectable
|
|
|
|
if self.parent_property.secondary is None and \
|
|
not parentmapper:
|
|
# for parentclause that is the non-eager end of the join,
|
|
# ensure all the parent cols in the primaryjoin are actually
|
|
# in the
|
|
# columns clause (i.e. are not deferred), so that aliasing applied
|
|
# by the Query propagates those columns outward.
|
|
# This has the effect
|
|
# of "undefering" those columns.
|
|
for col in sql_util.find_columns(
|
|
self.parent_property.primaryjoin):
|
|
if localparent.mapped_table.c.contains_column(col):
|
|
if adapter:
|
|
col = adapter.columns[col]
|
|
context.primary_columns.append(col)
|
|
|
|
if self.parent_property.order_by:
|
|
context.eager_order_by += \
|
|
eagerjoin._target_adapter.\
|
|
copy_and_process(
|
|
util.to_list(
|
|
self.parent_property.order_by
|
|
)
|
|
)
|
|
|
|
|
|
def _create_eager_adapter(self, context, row, adapter, path):
|
|
reduced_path = interfaces._reduce_path(path)
|
|
if ("user_defined_eager_row_processor", reduced_path) in \
|
|
context.attributes:
|
|
decorator = context.attributes[
|
|
("user_defined_eager_row_processor",
|
|
reduced_path)]
|
|
# user defined eagerloads are part of the "primary"
|
|
# portion of the load.
|
|
# the adapters applied to the Query should be honored.
|
|
if context.adapter and decorator:
|
|
decorator = decorator.wrap(context.adapter)
|
|
elif context.adapter:
|
|
decorator = context.adapter
|
|
elif ("eager_row_processor", reduced_path) in context.attributes:
|
|
decorator = context.attributes[
|
|
("eager_row_processor", reduced_path)]
|
|
else:
|
|
return False
|
|
|
|
try:
|
|
identity_key = self.mapper.identity_key_from_row(row, decorator)
|
|
return decorator
|
|
except KeyError, k:
|
|
# no identity key - dont return a row
|
|
# processor, will cause a degrade to lazy
|
|
return False
|
|
|
|
def create_row_processor(self, context, path, mapper, row, adapter):
|
|
path = path + (self.key,)
|
|
|
|
eager_adapter = self._create_eager_adapter(
|
|
context,
|
|
row,
|
|
adapter, path)
|
|
|
|
if eager_adapter is not False:
|
|
key = self.key
|
|
_instance = self.mapper._instance_processor(
|
|
context,
|
|
path + (self.mapper,),
|
|
eager_adapter)
|
|
|
|
if not self.uselist:
|
|
def new_execute(state, dict_, row):
|
|
# set a scalar object instance directly on the parent
|
|
# object, bypassing InstrumentedAttribute event handlers.
|
|
dict_[key] = _instance(row, None)
|
|
|
|
def existing_execute(state, dict_, row):
|
|
# call _instance on the row, even though the object has
|
|
# been created, so that we further descend into properties
|
|
existing = _instance(row, None)
|
|
if existing is not None \
|
|
and key in dict_ \
|
|
and existing is not dict_[key]:
|
|
util.warn(
|
|
"Multiple rows returned with "
|
|
"uselist=False for eagerly-loaded attribute '%s' "
|
|
% self)
|
|
return new_execute, existing_execute
|
|
else:
|
|
def new_execute(state, dict_, row):
|
|
collection = attributes.init_state_collection(
|
|
state, dict_, key)
|
|
result_list = util.UniqueAppender(collection,
|
|
'append_without_event')
|
|
context.attributes[(state, key)] = result_list
|
|
_instance(row, result_list)
|
|
|
|
def existing_execute(state, dict_, row):
|
|
if (state, key) in context.attributes:
|
|
result_list = context.attributes[(state, key)]
|
|
else:
|
|
# appender_key can be absent from context.attributes
|
|
# with isnew=False when self-referential eager loading
|
|
# is used; the same instance may be present in two
|
|
# distinct sets of result columns
|
|
collection = attributes.init_state_collection(state,
|
|
dict_, key)
|
|
result_list = util.UniqueAppender(
|
|
collection,
|
|
'append_without_event')
|
|
context.attributes[(state, key)] = result_list
|
|
_instance(row, result_list)
|
|
return new_execute, existing_execute
|
|
else:
|
|
return self.parent_property.\
|
|
_get_strategy(LazyLoader).\
|
|
create_row_processor(
|
|
context, path,
|
|
mapper, row, adapter)
|
|
|
|
log.class_logger(EagerLoader)
|
|
|
|
class EagerLazyOption(StrategizedOption):
|
|
def __init__(self, key, lazy=True, chained=False,
|
|
propagate_to_loaders=True
|
|
):
|
|
super(EagerLazyOption, self).__init__(key)
|
|
self.lazy = lazy
|
|
self.chained = chained
|
|
self.propagate_to_loaders = propagate_to_loaders
|
|
self.strategy_cls = factory(lazy)
|
|
|
|
@property
|
|
def is_eager(self):
|
|
return self.lazy in (False, 'joined', 'subquery')
|
|
|
|
@property
|
|
def is_chained(self):
|
|
return self.is_eager and self.chained
|
|
|
|
def get_strategy_class(self):
|
|
return self.strategy_cls
|
|
|
|
def factory(identifier):
|
|
if identifier is False or identifier == 'joined':
|
|
return EagerLoader
|
|
elif identifier is None or identifier == 'noload':
|
|
return NoLoader
|
|
elif identifier is False or identifier == 'select':
|
|
return LazyLoader
|
|
elif identifier == 'subquery':
|
|
return SubqueryLoader
|
|
else:
|
|
return LazyLoader
|
|
|
|
|
|
|
|
class EagerJoinOption(PropertyOption):
|
|
|
|
def __init__(self, key, innerjoin, chained=False):
|
|
super(EagerJoinOption, self).__init__(key)
|
|
self.innerjoin = innerjoin
|
|
self.chained = chained
|
|
|
|
def is_chained(self):
|
|
return self.chained
|
|
|
|
def process_query_property(self, query, paths, mappers):
|
|
if self.is_chained():
|
|
for path in paths:
|
|
query._attributes[("eager_join_type", path)] = self.innerjoin
|
|
else:
|
|
query._attributes[("eager_join_type", paths[-1])] = self.innerjoin
|
|
|
|
class LoadEagerFromAliasOption(PropertyOption):
|
|
|
|
def __init__(self, key, alias=None):
|
|
super(LoadEagerFromAliasOption, self).__init__(key)
|
|
if alias is not None:
|
|
if not isinstance(alias, basestring):
|
|
m, alias, is_aliased_class = mapperutil._entity_info(alias)
|
|
self.alias = alias
|
|
|
|
def process_query_property(self, query, paths, mappers):
|
|
if self.alias is not None:
|
|
if isinstance(self.alias, basestring):
|
|
mapper = mappers[-1]
|
|
(root_mapper, propname) = paths[-1][-2:]
|
|
prop = mapper.get_property(propname, resolve_synonyms=True)
|
|
self.alias = prop.target.alias(self.alias)
|
|
query._attributes[
|
|
("user_defined_eager_row_processor",
|
|
interfaces._reduce_path(paths[-1]))
|
|
] = sql_util.ColumnAdapter(self.alias)
|
|
else:
|
|
(root_mapper, propname) = paths[-1][-2:]
|
|
mapper = mappers[-1]
|
|
prop = mapper.get_property(propname, resolve_synonyms=True)
|
|
adapter = query._polymorphic_adapters.get(prop.mapper, None)
|
|
query._attributes[
|
|
("user_defined_eager_row_processor",
|
|
interfaces._reduce_path(paths[-1]))] = adapter
|
|
|
|
class _SingleParentValidator(interfaces.AttributeExtension):
|
|
def __init__(self, prop):
|
|
self.prop = prop
|
|
|
|
def _do_check(self, state, value, oldvalue, initiator):
|
|
if value is not None:
|
|
hasparent = initiator.hasparent(attributes.instance_state(value))
|
|
if hasparent and oldvalue is not value:
|
|
raise sa_exc.InvalidRequestError(
|
|
"Instance %s is already associated with an instance "
|
|
"of %s via its %s attribute, and is only allowed a "
|
|
"single parent." %
|
|
(mapperutil.instance_str(value), state.class_, self.prop)
|
|
)
|
|
return value
|
|
|
|
def append(self, state, value, initiator):
|
|
return self._do_check(state, value, None, initiator)
|
|
|
|
def set(self, state, value, oldvalue, initiator):
|
|
return self._do_check(state, value, oldvalue, initiator)
|
|
|
|
|