OpenStack Study: create_volume.py (cinder scheduler flow) -- annotated source
listing, CubicPower OpenStack Study.

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import taskflow.engines

from taskflow.patterns import linear_flow

from taskflow import task

from cinder import exception

from cinder import flow_utils

from cinder.openstack.common import excutils

from cinder.openstack.common import log as logging

from cinder import rpc

from cinder import utils

from cinder.volume.flows import common

LOG = logging.getLogger(__name__)

ACTION = 'volume:create'

# ---- CubicPower OpenStack Study ----

class ExtractSchedulerSpecTask(flow_utils.CinderTask):
    """Extracts a spec object from a partial and/or incomplete request spec.

    Reversion strategy: N/A
    """

    def __init__(self, db, **kwargs):
        # Tag notifications/log records emitted by this task with the
        # 'volume:create' action.
        super(ExtractSchedulerSpecTask, self).__init__(addons=[ACTION],
                                                       **kwargs)
        self.db = db

    def _populate_request_spec(self, context, volume_id, snapshot_id,
                               image_id):
        """Build a full request spec from just a volume_id.

        Used for backward compatibility with callers that predate passing a
        pre-built request_spec (RPC version < 1.2).

        :raises exception.InvalidInput: if no volume_id was provided.
        """
        # Create the full request spec using the volume_id.
        #
        # NOTE(harlowja): this will fetch the volume from the database, if
        # the volume has been deleted before we got here then this should fail.
        #
        # In the future we might want to have a lock on the volume_id so that
        # the volume can not be deleted while its still being created?
        if not volume_id:
            msg = _("No volume_id provided to populate a request_spec from")
            raise exception.InvalidInput(reason=msg)
        volume_ref = self.db.volume_get(context, volume_id)
        volume_type_id = volume_ref.get('volume_type_id')
        # NOTE(review): volume_type_id may be None for an untyped volume, in
        # which case volume_type_get would fail -- confirm callers guarantee
        # a volume type on this legacy path.
        vol_type = self.db.volume_type_get(context, volume_type_id)
        return {
            'volume_id': volume_id,
            'snapshot_id': snapshot_id,
            'image_id': image_id,
            'volume_properties': {
                'size': utils.as_int(volume_ref.get('size'), quiet=False),
                'availability_zone': volume_ref.get('availability_zone'),
                'volume_type_id': volume_type_id,
            },
            # Use items() instead of the Python-2-only iteritems(); the
            # resulting list of (key, value) pairs is identical.
            'volume_type': list(dict(vol_type).items()),
        }

    def execute(self, context, request_spec, volume_id, snapshot_id,
                image_id):
        """Return the request spec, building one if the caller sent None.

        For RPC version < 1.2 backward compatibility.
        """
        if request_spec is None:
            request_spec = self._populate_request_spec(context, volume_id,
                                                       snapshot_id, image_id)
        return {
            'request_spec': request_spec,
        }

def get_flow(context, db, driver, request_spec=None,
             filter_properties=None,
             volume_id=None, snapshot_id=None, image_id=None):
    """Constructs and returns the scheduler entrypoint flow.

    This flow will do the following:

    1. Inject keys & values for dependent tasks.
    2. Extract a scheduler specification from the provided inputs.
    3. Attach two tasks activated only on *failure* (one to update the db
       status and one to notify on the MQ of the failure that occurred).
    4. Use the provided driver to then select and continue processing of
       the volume request.
    """
    # Initial data placed into the flow's storage; dependent tasks pull
    # their inputs from here by name.
    create_what = {
        'context': context,
        'raw_request_spec': request_spec,
        'filter_properties': filter_properties,
        'volume_id': volume_id,
        'snapshot_id': snapshot_id,
        'image_id': image_id,
    }

    flow_name = ACTION.replace(":", "_") + "_scheduler"
    scheduler_flow = linear_flow.Flow(flow_name)

    # This will extract and clean the spec from the starting values.
    scheduler_flow.add(ExtractSchedulerSpecTask(
        db,
        rebind={'request_spec': 'raw_request_spec'}))

    def schedule_create_volume(context, request_spec, filter_properties):

        def _log_failure(cause):
            LOG.error(_("Failed to schedule_create_volume: %(cause)s") %
                      {'cause': cause})

        def _notify_failure(cause):
            """When scheduling fails send out an event that it failed."""
            topic = "scheduler.create_volume"
            payload = {
                'request_spec': request_spec,
                'volume_properties': request_spec.get('volume_properties', {}),
                'volume_id': volume_id,
                'state': 'error',
                'method': 'create_volume',
                'reason': cause,
            }
            try:
                rpc.get_notifier('scheduler').error(context, topic, payload)
            except exception.CinderException:
                LOG.exception(_("Failed notifying on %(topic)s "
                                "payload %(payload)s") % {'topic': topic,
                                                          'payload': payload})

        try:
            driver.schedule_create_volume(context, request_spec,
                                          filter_properties)
        except exception.NoValidHost as e:
            # No valid host was found: notify on the scheduler queue, log
            # that this happened, set the volume to errored out, and
            # *do not* reraise the error (since whats the point).
            _notify_failure(e)
            _log_failure(e)
            common.error_out_volume(context, db, volume_id, reason=e)
        except Exception as e:
            # Some other error happened: notify on the scheduler queue, log
            # that this happened, set the volume to errored out, and
            # *do* reraise the error.
            with excutils.save_and_reraise_exception():
                _notify_failure(e)
                _log_failure(e)
                common.error_out_volume(context, db, volume_id, reason=e)

    scheduler_flow.add(task.FunctorTask(schedule_create_volume))

    # Now load (but do not run) the flow using the provided initial data.
    return taskflow.engines.load(scheduler_flow, store=create_what)

