dibbler/sqlalchemy/testing/suite/test_results.py

368 lines
12 KiB
Python
Raw Normal View History

2017-04-15 18:33:29 +02:00
from .. import fixtures, config
from ..config import requirements
from .. import exclusions
from ..assertions import eq_
from .. import engines
from ... import testing
from sqlalchemy import Integer, String, select, util, sql, DateTime, text, func
import datetime
from ..schema import Table, Column
class RowFetchTest(fixtures.TablesTest):
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table('plain_pk', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(50))
)
Table('has_dates', metadata,
Column('id', Integer, primary_key=True),
Column('today', DateTime)
)
@classmethod
def insert_data(cls):
config.db.execute(
cls.tables.plain_pk.insert(),
[
{"id": 1, "data": "d1"},
{"id": 2, "data": "d2"},
{"id": 3, "data": "d3"},
]
)
config.db.execute(
cls.tables.has_dates.insert(),
[
{"id": 1, "today": datetime.datetime(2006, 5, 12, 12, 0, 0)}
]
)
def test_via_string(self):
row = config.db.execute(
self.tables.plain_pk.select().
order_by(self.tables.plain_pk.c.id)
).first()
eq_(
row['id'], 1
)
eq_(
row['data'], "d1"
)
def test_via_int(self):
row = config.db.execute(
self.tables.plain_pk.select().
order_by(self.tables.plain_pk.c.id)
).first()
eq_(
row[0], 1
)
eq_(
row[1], "d1"
)
def test_via_col_object(self):
row = config.db.execute(
self.tables.plain_pk.select().
order_by(self.tables.plain_pk.c.id)
).first()
eq_(
row[self.tables.plain_pk.c.id], 1
)
eq_(
row[self.tables.plain_pk.c.data], "d1"
)
@requirements.duplicate_names_in_cursor_description
def test_row_with_dupe_names(self):
result = config.db.execute(
select([self.tables.plain_pk.c.data,
self.tables.plain_pk.c.data.label('data')]).
order_by(self.tables.plain_pk.c.id)
)
row = result.first()
eq_(result.keys(), ['data', 'data'])
eq_(row, ('d1', 'd1'))
def test_row_w_scalar_select(self):
"""test that a scalar select as a column is returned as such
and that type conversion works OK.
(this is half a SQLAlchemy Core test and half to catch database
backends that may have unusual behavior with scalar selects.)
"""
datetable = self.tables.has_dates
s = select([datetable.alias('x').c.today]).as_scalar()
s2 = select([datetable.c.id, s.label('somelabel')])
row = config.db.execute(s2).first()
eq_(row['somelabel'], datetime.datetime(2006, 5, 12, 12, 0, 0))
class PercentSchemaNamesTest(fixtures.TablesTest):
"""tests using percent signs, spaces in table and column names.
This is a very fringe use case, doesn't work for MySQL
or PostgreSQL. the requirement, "percent_schema_names",
is marked "skip" by default.
"""
__requires__ = ('percent_schema_names', )
__backend__ = True
@classmethod
def define_tables(cls, metadata):
cls.tables.percent_table = Table('percent%table', metadata,
Column("percent%", Integer),
Column(
"spaces % more spaces", Integer),
)
cls.tables.lightweight_percent_table = sql.table(
'percent%table', sql.column("percent%"),
sql.column("spaces % more spaces")
)
def test_single_roundtrip(self):
percent_table = self.tables.percent_table
for params in [
{'percent%': 5, 'spaces % more spaces': 12},
{'percent%': 7, 'spaces % more spaces': 11},
{'percent%': 9, 'spaces % more spaces': 10},
{'percent%': 11, 'spaces % more spaces': 9}
]:
config.db.execute(percent_table.insert(), params)
self._assert_table()
def test_executemany_roundtrip(self):
percent_table = self.tables.percent_table
config.db.execute(
percent_table.insert(),
{'percent%': 5, 'spaces % more spaces': 12}
)
config.db.execute(
percent_table.insert(),
[{'percent%': 7, 'spaces % more spaces': 11},
{'percent%': 9, 'spaces % more spaces': 10},
{'percent%': 11, 'spaces % more spaces': 9}]
)
self._assert_table()
def _assert_table(self):
percent_table = self.tables.percent_table
lightweight_percent_table = self.tables.lightweight_percent_table
for table in (
percent_table,
percent_table.alias(),
lightweight_percent_table,
lightweight_percent_table.alias()):
eq_(
list(
config.db.execute(
table.select().order_by(table.c['percent%'])
)
),
[
(5, 12),
(7, 11),
(9, 10),
(11, 9)
]
)
eq_(
list(
config.db.execute(
table.select().
where(table.c['spaces % more spaces'].in_([9, 10])).
order_by(table.c['percent%']),
)
),
[
(9, 10),
(11, 9)
]
)
row = config.db.execute(table.select().
order_by(table.c['percent%'])).first()
eq_(row['percent%'], 5)
eq_(row['spaces % more spaces'], 12)
eq_(row[table.c['percent%']], 5)
eq_(row[table.c['spaces % more spaces']], 12)
config.db.execute(
percent_table.update().values(
{percent_table.c['spaces % more spaces']: 15}
)
)
eq_(
list(
config.db.execute(
percent_table.
select().
order_by(percent_table.c['percent%'])
)
),
[(5, 15), (7, 15), (9, 15), (11, 15)]
)
class ServerSideCursorsTest(fixtures.TestBase, testing.AssertsExecutionResults):
__requires__ = ('server_side_cursors', )
__backend__ = True
def _is_server_side(self, cursor):
if self.engine.url.drivername == 'postgresql':
return cursor.name
elif self.engine.url.drivername == 'mysql':
sscursor = __import__('MySQLdb.cursors').cursors.SSCursor
return isinstance(cursor, sscursor)
elif self.engine.url.drivername == 'mysql+pymysql':
sscursor = __import__('pymysql.cursors').cursors.SSCursor
return isinstance(cursor, sscursor)
else:
return False
def _fixture(self, server_side_cursors):
self.engine = engines.testing_engine(
options={'server_side_cursors': server_side_cursors}
)
return self.engine
def tearDown(self):
engines.testing_reaper.close_all()
self.engine.dispose()
def test_global_string(self):
engine = self._fixture(True)
result = engine.execute('select 1')
assert self._is_server_side(result.cursor)
def test_global_text(self):
engine = self._fixture(True)
result = engine.execute(text('select 1'))
assert self._is_server_side(result.cursor)
def test_global_expr(self):
engine = self._fixture(True)
result = engine.execute(select([1]))
assert self._is_server_side(result.cursor)
def test_global_off_explicit(self):
engine = self._fixture(False)
result = engine.execute(text('select 1'))
# It should be off globally ...
assert not self._is_server_side(result.cursor)
def test_stmt_option(self):
engine = self._fixture(False)
s = select([1]).execution_options(stream_results=True)
result = engine.execute(s)
# ... but enabled for this one.
assert self._is_server_side(result.cursor)
def test_conn_option(self):
engine = self._fixture(False)
# and this one
result = \
engine.connect().execution_options(stream_results=True).\
execute('select 1'
)
assert self._is_server_side(result.cursor)
def test_stmt_enabled_conn_option_disabled(self):
engine = self._fixture(False)
s = select([1]).execution_options(stream_results=True)
# not this one
result = \
engine.connect().execution_options(stream_results=False).\
execute(s)
assert not self._is_server_side(result.cursor)
def test_stmt_option_disabled(self):
engine = self._fixture(True)
s = select([1]).execution_options(stream_results=False)
result = engine.execute(s)
assert not self._is_server_side(result.cursor)
def test_aliases_and_ss(self):
engine = self._fixture(False)
s1 = select([1]).execution_options(stream_results=True).alias()
result = engine.execute(s1)
assert self._is_server_side(result.cursor)
# s1's options shouldn't affect s2 when s2 is used as a
# from_obj.
s2 = select([1], from_obj=s1)
result = engine.execute(s2)
assert not self._is_server_side(result.cursor)
def test_for_update_expr(self):
engine = self._fixture(True)
s1 = select([1], for_update=True)
result = engine.execute(s1)
assert self._is_server_side(result.cursor)
def test_for_update_string(self):
engine = self._fixture(True)
result = engine.execute('SELECT 1 FOR UPDATE')
assert self._is_server_side(result.cursor)
def test_text_no_ss(self):
engine = self._fixture(False)
s = text('select 42')
result = engine.execute(s)
assert not self._is_server_side(result.cursor)
def test_text_ss_option(self):
engine = self._fixture(False)
s = text('select 42').execution_options(stream_results=True)
result = engine.execute(s)
assert self._is_server_side(result.cursor)
@testing.provide_metadata
def test_roundtrip(self):
md = self.metadata
engine = self._fixture(True)
test_table = Table('test_table', md,
Column('id', Integer, primary_key=True),
Column('data', String(50)))
test_table.create(checkfirst=True)
test_table.insert().execute(data='data1')
test_table.insert().execute(data='data2')
eq_(test_table.select().execute().fetchall(), [(1, 'data1'
), (2, 'data2')])
test_table.update().where(
test_table.c.id == 2).values(
data=test_table.c.data +
' updated').execute()
eq_(test_table.select().execute().fetchall(),
[(1, 'data1'), (2, 'data2 updated')])
test_table.delete().execute()
eq_(select([func.count('*')]).select_from(test_table).scalar(), 0)