OpenStack Study: session.py

# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2010 United States Government as represented by the

# Administrator of the National Aeronautics and Space Administration.

# All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

"""Session Handling for SQLAlchemy backend.

Initializing:

* Call set_defaults with the minimal of the following kwargs:

sql_connection, sqlite_db

Example:

session.set_defaults(

sql_connection="sqlite:///var/lib/cinder/sqlite.db",

sqlite_db="/var/lib/cinder/sqlite.db")

Recommended ways to use sessions within this framework:

* Don't use them explicitly; this is like running with AUTOCOMMIT=1.

model_query() will implicitly use a session when called without one

supplied. This is the ideal situation because it will allow queries

to be automatically retried if the database connection is interrupted.

Note: Automatic retry will be enabled in a future patch.

It is generally fine to issue several queries in a row like this. Even though

they may be run in separate transactions and/or separate sessions, each one

will see the data from the prior calls. If needed, undo- or rollback-like

functionality should be handled at a logical level. For an example, look at

the code around quotas and reservation_rollback().

Examples:

**** CubicPower OpenStack Study ****

def set_defaults(sql_connection, sqlite_db):
    """Override the default values of the database config options.

    :param sql_connection: default SQLAlchemy connection URL
    :param sqlite_db: default path of the sqlite database file
    """
    cfg.set_defaults(database_opts, connection=sql_connection)
    cfg.set_defaults(sqlite_db_opts, sqlite_db=sqlite_db)

**** CubicPower OpenStack Study ****

def cleanup():
    """Tear down the cached global session maker and engine, if any."""
    global _ENGINE, _MAKER
    if _MAKER:
        # Close sessions first: they hold connections owned by the engine.
        _MAKER.close_all()
        _MAKER = None
    if _ENGINE:
        _ENGINE.dispose()
        _ENGINE = None

**** CubicPower OpenStack Study ****

class SqliteForeignKeysListener(PoolListener):
    """Ensures that the foreign key constraints are enforced in SQLite.

    The foreign key constraints are disabled by default in SQLite, so
    they must be explicitly enabled on every new connection; this pool
    listener does that.  (Docstring was truncated in the scraped source;
    wording of the second sentence reconstructed — verify against the
    original oslo incubator file.)
    """

    def connect(self, dbapi_con, con_record):
        # Called once for each new raw DBAPI connection.
        dbapi_con.execute('pragma foreign_keys=ON')

def get_session(autocommit=True, expire_on_commit=False,
                sqlite_fk=False):
    """Return a new SQLAlchemy session from the lazily built global maker.

    :param autocommit: autocommit mode for sessions produced by the maker
    :param expire_on_commit: whether instances expire after each commit
    :param sqlite_fk: enable SQLite foreign keys if the engine must be built
    """
    global _MAKER
    if _MAKER is None:
        _MAKER = get_maker(get_engine(sqlite_fk=sqlite_fk),
                           autocommit, expire_on_commit)
    return _MAKER()

# note(boris-42): In current versions of DB backends unique constraint

# violation messages follow the structure:

#

# sqlite:

# 1 column - (IntegrityError) column c1 is not unique

# N columns - (IntegrityError) column c1, c2, ..., N are not unique

#

# postgres:

# 1 column - (IntegrityError) duplicate key value violates unique

#               constraint "users_c1_key"

# N columns - (IntegrityError) duplicate key value violates unique

#               constraint "name_of_our_constraint"

#

# mysql:

# 1 column - (IntegrityError) (1062, "Duplicate entry 'value_of_c1' for key

#               'c1'")

# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined

#               with -' for key 'name_of_our_constraint'")

# Per-backend patterns that extract the offending column list (sqlite) or
# constraint/key name (postgresql, mysql) from a unique-constraint
# violation message; see the message formats documented in the note above.
_DUP_KEY_RE_DB = dict(
    sqlite=re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),
    postgresql=re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),
    mysql=re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),
)

def _raise_if_duplicate_entry_error(integrity_error, engine_name):

    """

    In this function will be raised DBDuplicateEntry exception if integrity

    error wrap unique constraint violation.

    """

**** CubicPower OpenStack Study ****

def get_session(autocommit=True, expire_on_commit=False,
                sqlite_fk=False):
    """Return a SQLAlchemy session, creating the global maker on demand.

    :param autocommit: autocommit mode for sessions produced by the maker
    :param expire_on_commit: whether instances expire after each commit
    :param sqlite_fk: enable SQLite foreign keys if the engine must be built
    """
    global _MAKER
    if _MAKER is None:
        engine = get_engine(sqlite_fk=sqlite_fk)
        _MAKER = get_maker(engine, autocommit, expire_on_commit)
    return _MAKER()

# note(boris-42): In current versions of DB backends unique constraint

# violation messages follow the structure:

#

# sqlite:

# 1 column - (IntegrityError) column c1 is not unique

# N columns - (IntegrityError) column c1, c2, ..., N are not unique

#

# postgres:

# 1 column - (IntegrityError) duplicate key value violates unique

#               constraint "users_c1_key"

# N columns - (IntegrityError) duplicate key value violates unique

#               constraint "name_of_our_constraint"

#

# mysql:

# 1 column - (IntegrityError) (1062, "Duplicate entry 'value_of_c1' for key

#               'c1'")

# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined

#               with -' for key 'name_of_our_constraint'")

# Backend-specific regexes used to pull the duplicated column list
# (sqlite) or the violated constraint/key name (postgresql, mysql) out of
# an IntegrityError message.
_DUP_KEY_RE_DB = dict(
    sqlite=re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),
    postgresql=re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),
    mysql=re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),
)

**** CubicPower OpenStack Study ****

def _raise_if_duplicate_entry_error(integrity_error, engine_name):
    """Raise DBDuplicateEntry if the IntegrityError wraps a unique
    constraint violation; otherwise return and let the caller decide.
    """

    def _columns_from_constraint(columns):
        # note(boris-42): UniqueConstraint name convention: "uniq_c1_x_c2_x_c3"
        # means that columns c1, c2, c3 are in UniqueConstraint.
        prefix = "uniq_"
        if columns.startswith(prefix):
            return columns[len(prefix):].split("_x_")
        if engine_name == "postgresql":
            # Default postgres key names look like "<table>_<column>_key".
            return [columns[columns.index("_") + 1:columns.rindex("_")]]
        return [columns]

    if engine_name not in ("mysql", "sqlite", "postgresql"):
        return
    match = _DUP_KEY_RE_DB[engine_name].match(integrity_error.message)
    if match is None:
        return
    columns = match.group(1)
    if engine_name == "sqlite":
        columns = columns.strip().split(", ")
    else:
        columns = _columns_from_constraint(columns)
    raise exception.DBDuplicateEntry(columns, integrity_error)

# NOTE(comstud): In current versions of DB backends, Deadlock violation

# messages follow the structure:

#

# mysql:

# (OperationalError) (1213, 'Deadlock found when trying to get lock; try '

#                     'restarting transaction')  

# Pattern recognizing a MySQL deadlock OperationalError (error 1213);
# other backends currently have no deadlock detection here.
_DEADLOCK_RE_DB = dict(
    mysql=re.compile(r"^.*\(1213, 'Deadlock.*"),
)

def _raise_if_deadlock_error(operational_error, engine_name):
    """Raise DBDeadlock if the OperationalError is a deadlock condition.

    :param operational_error: the OperationalError raised by the backend
    :param engine_name: name of the engine/dialect in use (e.g. "mysql")
    """
    # The local was previously named `re`, shadowing the `re` module
    # inside this function; renamed to avoid the shadowing hazard.
    regex = _DEADLOCK_RE_DB.get(engine_name)
    if regex is None:
        return
    m = regex.match(operational_error.message)
    if not m:
        return
    raise exception.DBDeadlock(operational_error)

def _wrap_db_error(f):

**** CubicPower OpenStack Study ****

def _raise_if_deadlock_error(operational_error, engine_name):
    """Raise DBDeadlock if the OperationalError is a deadlock condition.

    :param operational_error: the OperationalError raised by the backend
    :param engine_name: name of the engine/dialect in use (e.g. "mysql")
    """
    # Renamed the local from `re` (which shadowed the `re` module) to
    # `regex` to avoid the shadowing hazard.
    regex = _DEADLOCK_RE_DB.get(engine_name)
    if regex is None:
        return
    m = regex.match(operational_error.message)
    if not m:
        return
    raise exception.DBDeadlock(operational_error)

**** CubicPower OpenStack Study ****

def _wrap_db_error(f):
    """Decorator translating SQLAlchemy errors into our DB* exceptions.

    UnicodeEncodeError -> DBInvalidUnicodeParameter; deadlock
    OperationalErrors -> DBDeadlock; unique-constraint IntegrityErrors ->
    DBDuplicateEntry; anything else unexpected -> DBError.  Non-deadlock
    OperationalErrors are re-raised unchanged.
    """
    def _wrap(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except UnicodeEncodeError:
            raise exception.DBInvalidUnicodeParameter()
        # note(boris-42): We should catch unique constraint violation and
        # wrap it by our own DBDuplicateEntry exception. Unique constraint
        # violation is wrapped by IntegrityError.
        except sqla_exc.OperationalError as e:
            _raise_if_deadlock_error(e, get_engine().name)
            # NOTE(comstud): A lot of code is checking for OperationalError
            # so let's not wrap it for now.
            raise
        except sqla_exc.IntegrityError as e:
            # note(boris-42): SqlAlchemy doesn't unify errors from different
            # DBs so we must do this. Also in some tables (for example
            # instance_types) there are more than one unique constraint. This
            # means we should get names of columns, which values violate
            # unique constraint, from error message.
            _raise_if_duplicate_entry_error(e, get_engine().name)
            raise exception.DBError(e)
        except Exception as e:
            LOG.exception(_('DB exception wrapped.'))
            raise exception.DBError(e)
    # `func_name` exists only on Python 2; `__name__` is the same
    # attribute spelled portably (works on both Python 2 and 3).
    _wrap.__name__ = f.__name__
    return _wrap

def get_engine(sqlite_fk=False):
    """Return the global SQLAlchemy engine, building it on first use.

    :param sqlite_fk: enable SQLite foreign-key enforcement if the engine
        has to be created by this call
    """
    global _ENGINE
    if _ENGINE is None:
        _ENGINE = create_engine(CONF.database.connection,
                                sqlite_fk=sqlite_fk)
    return _ENGINE

def _synchronous_switch_listener(dbapi_conn, connection_rec):
    """Put a fresh sqlite connection into non-synchronous mode."""
    # Trades durability for speed; intended for tests/ephemeral databases.
    dbapi_conn.execute("PRAGMA synchronous = OFF")

def _add_regexp_listener(dbapi_con, con_record):

    """Add REGEXP function to sqlite connections."""

**** CubicPower OpenStack Study ****

def get_engine(sqlite_fk=False):
    """Return a SQLAlchemy engine, creating the global one lazily.

    :param sqlite_fk: enable SQLite foreign-key enforcement when creating
    """
    global _ENGINE
    if _ENGINE is None:
        _ENGINE = create_engine(CONF.database.connection,
                                sqlite_fk=sqlite_fk)
    return _ENGINE

**** CubicPower OpenStack Study ****

def _synchronous_switch_listener(dbapi_conn, connection_rec):
    """Connect-listener switching a sqlite connection to fast, unsynced
    writes."""
    dbapi_conn.execute("PRAGMA synchronous = OFF")

**** CubicPower OpenStack Study ****

def _add_regexp_listener(dbapi_con, con_record):
    """Register a REGEXP SQL function on a new sqlite connection."""

    def regexp(expr, item):
        # sqlite has no native REGEXP operator; emulate it with re.
        return re.compile(expr).search(six.text_type(item)) is not None

    dbapi_con.create_function('regexp', 2, regexp)

def _greenthread_yield(dbapi_con, con_record):
    """Force a cooperative context switch after a connection event.

    Common database backends (e.g. MySQLdb and sqlite) are C libraries
    that eventlet cannot monkey patch, so their network I/O never yields
    implicitly; a zero-second sleep lets other greenthreads run.
    """
    greenthread.sleep(0)

def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
    """Checkout-listener ensuring a pooled MySQL connection is alive.

    Borrowed from:
    http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
    """
    try:
        dbapi_conn.cursor().execute('select 1')
    except dbapi_conn.OperationalError as ex:
        # These codes mean the server went away; signal SQLAlchemy to
        # drop this connection and establish a new one.
        if ex.args[0] not in (2006, 2013, 2014, 2045, 2055):
            raise
        LOG.warn(_('Got mysql server has gone away: %s'), ex)
        raise sqla_exc.DisconnectionError("Database server went away")

def _is_db_connection_error(args):
    """Return True if the error message indicates a failure to connect.

    :param args: stringified first argument of an OperationalError
    """
    # NOTE(adam_g): This is currently MySQL specific and needs to be extended
    #               to support Postgres and others.
    conn_err_codes = ('2002', '2003', '2006')
    # `code in args` replaces the non-idiomatic `args.find(code) != -1`.
    return any(err_code in args for err_code in conn_err_codes)

def create_engine(sql_connection, sqlite_fk=False):
    """Return a new SQLAlchemy engine.

    Builds the engine from *sql_connection*, attaches dialect-specific
    pool/connect listeners, and retries the initial connect (up to
    CONF.database.max_retries times, -1 meaning forever) when the
    database is not yet reachable.

    :param sql_connection: SQLAlchemy connection URL
    :param sqlite_fk: enable SQLite foreign-key enforcement
    """
    # Parsed only to inspect the driver name below.
    connection_dict = sqlalchemy.engine.url.make_url(sql_connection)
    engine_args = {
        "pool_recycle": CONF.database.idle_timeout,
        "echo": False,
        'convert_unicode': True,
    }
    # Map our SQL debug level to SQLAlchemy's options
    if CONF.database.connection_debug >= 100:
        engine_args['echo'] = 'debug'
    elif CONF.database.connection_debug >= 50:
        engine_args['echo'] = True
    if "sqlite" in connection_dict.drivername:
        if sqlite_fk:
            engine_args["listeners"] = [SqliteForeignKeysListener()]
        engine_args["poolclass"] = NullPool
        # In-memory sqlite must share a single connection across threads,
        # otherwise each "connection" sees a different empty database.
        if CONF.database.connection == "sqlite://":
            engine_args["poolclass"] = StaticPool
            engine_args["connect_args"] = {'check_same_thread': False}
    else:
        engine_args['pool_size'] = CONF.database.max_pool_size
        if CONF.database.max_overflow is not None:
            engine_args['max_overflow'] = CONF.database.max_overflow
    engine = sqlalchemy.create_engine(sql_connection, **engine_args)
    # Yield to other greenthreads whenever a connection is checked in.
    sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield)
    if 'mysql' in connection_dict.drivername:
        # Detect dead MySQL connections at checkout time.
        sqlalchemy.event.listen(engine, 'checkout', _ping_listener)
    elif 'sqlite' in connection_dict.drivername:
        if not CONF.sqlite_synchronous:
            sqlalchemy.event.listen(engine, 'connect',
                                    _synchronous_switch_listener)
        sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener)
    if (CONF.database.connection_trace and
            engine.dialect.dbapi.__name__ == 'MySQLdb'):
        _patch_mysqldb_with_stacktrace_comments()
    try:
        engine.connect()
    except sqla_exc.OperationalError as e:
        # Only retry genuine connection failures; re-raise anything else.
        if not _is_db_connection_error(e.args[0]):
            raise
        remaining = CONF.database.max_retries
        if remaining == -1:
            remaining = 'infinite'
        while True:
            msg = _('SQL connection failed. %s attempts left.')
            LOG.warn(msg % remaining)
            if remaining != 'infinite':
                remaining -= 1
            time.sleep(CONF.database.retry_interval)
            try:
                engine.connect()
                break
            except sqla_exc.OperationalError as e:
                # Give up when retries are exhausted or the failure is not
                # a connection error.
                if (remaining != 'infinite' and remaining == 0) or \
                        not _is_db_connection_error(e.args[0]):
                    raise
    return engine

**** CubicPower OpenStack Study ****

def _greenthread_yield(dbapi_con, con_record):
    """Give other greenthreads a chance to run by forcing a context switch.

    DB drivers implemented as C libraries (MySQLdb, sqlite) cannot be
    monkey-patched by eventlet, so no implicit yield happens on their
    network I/O; sleeping zero seconds yields explicitly.
    """
    greenthread.sleep(0)

**** CubicPower OpenStack Study ****

def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
    """Verify that a MySQL connection checked out of the pool is alive.

    Borrowed from:
    http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
    """
    try:
        dbapi_conn.cursor().execute('select 1')
    except dbapi_conn.OperationalError as ex:
        # "Server has gone away" family of error codes: tell SQLAlchemy
        # to discard the connection and reconnect.
        if ex.args[0] not in (2006, 2013, 2014, 2045, 2055):
            raise
        LOG.warn(_('Got mysql server has gone away: %s'), ex)
        raise sqla_exc.DisconnectionError("Database server went away")

**** CubicPower OpenStack Study ****

def _is_db_connection_error(args):
    """Return True if the error message looks like a DB connection failure.

    :param args: stringified first argument of an OperationalError
    """
    # NOTE(adam_g): This is currently MySQL specific and needs to be extended
    #               to support Postgres and others.
    # Idiomatic membership test replaces `args.find(code) != -1`.
    return any(err_code in args for err_code in ('2002', '2003', '2006'))

**** CubicPower OpenStack Study ****

def create_engine(sql_connection, sqlite_fk=False):
    """Return a new SQLAlchemy engine.

    Configures pooling and debug echo from CONF.database, attaches
    dialect-specific listeners, and retries the first connect while the
    database is unreachable (CONF.database.max_retries, -1 = forever).

    :param sql_connection: SQLAlchemy connection URL
    :param sqlite_fk: enable SQLite foreign-key enforcement
    """
    # Parsed URL is only used to inspect the driver name.
    connection_dict = sqlalchemy.engine.url.make_url(sql_connection)
    engine_args = {
        "pool_recycle": CONF.database.idle_timeout,
        "echo": False,
        'convert_unicode': True,
    }
    # Map our SQL debug level to SQLAlchemy's options
    if CONF.database.connection_debug >= 100:
        engine_args['echo'] = 'debug'
    elif CONF.database.connection_debug >= 50:
        engine_args['echo'] = True
    if "sqlite" in connection_dict.drivername:
        if sqlite_fk:
            engine_args["listeners"] = [SqliteForeignKeysListener()]
        engine_args["poolclass"] = NullPool
        # In-memory sqlite needs a single shared connection, or every
        # "connection" would see a different empty database.
        if CONF.database.connection == "sqlite://":
            engine_args["poolclass"] = StaticPool
            engine_args["connect_args"] = {'check_same_thread': False}
    else:
        engine_args['pool_size'] = CONF.database.max_pool_size
        if CONF.database.max_overflow is not None:
            engine_args['max_overflow'] = CONF.database.max_overflow
    engine = sqlalchemy.create_engine(sql_connection, **engine_args)
    # Let other greenthreads run after each connection checkin.
    sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield)
    if 'mysql' in connection_dict.drivername:
        # Detect dead MySQL connections at checkout time.
        sqlalchemy.event.listen(engine, 'checkout', _ping_listener)
    elif 'sqlite' in connection_dict.drivername:
        if not CONF.sqlite_synchronous:
            sqlalchemy.event.listen(engine, 'connect',
                                    _synchronous_switch_listener)
        sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener)
    if (CONF.database.connection_trace and
            engine.dialect.dbapi.__name__ == 'MySQLdb'):
        _patch_mysqldb_with_stacktrace_comments()
    try:
        engine.connect()
    except sqla_exc.OperationalError as e:
        # Only retry genuine connection failures.
        if not _is_db_connection_error(e.args[0]):
            raise
        remaining = CONF.database.max_retries
        if remaining == -1:
            remaining = 'infinite'
        while True:
            msg = _('SQL connection failed. %s attempts left.')
            LOG.warn(msg % remaining)
            if remaining != 'infinite':
                remaining -= 1
            time.sleep(CONF.database.retry_interval)
            try:
                engine.connect()
                break
            except sqla_exc.OperationalError as e:
                # Stop retrying when attempts are exhausted or the error
                # is not a connection failure.
                if (remaining != 'infinite' and remaining == 0) or \
                        not _is_db_connection_error(e.args[0]):
                    raise
    return engine

**** CubicPower OpenStack Study ****

class Query(sqlalchemy.orm.query.Query):
    """Subclass of sqlalchemy.query with soft_delete() method.

    (The scraped source had the docstring and method dedented to column
    0, which is unparseable; structure restored.)
    """

    def soft_delete(self, synchronize_session='evaluate'):
        # Mark matched rows as deleted instead of removing them:
        # `deleted` takes the row's id, `updated_at` is explicitly kept
        # unchanged, and `deleted_at` records the deletion time.
        return self.update({'deleted': literal_column('id'),
                            'updated_at': literal_column('updated_at'),
                            'deleted_at': timeutils.utcnow()},
                           synchronize_session=synchronize_session)

**** CubicPower OpenStack Study ****

class Session(sqlalchemy.orm.session.Session):
    """Custom Session class to avoid SqlAlchemy Session monkey patching.

    Each override simply delegates to the base class, wrapped by
    _wrap_db_error so SQLAlchemy exceptions are translated into our DB*
    exceptions.  (The scraped source had the decorators and docstring
    dedented to column 0, which is unparseable; structure restored.)
    """

    @_wrap_db_error
    def query(self, *args, **kwargs):
        return super(Session, self).query(*args, **kwargs)

    @_wrap_db_error
    def flush(self, *args, **kwargs):
        return super(Session, self).flush(*args, **kwargs)

    @_wrap_db_error
    def execute(self, *args, **kwargs):
        return super(Session, self).execute(*args, **kwargs)

def get_maker(engine, autocommit=True, expire_on_commit=False):
    """Return a SQLAlchemy sessionmaker bound to the given engine.

    Sessions use our custom Session class and Query class so errors are
    wrapped and soft_delete() is available.
    """
    return sqlalchemy.orm.sessionmaker(
        bind=engine,
        class_=Session,
        autocommit=autocommit,
        expire_on_commit=expire_on_commit,
        query_cls=Query)

def _patch_mysqldb_with_stacktrace_comments():

    """Adds current stack trace as a comment in queries by patching

    MySQLdb.cursors.BaseCursor._do_query.

    """

    import MySQLdb.cursors

    import traceback

    old_mysql_do_query = MySQLdb.cursors.BaseCursor._do_query

**** CubicPower OpenStack Study ****

def get_maker(engine, autocommit=True, expire_on_commit=False):
    """Return a SQLAlchemy sessionmaker using the given engine.

    The maker produces our custom Session objects and uses the custom
    Query class (which adds soft_delete()).
    """
    return sqlalchemy.orm.sessionmaker(
        bind=engine,
        class_=Session,
        autocommit=autocommit,
        expire_on_commit=expire_on_commit,
        query_cls=Query)

**** CubicPower OpenStack Study ****

def _patch_mysqldb_with_stacktrace_comments():
    """Adds current stack trace as a comment in queries by patching
    MySQLdb.cursors.BaseCursor._do_query.
    """
    import MySQLdb.cursors
    import traceback

    old_mysql_do_query = MySQLdb.cursors.BaseCursor._do_query

    def _do_query(self, q):
        stack = ''
        # traceback.extract_stack() yields (filename, lineno, func_name,
        # source_line) tuples.  The original code bound these to
        # `file, line, method, function`, shadowing the `file` builtin and
        # mislabeling the fields; renamed to match their actual meaning.
        for filename, lineno, func_name, source_line in \
                traceback.extract_stack():
            # exclude various common things from trace
            if filename.endswith('session.py') and func_name == '_do_query':
                continue
            if filename.endswith('api.py') and func_name == 'wrapper':
                continue
            if filename.endswith('utils.py') and func_name == '_inner':
                continue
            if filename.endswith('exception.py') and func_name == '_wrap':
                continue
            # db/api is just a wrapper around db/sqlalchemy/api
            if filename.endswith('db/api.py'):
                continue
            # only trace inside cinder
            index = filename.rfind('cinder')
            if index == -1:
                continue
            stack += "File:%s:%s Method:%s() Line:%s | " \
                     % (filename[index:], lineno, func_name, source_line)
        # strip trailing " | " from stack
        if stack:
            stack = stack[:-3]
            qq = "%s /* %s */" % (q, stack)
        else:
            qq = q
        old_mysql_do_query(self, qq)

    # Install the tracing cursor method.
    setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query)