dibbler/sqlalchemy/engine/url.py

262 lines
8.3 KiB
Python
Raw Normal View History

2017-04-15 18:27:12 +02:00
# engine/url.py
# Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
2010-05-07 19:33:49 +02:00
"""Provides the :class:`~sqlalchemy.engine.url.URL` class which encapsulates
information about a database connection specification.
2017-04-15 18:27:12 +02:00
The URL object is created automatically when
:func:`~sqlalchemy.engine.create_engine` is called with a string
argument; alternatively, the URL is a public-facing construct which can
2010-05-07 19:33:49 +02:00
be used directly and is also accepted directly by ``create_engine()``.
"""
2017-04-15 18:27:12 +02:00
import re
from .. import exc, util
from . import Dialect
from ..dialects import registry, plugins
2010-05-07 19:33:49 +02:00
class URL(object):
"""
Represent the components of a URL used to connect to a database.
This object is suitable to be passed directly to a
2017-04-15 18:27:12 +02:00
:func:`~sqlalchemy.create_engine` call. The fields of the URL are parsed
from a string by the :func:`.make_url` function. the string
2010-05-07 19:33:49 +02:00
format of the URL is an RFC-1738-style string.
All initialization parameters are available as public attributes.
:param drivername: the name of the database backend.
This name will correspond to a module in sqlalchemy/databases
or a third party plug-in.
:param username: The user name.
:param password: database password.
:param host: The name of the host.
:param port: The port number.
:param database: The database name.
:param query: A dictionary of options to be passed to the
dialect and/or the DBAPI upon connect.
"""
def __init__(self, drivername, username=None, password=None,
host=None, port=None, database=None, query=None):
self.drivername = drivername
self.username = username
self.password = password
self.host = host
if port is not None:
self.port = int(port)
else:
self.port = None
self.database = database
self.query = query or {}
2017-04-15 18:27:12 +02:00
def __to_string__(self, hide_password=True):
2010-05-07 19:33:49 +02:00
s = self.drivername + "://"
if self.username is not None:
2017-04-15 18:27:12 +02:00
s += _rfc_1738_quote(self.username)
2010-05-07 19:33:49 +02:00
if self.password is not None:
2017-04-15 18:27:12 +02:00
s += ':' + ('***' if hide_password
else _rfc_1738_quote(self.password))
2010-05-07 19:33:49 +02:00
s += "@"
if self.host is not None:
2017-04-15 18:27:12 +02:00
if ':' in self.host:
s += "[%s]" % self.host
else:
s += self.host
2010-05-07 19:33:49 +02:00
if self.port is not None:
s += ':' + str(self.port)
if self.database is not None:
s += '/' + self.database
if self.query:
2017-04-15 18:27:12 +02:00
keys = list(self.query)
2010-05-07 19:33:49 +02:00
keys.sort()
s += '?' + "&".join("%s=%s" % (k, self.query[k]) for k in keys)
return s
2017-04-15 18:27:12 +02:00
def __str__(self):
return self.__to_string__(hide_password=False)
def __repr__(self):
return self.__to_string__()
2010-05-07 19:33:49 +02:00
def __hash__(self):
return hash(str(self))
def __eq__(self, other):
return \
isinstance(other, URL) and \
self.drivername == other.drivername and \
self.username == other.username and \
self.password == other.password and \
self.host == other.host and \
self.database == other.database and \
self.query == other.query
2017-04-15 18:27:12 +02:00
def get_backend_name(self):
if '+' not in self.drivername:
return self.drivername
else:
return self.drivername.split('+')[0]
2010-05-07 19:33:49 +02:00
2017-04-15 18:27:12 +02:00
def get_driver_name(self):
if '+' not in self.drivername:
return self.get_dialect().driver
else:
return self.drivername.split('+')[1]
2010-05-07 19:33:49 +02:00
2017-04-15 18:27:12 +02:00
def _instantiate_plugins(self, kwargs):
plugin_names = util.to_list(self.query.get('plugin', ()))
return [
plugins.load(plugin_name)(self, kwargs)
for plugin_name in plugin_names
]
def _get_entrypoint(self):
"""Return the "entry point" dialect class.
This is normally the dialect itself except in the case when the
returned class implements the get_dialect_cls() method.
2010-05-07 19:33:49 +02:00
"""
2017-04-15 18:27:12 +02:00
if '+' not in self.drivername:
name = self.drivername
else:
name = self.drivername.replace('+', '.')
cls = registry.load(name)
# check for legacy dialects that
# would return a module with 'dialect' as the
# actual class
if hasattr(cls, 'dialect') and \
isinstance(cls.dialect, type) and \
issubclass(cls.dialect, Dialect):
return cls.dialect
2010-05-07 19:33:49 +02:00
else:
2017-04-15 18:27:12 +02:00
return cls
def get_dialect(self):
"""Return the SQLAlchemy database dialect class corresponding
to this URL's driver name.
"""
entrypoint = self._get_entrypoint()
dialect_cls = entrypoint.get_dialect_cls(self)
return dialect_cls
2010-05-07 19:33:49 +02:00
def translate_connect_args(self, names=[], **kw):
2017-04-15 18:27:12 +02:00
r"""Translate url attributes into a dictionary of connection arguments.
2010-05-07 19:33:49 +02:00
Returns attributes of this url (`host`, `database`, `username`,
`password`, `port`) as a plain dictionary. The attribute names are
used as the keys by default. Unset or false attributes are omitted
from the final dictionary.
:param \**kw: Optional, alternate key names for url attributes.
2017-04-15 18:27:12 +02:00
:param names: Deprecated. Same purpose as the keyword-based alternate
names, but correlates the name to the original positionally.
2010-05-07 19:33:49 +02:00
"""
translated = {}
attribute_names = ['host', 'database', 'username', 'password', 'port']
for sname in attribute_names:
if names:
name = names.pop(0)
elif sname in kw:
name = kw[sname]
else:
name = sname
if name is not None and getattr(self, sname, False):
translated[name] = getattr(self, sname)
return translated
2017-04-15 18:27:12 +02:00
2010-05-07 19:33:49 +02:00
def make_url(name_or_url):
"""Given a string or unicode instance, produce a new URL instance.
The given string is parsed according to the RFC 1738 spec. If an
existing URL object is passed, just returns the object.
"""
2017-04-15 18:27:12 +02:00
if isinstance(name_or_url, util.string_types):
2010-05-07 19:33:49 +02:00
return _parse_rfc1738_args(name_or_url)
else:
return name_or_url
2017-04-15 18:27:12 +02:00
2010-05-07 19:33:49 +02:00
def _parse_rfc1738_args(name):
pattern = re.compile(r'''
(?P<name>[\w\+]+)://
(?:
(?P<username>[^:/]*)
2017-04-15 18:27:12 +02:00
(?::(?P<password>.*))?
2010-05-07 19:33:49 +02:00
@)?
(?:
2017-04-15 18:27:12 +02:00
(?:
\[(?P<ipv6host>[^/]+)\] |
(?P<ipv4host>[^/:]+)
)?
2010-05-07 19:33:49 +02:00
(?::(?P<port>[^/]*))?
)?
(?:/(?P<database>.*))?
2017-04-15 18:27:12 +02:00
''', re.X)
2010-05-07 19:33:49 +02:00
m = pattern.match(name)
if m is not None:
components = m.groupdict()
if components['database'] is not None:
tokens = components['database'].split('?', 2)
components['database'] = tokens[0]
2017-04-15 18:27:12 +02:00
query = (
len(tokens) > 1 and dict(util.parse_qsl(tokens[1]))) or None
if util.py2k and query is not None:
2010-05-07 19:33:49 +02:00
query = dict((k.encode('ascii'), query[k]) for k in query)
else:
query = None
components['query'] = query
2017-04-15 18:27:12 +02:00
if components['username'] is not None:
components['username'] = _rfc_1738_unquote(components['username'])
2010-05-07 19:33:49 +02:00
if components['password'] is not None:
2017-04-15 18:27:12 +02:00
components['password'] = _rfc_1738_unquote(components['password'])
2010-05-07 19:33:49 +02:00
2017-04-15 18:27:12 +02:00
ipv4host = components.pop('ipv4host')
ipv6host = components.pop('ipv6host')
components['host'] = ipv4host or ipv6host
2010-05-07 19:33:49 +02:00
name = components.pop('name')
return URL(name, **components)
else:
raise exc.ArgumentError(
"Could not parse rfc1738 URL from string '%s'" % name)
2017-04-15 18:27:12 +02:00
def _rfc_1738_quote(text):
return re.sub(r'[:@/]', lambda m: "%%%X" % ord(m.group(0)), text)
def _rfc_1738_unquote(text):
return util.unquote(text)
2010-05-07 19:33:49 +02:00
def _parse_keyvalue_args(name):
2017-04-15 18:27:12 +02:00
m = re.match(r'(\w+)://(.*)', name)
2010-05-07 19:33:49 +02:00
if m is not None:
(name, args) = m.group(1, 2)
2017-04-15 18:27:12 +02:00
opts = dict(util.parse_qsl(args))
2010-05-07 19:33:49 +02:00
return URL(name, *opts)
else:
return None