"""Tests for row fetching, percent-sign schema names and server-side cursors."""

from .. import fixtures, config
from ..config import requirements
from .. import exclusions
from ..assertions import eq_
from .. import engines
from ... import testing

from sqlalchemy import Integer, String, select, util, sql, DateTime, text, func
import datetime

from ..schema import Table, Column


class RowFetchTest(fixtures.TablesTest):
    """Fetch a single row and read its columns by string key, by integer
    position and by Column object.

    """

    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Define the ``plain_pk`` and ``has_dates`` tables."""
        Table('plain_pk', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
        Table('has_dates', metadata,
              Column('id', Integer, primary_key=True),
              Column('today', DateTime)
              )

    @classmethod
    def insert_data(cls):
        """Insert three rows into ``plain_pk`` and one into ``has_dates``."""
        config.db.execute(
            cls.tables.plain_pk.insert(),
            [
                {"id": 1, "data": "d1"},
                {"id": 2, "data": "d2"},
                {"id": 3, "data": "d3"},
            ]
        )

        config.db.execute(
            cls.tables.has_dates.insert(),
            [
                {"id": 1, "today": datetime.datetime(2006, 5, 12, 12, 0, 0)}
            ]
        )

    def test_via_string(self):
        """Row values are reachable by column-name string."""
        row = config.db.execute(
            self.tables.plain_pk.select().
            order_by(self.tables.plain_pk.c.id)
        ).first()

        eq_(
            row['id'], 1
        )
        eq_(
            row['data'], "d1"
        )

    def test_via_int(self):
        """Row values are reachable by integer position."""
        row = config.db.execute(
            self.tables.plain_pk.select().
            order_by(self.tables.plain_pk.c.id)
        ).first()

        eq_(
            row[0], 1
        )
        eq_(
            row[1], "d1"
        )

    def test_via_col_object(self):
        """Row values are reachable by Column object."""
        row = config.db.execute(
            self.tables.plain_pk.select().
            order_by(self.tables.plain_pk.c.id)
        ).first()

        eq_(
            row[self.tables.plain_pk.c.id], 1
        )
        eq_(
            row[self.tables.plain_pk.c.data], "d1"
        )

    @requirements.duplicate_names_in_cursor_description
    def test_row_with_dupe_names(self):
        """Two result columns sharing the name ``data`` are both returned."""
        result = config.db.execute(
            select([self.tables.plain_pk.c.data,
                    self.tables.plain_pk.c.data.label('data')]).
            order_by(self.tables.plain_pk.c.id)
        )
        row = result.first()
        eq_(result.keys(), ['data', 'data'])
        eq_(row, ('d1', 'd1'))

    def test_row_w_scalar_select(self):
        """test that a scalar select as a column is returned as such
        and that type conversion works OK.

        (this is half a SQLAlchemy Core test and half to catch database
        backends that may have unusual behavior with scalar selects.)

        """
        datetable = self.tables.has_dates
        s = select([datetable.alias('x').c.today]).as_scalar()
        s2 = select([datetable.c.id, s.label('somelabel')])
        row = config.db.execute(s2).first()

        eq_(row['somelabel'], datetime.datetime(2006, 5, 12, 12, 0, 0))


class PercentSchemaNamesTest(fixtures.TablesTest):
    """tests using percent signs, spaces in table and column names.

    This is a very fringe use case, doesn't work for MySQL
    or PostgreSQL.  the requirement, "percent_schema_names",
    is marked "skip" by default.

    """

    __requires__ = ('percent_schema_names', )

    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Define ``percent%table`` both as a Table and as a lightweight
        ``sql.table()`` construct with the same column names.

        """
        cls.tables.percent_table = Table('percent%table', metadata,
                                         Column("percent%", Integer),
                                         Column(
                                             "spaces % more spaces", Integer),
                                         )
        cls.tables.lightweight_percent_table = sql.table(
            'percent%table', sql.column("percent%"),
            sql.column("spaces % more spaces")
        )

    def test_single_roundtrip(self):
        """Insert the four rows one execute() call at a time, then verify."""
        percent_table = self.tables.percent_table
        for params in [
            {'percent%': 5, 'spaces % more spaces': 12},
            {'percent%': 7, 'spaces % more spaces': 11},
            {'percent%': 9, 'spaces % more spaces': 10},
            {'percent%': 11, 'spaces % more spaces': 9}
        ]:
            config.db.execute(percent_table.insert(), params)
        self._assert_table()

    def test_executemany_roundtrip(self):
        """Insert one row singly and three as a parameter list, then
        verify.

        """
        percent_table = self.tables.percent_table
        config.db.execute(
            percent_table.insert(),
            {'percent%': 5, 'spaces % more spaces': 12}
        )
        config.db.execute(
            percent_table.insert(),
            [{'percent%': 7, 'spaces % more spaces': 11},
             {'percent%': 9, 'spaces % more spaces': 10},
             {'percent%': 11, 'spaces % more spaces': 9}]
        )
        self._assert_table()

    def _assert_table(self):
        """Check SELECT results through the Table, the lightweight table and
        an alias of each, then UPDATE every row and check the new values.

        """
        percent_table = self.tables.percent_table
        lightweight_percent_table = self.tables.lightweight_percent_table

        for table in (
                percent_table,
                percent_table.alias(),
                lightweight_percent_table,
                lightweight_percent_table.alias()):
            eq_(
                list(
                    config.db.execute(
                        table.select().order_by(table.c['percent%'])
                    )
                ),
                [
                    (5, 12),
                    (7, 11),
                    (9, 10),
                    (11, 9)
                ]
            )

            eq_(
                list(
                    config.db.execute(
                        table.select().
                        where(table.c['spaces % more spaces'].in_([9, 10])).
                        order_by(table.c['percent%']),
                    )
                ),
                [
                    (9, 10),
                    (11, 9)
                ]
            )

            row = config.db.execute(table.select().
                                    order_by(table.c['percent%'])).first()
            eq_(row['percent%'], 5)
            eq_(row['spaces % more spaces'], 12)

            eq_(row[table.c['percent%']], 5)
            eq_(row[table.c['spaces % more spaces']], 12)

        # The UPDATE runs once, after all four selectables were checked
        # against the originally inserted values.
        config.db.execute(
            percent_table.update().values(
                {percent_table.c['spaces % more spaces']: 15}
            )
        )

        eq_(
            list(
                config.db.execute(
                    percent_table.
                    select().
                    order_by(percent_table.c['percent%'])
                )
            ),
            [(5, 15), (7, 15), (9, 15), (11, 15)]
        )


