# Home
#
# OpenStack Study: test_tasks.py
#
# OpenStack Index
#
# **** CubicPower OpenStack Study ****

# Copyright 2013 IBM Corp.

# All Rights Reserved.

#

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import os

import uuid

import fixtures

import requests

from glance.openstack.common import jsonutils

from glance.tests import functional

# Four distinct, randomly generated tenant identifiers (UUID4 strings),
# created once at import time.
TENANT1, TENANT2, TENANT3, TENANT4 = (str(uuid.uuid4()) for _ in range(4))

# **** CubicPower OpenStack Study ****

class TestTasks(functional.FunctionalTest):
    """Functional tests for the ``/v2/tasks`` API endpoints.

    ``setUp`` starts the servers with the ``noauth`` deployment flavor, and
    every request carries the identity headers built by ``_headers``.
    """

    def setUp(self):
        """Stash a local file to import from and start the servers."""
        super(TestTasks, self).setUp()
        self.cleanup()
        # The stashed file's URI is used as the task's "import_from" value.
        self.file_path = self._stash_file()
        self.api_server.deployment_flavor = 'noauth'
        self.start_servers(**self.__dict__.copy())

    def _url(self, path):
        """Return the absolute URL for ``path`` on the local API port."""
        return 'http://127.0.0.1:%d%s' % (self.api_port, path)

    def _headers(self, custom_headers=None):
        """Build the request headers for a call to the API.

        :param custom_headers: optional dict of headers that are added to,
                               or override, the default identity headers
        :returns: a new dict with the identity headers of a confirmed
                  ``member`` user in ``TENANT1`` plus ``custom_headers``
        """
        base_headers = {
            'X-Identity-Status': 'Confirmed',
            'X-Auth-Token': '932c5c84-02ac-4fe5-a9ba-620af0e2bb96',
            'X-User-Id': 'f9a41d13-0c13-47e9-bee2-ce4e8bfe958e',
            'X-Tenant-Id': TENANT1,
            'X-Roles': 'member',
        }
        base_headers.update(custom_headers or {})
        return base_headers

    def _stash_file(self):
        """Create a small file in a temporary directory.

        Sets ``self.tmp_dir`` and ``self.store_dir`` as a side effect.

        :returns: a ``file://`` URI pointing at the created file
        """
        self.tmp_dir = self.useFixture(fixtures.TempDir()).path
        self.store_dir = os.path.join(self.tmp_dir, 'images')
        os.mkdir(self.store_dir)

        file_path = os.path.join(self.store_dir, 'foo')
        with open(file_path, 'w') as f:
            f.write('blah')

        return 'file://%s' % file_path

    def test_task_lifecycle(self):
        """List, create, re-list and try to delete a task."""
        tasks_url = self._url('/v2/tasks')

        # Task list should be empty
        response = requests.get(tasks_url, headers=self._headers())
        self.assertEqual(200, response.status_code)
        tasks = jsonutils.loads(response.text)['tasks']
        self.assertEqual(0, len(tasks))

        # Create a task.  The same input dict is used for the request and
        # for the expectation below so the two cannot drift apart.
        task_input = {
            "import_from": self.file_path,
            "import_from_format": "qcow2",
            "image_properties": {
                'disk_format': 'vhd',
                'container_format': 'ovf',
            },
        }
        headers = self._headers({'content-type': 'application/json'})
        data = jsonutils.dumps({"type": "import", "input": task_input})
        response = requests.post(tasks_url, headers=headers, data=data)
        self.assertEqual(201, response.status_code)

        # Returned task entity should have a generated id and status
        task = jsonutils.loads(response.text)
        task_id = task['id']

        self.assertIn('Location', response.headers)
        self.assertEqual(tasks_url + '/' + task_id,
                         response.headers['Location'])

        expected_keys = {
            u'created_at',
            u'id',
            u'input',
            u'owner',
            u'schema',
            u'self',
            u'status',
            u'type',
            u'updated_at',
        }
        self.assertEqual(expected_keys, set(task.keys()))

        expected_task = {
            'status': 'pending',
            'type': 'import',
            'input': task_input,
            'schema': '/v2/schemas/task',
        }
        for key, value in expected_task.items():
            # The key is passed as the message to identify a failing field.
            self.assertEqual(value, task[key], key)

        # Tasks list should now have one entry
        response = requests.get(tasks_url, headers=self._headers())
        self.assertEqual(200, response.status_code)
        tasks = jsonutils.loads(response.text)['tasks']
        self.assertEqual(1, len(tasks))
        self.assertEqual(task_id, tasks[0]['id'])

        # Attempt to delete a task: expect 405 with an Allow header of GET
        path = self._url('/v2/tasks/%s' % tasks[0]['id'])
        response = requests.delete(path, headers=self._headers())
        self.assertEqual(405, response.status_code)
        self.assertIsNotNone(response.headers.get('Allow'))
        self.assertEqual('GET', response.headers.get('Allow'))

        self.stop_servers()