# ---- CubicPower OpenStack Study ----

def get_flow(context, db, driver, request_spec=None,
             filter_properties=None,
             volume_id=None, snapshot_id=None, image_id=None):
    """Constructs and returns the scheduler entrypoint flow.

    This flow will do the following:

    1. Inject keys & values for dependent tasks.
    2. Extract a scheduler specification from the provided inputs.
    3. Attach two tasks activated only on *failure* (one to update the db
       status and one to notify on the MQ of the failure that occurred).
    4. Use the provided driver to then select and continue processing of
       the volume request.
    """
    # Initial data placed into the flow's storage; dependent tasks pull
    # their inputs from here by name.
    create_what = {
        'context': context,
        'raw_request_spec': request_spec,
        'filter_properties': filter_properties,
        'volume_id': volume_id,
        'snapshot_id': snapshot_id,
        'image_id': image_id,
    }

    flow_name = ACTION.replace(":", "_") + "_scheduler"
    scheduler_flow = linear_flow.Flow(flow_name)

    # This will extract and clean the spec from the starting values.
    scheduler_flow.add(ExtractSchedulerSpecTask(
        db,
        rebind={'request_spec': 'raw_request_spec'}))

    def schedule_create_volume(context, request_spec, filter_properties):

        def _log_failure(cause):
            LOG.error(_("Failed to schedule_create_volume: %(cause)s") %
                      {'cause': cause})

        def _notify_failure(cause):
            """When scheduling fails send out an event that it failed."""
            topic = "scheduler.create_volume"
            payload = {
                'request_spec': request_spec,
                'volume_properties': request_spec.get('volume_properties', {}),
                'volume_id': volume_id,
                'state': 'error',
                'method': 'create_volume',
                'reason': cause,
            }
            try:
                rpc.get_notifier('scheduler').error(context, topic, payload)
            except exception.CinderException:
                LOG.exception(_("Failed notifying on %(topic)s "
                                "payload %(payload)s") % {'topic': topic,
                                                          'payload': payload})

        try:
            driver.schedule_create_volume(context, request_spec,
                                          filter_properties)
        except exception.NoValidHost as e:
            # No valid host was found: notify on the scheduler queue, log
            # that this happened, set the volume to errored out, and
            # *do not* reraise the error (since whats the point).
            _notify_failure(e)
            _log_failure(e)
            common.error_out_volume(context, db, volume_id, reason=e)
        except Exception as e:
            # Some other error happened: notify on the scheduler queue, log
            # that this happened, set the volume to errored out, and
            # *do* reraise the error.
            with excutils.save_and_reraise_exception():
                _notify_failure(e)
                _log_failure(e)
                common.error_out_volume(context, db, volume_id, reason=e)

    scheduler_flow.add(task.FunctorTask(schedule_create_volume))

    # Now load (but do not run) the flow using the provided initial data.
    return taskflow.engines.load(scheduler_flow, store=create_what)

# ---- CubicPower OpenStack Study ----

        # NOTE(review): this fragment (and the one below) duplicates the
        # interior of get_flow() above at the wrong indentation level and is
        # not valid at module scope -- it appears to be extraction residue.
        # Kept byte-for-byte; confirm against the upstream cinder source
        # before removing.
        def _log_failure(cause):

            LOG.error(_("Failed to schedule_create_volume: %(cause)s") %

                      {'cause': cause})

# ---- CubicPower OpenStack Study ----

        # NOTE(review): duplicated extraction residue of get_flow()'s interior
        # (see the full definition above); not valid at module scope. Kept
        # byte-for-byte pending confirmation against the upstream source.
        def _notify_failure(cause):
            """When scheduling fails send out an event that it failed."""
            topic = "scheduler.create_volume"

            payload = {

                'request_spec': request_spec,

                'volume_properties': request_spec.get('volume_properties', {}),

                'volume_id': volume_id,

                'state': 'error',

                'method': 'create_volume',

                'reason': cause,

            }

            try:

                rpc.get_notifier('scheduler').error(context, topic, payload)

            except exception.CinderException:

                LOG.exception(_("Failed notifying on %(topic)s "

                                "payload %(payload)s") % {'topic': topic,

                                                          'payload': payload})

        try:

            driver.schedule_create_volume(context, request_spec,

                                          filter_properties)

        except exception.NoValidHost as e:

            # Not host found happened, notify on the scheduler queue and log

            # that this happened and set the volume to errored out and

            # *do not* reraise the error (since whats the point).

            _notify_failure(e)

            _log_failure(e)

            common.error_out_volume(context, db, volume_id, reason=e)

        except Exception as e:

            # Some other error happened, notify on the scheduler queue and log

            # that this happened and set the volume to errored out and

            # *do* reraise the error.

            with excutils.save_and_reraise_exception():

                _notify_failure(e)

                _log_failure(e)

                common.error_out_volume(context, db, volume_id, reason=e)

    scheduler_flow.add(task.FunctorTask(schedule_create_volume))

    # Now load (but do not run) the flow using the provided initial data.

    return taskflow.engines.load(scheduler_flow, store=create_what)