class ServerSideCursorsTest(fixtures.TestBase,
                            testing.AssertsExecutionResults):
    """Check when a result's DBAPI cursor is a server-side cursor, driven by
    the ``server_side_cursors`` engine option and the ``stream_results``
    execution option.

    """

    __requires__ = ('server_side_cursors', )

    __backend__ = True

    def _is_server_side(self, cursor):
        """Return a true value if ``cursor`` is a server-side cursor.

        The check is driver specific and keyed on the engine URL's
        ``drivername``.  Both the bare dialect name and the explicit
        ``dialect+driver`` spelling of the default driver are accepted, so
        that e.g. ``postgresql+psycopg2://`` is treated the same as
        ``postgresql://``.  Unrecognized drivers return False.

        """
        drivername = self.engine.url.drivername

        if drivername in ('postgresql', 'postgresql+psycopg2'):
            # truthy only when the cursor carries a name
            return cursor.name
        elif drivername in ('mysql', 'mysql+mysqldb'):
            sscursor = __import__('MySQLdb.cursors').cursors.SSCursor
            return isinstance(cursor, sscursor)
        elif drivername == 'mysql+pymysql':
            sscursor = __import__('pymysql.cursors').cursors.SSCursor
            return isinstance(cursor, sscursor)
        else:
            return False

    def _fixture(self, server_side_cursors):
        """Create, store on ``self.engine`` and return a testing engine with
        the given ``server_side_cursors`` option.

        """
        self.engine = engines.testing_engine(
            options={'server_side_cursors': server_side_cursors}
        )
        return self.engine

    def tearDown(self):
        """Close connections tracked by the testing reaper and dispose of
        the engine created by ``_fixture()``.

        """
        engines.testing_reaper.close_all()
        self.engine.dispose()

    def test_global_string(self):
        engine = self._fixture(True)
        result = engine.execute('select 1')
        assert self._is_server_side(result.cursor)

    def test_global_text(self):
        engine = self._fixture(True)
        result = engine.execute(text('select 1'))
        assert self._is_server_side(result.cursor)

    def test_global_expr(self):
        engine = self._fixture(True)
        result = engine.execute(select([1]))
        assert self._is_server_side(result.cursor)

    def test_global_off_explicit(self):
        engine = self._fixture(False)
        result = engine.execute(text('select 1'))

        # It should be off globally ...

        assert not self._is_server_side(result.cursor)

    def test_stmt_option(self):
        engine = self._fixture(False)

        s = select([1]).execution_options(stream_results=True)
        result = engine.execute(s)

        # ... but enabled for this one.

        assert self._is_server_side(result.cursor)

    def test_conn_option(self):
        engine = self._fixture(False)

        # and this one
        result = \
            engine.connect().execution_options(stream_results=True).\
            execute('select 1'
                    )
        assert self._is_server_side(result.cursor)

    def test_stmt_enabled_conn_option_disabled(self):
        engine = self._fixture(False)

        s = select([1]).execution_options(stream_results=True)

        # not this one
        result = \
            engine.connect().execution_options(stream_results=False).\
            execute(s)
        assert not self._is_server_side(result.cursor)

    def test_stmt_option_disabled(self):
        engine = self._fixture(True)
        s = select([1]).execution_options(stream_results=False)
        result = engine.execute(s)
        assert not self._is_server_side(result.cursor)

    def test_aliases_and_ss(self):
        engine = self._fixture(False)
        s1 = select([1]).execution_options(stream_results=True).alias()
        result = engine.execute(s1)
        assert self._is_server_side(result.cursor)

        # s1's options shouldn't affect s2 when s2 is used as a
        # from_obj.
        s2 = select([1], from_obj=s1)
        result = engine.execute(s2)
        assert not self._is_server_side(result.cursor)

    def test_for_update_expr(self):
        engine = self._fixture(True)
        s1 = select([1], for_update=True)
        result = engine.execute(s1)
        assert self._is_server_side(result.cursor)

    def test_for_update_string(self):
        engine = self._fixture(True)
        result = engine.execute('SELECT 1 FOR UPDATE')
        assert self._is_server_side(result.cursor)

    def test_text_no_ss(self):
        engine = self._fixture(False)
        s = text('select 42')
        result = engine.execute(s)
        assert not self._is_server_side(result.cursor)

    def test_text_ss_option(self):
        engine = self._fixture(False)
        s = text('select 42').execution_options(stream_results=True)
        result = engine.execute(s)
        assert self._is_server_side(result.cursor)

    @testing.provide_metadata
    def test_roundtrip(self):
        """INSERT / SELECT / UPDATE / DELETE round trip through an engine
        created with ``server_side_cursors=True``.

        Every statement is executed explicitly on the fixture engine, so
        the round trip actually runs with the option under test rather
        than through whatever the metadata happens to be bound to.

        """
        md = self.metadata

        engine = self._fixture(True)
        test_table = Table('test_table', md,
                           Column('id', Integer, primary_key=True),
                           Column('data', String(50)))
        test_table.create(engine, checkfirst=True)
        engine.execute(test_table.insert(), {'data': 'data1'})
        engine.execute(test_table.insert(), {'data': 'data2'})

        # ORDER BY makes the comparison against an ordered list
        # deterministic.
        ordered_select = test_table.select().order_by(test_table.c.id)

        eq_(engine.execute(ordered_select).fetchall(),
            [(1, 'data1'), (2, 'data2')])

        engine.execute(
            test_table.update().
            where(test_table.c.id == 2).
            values(data=test_table.c.data + ' updated')
        )
        eq_(engine.execute(ordered_select).fetchall(),
            [(1, 'data1'), (2, 'data2 updated')])

        engine.execute(test_table.delete())
        eq_(
            engine.execute(
                select([func.count('*')]).select_from(test_table)
            ).scalar(),
            0
        )