¡@

Home 

OpenStack Study: reaper.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright (c) 2010-2012 OpenStack Foundation

#

# Licensed under the Apache License, Version 2.0 (the "License");

# you may not use this file except in compliance with the License.

# You may obtain a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or

# implied.

# See the License for the specific language governing permissions and

# limitations under the License.

import os

import random

from swift import gettext_ as _

from logging import DEBUG

from math import sqrt

from time import time, ctime

from eventlet import GreenPool, sleep, Timeout

import swift.common.db

from swift.account.backend import AccountBroker, DATADIR

from swift.common.direct_client import direct_delete_container, \

direct_delete_object, direct_get_container

from swift.common.exceptions import ClientException

from swift.common.ring import Ring

from swift.common.utils import get_logger, whataremyips, ismount, \

config_true_value

from swift.common.daemon import Daemon

**** CubicPower OpenStack Study ****

class AccountReaper(Daemon):
    """
    Removes data from status=DELETED accounts. These are accounts that have
    been asked to be removed by the reseller via services
    remove_storage_account XMLRPC call.

    The account is not deleted immediately by the services call, but instead
    the account is simply marked for deletion by setting the status column in
    the account_stat table of the account database. This account reaper scans
    for such accounts and removes the data in the background. The background
    deletion process will occur on the primary account server for the account.

    :param conf: The [account-reaper] dictionary of the account server
                 configuration file

    See the etc/account-server.conf-sample for information on the possible
    configuration parameters.
    """

    def __init__(self, conf):
        self.conf = conf
        self.logger = get_logger(conf, log_route='account-reaper')
        self.devices = conf.get('devices', '/srv/node')
        self.mount_check = config_true_value(conf.get('mount_check', 'true'))
        self.interval = int(conf.get('interval', 3600))
        self.swift_dir = conf.get('swift_dir', '/etc/swift')
        # Rings are loaded lazily by the get_*_ring() accessors below.
        self.account_ring = None
        self.container_ring = None
        self.object_ring = None
        self.node_timeout = int(conf.get('node_timeout', 10))
        self.conn_timeout = float(conf.get('conn_timeout', 0.5))
        self.myips = whataremyips()
        self.concurrency = int(conf.get('concurrency', 25))
        # Containers are reaped in one pool and each container's objects in a
        # nested pool; giving each level sqrt(concurrency) green threads keeps
        # the number of concurrent reap_object() calls at about `concurrency`.
        self.container_concurrency = self.object_concurrency = \
            sqrt(self.concurrency)
        self.container_pool = GreenPool(size=self.container_concurrency)
        swift.common.db.DB_PREALLOCATION = \
            config_true_value(conf.get('db_preallocation', 'f'))
        self.delay_reaping = int(conf.get('delay_reaping') or 0)
        reap_warn_after = float(conf.get('reap_warn_after') or 86400 * 30)
        # Seconds past an account's delete_timestamp after which
        # reap_account() warns that containers are still left to reap.
        self.reap_not_done_after = reap_warn_after + self.delay_reaping

    def get_account_ring(self):
        """The account :class:`swift.common.ring.Ring` for the cluster."""
        if not self.account_ring:
            self.account_ring = Ring(self.swift_dir, ring_name='account')
        return self.account_ring

    def get_container_ring(self):
        """The container :class:`swift.common.ring.Ring` for the cluster."""
        if not self.container_ring:
            self.container_ring = Ring(self.swift_dir, ring_name='container')
        return self.container_ring

    def get_object_ring(self):
        """The object :class:`swift.common.ring.Ring` for the cluster."""
        if not self.object_ring:
            self.object_ring = Ring(self.swift_dir, ring_name='object')
        return self.object_ring

    def run_forever(self, *args, **kwargs):
        """Main entry point when running the reaper in normal daemon mode.

        This repeatedly calls :func:`run_once` no quicker than the
        configuration interval.
        """
        self.logger.debug(_('Daemon started.'))
        # Sleep a random fraction of the interval before the first pass,
        # presumably so reapers started together do not all scan at once.
        sleep(random.random() * self.interval)
        while True:
            begin = time()
            self.run_once()
            elapsed = time() - begin
            if elapsed < self.interval:
                sleep(self.interval - elapsed)

    def run_once(self, *args, **kwargs):
        """
        Main entry point when running the reaper in 'once' mode, where it will
        do a single pass over all accounts on the server. This is called
        repeatedly by :func:`run_forever`. This will call :func:`reap_device`
        once for each device on the server.

        Any exception (or eventlet Timeout) escaping the pass is logged and
        swallowed so the daemon loop keeps running.
        """
        self.logger.debug(_('Begin devices pass: %s'), self.devices)
        begin = time()
        try:
            for device in os.listdir(self.devices):
                if self.mount_check and not ismount(
                        os.path.join(self.devices, device)):
                    self.logger.increment('errors')
                    self.logger.debug(
                        _('Skipping %s as it is not mounted'), device)
                    continue
                self.reap_device(device)
        except (Exception, Timeout):
            self.logger.exception(_("Exception in top-level account reaper "
                                    "loop"))
        elapsed = time() - begin
        self.logger.info(_('Devices pass completed: %.02fs'), elapsed)

    def reap_device(self, device):
        """
        Called once per pass for each device on the server. This will scan the
        accounts directory for the device, looking for partitions this device
        is the primary for, then looking for account databases that are marked
        status=DELETED and still have containers and calling
        :func:`reap_account`. Account databases marked status=DELETED that no
        longer have containers will eventually be permanently removed by the
        reclaim process within the account replicator (see
        :mod:`swift.common.db_replicator`).

        :param device: The device to look for accounts to be deleted.
        """
        datadir = os.path.join(self.devices, device, DATADIR)
        if not os.path.exists(datadir):
            return
        for partition in os.listdir(datadir):
            partition_path = os.path.join(datadir, partition)
            if not partition.isdigit():
                continue
            nodes = self.get_account_ring().get_part_nodes(int(partition))
            # Only reap partitions whose first primary node has one of this
            # server's IPs.
            if nodes[0]['ip'] not in self.myips or \
                    not os.path.isdir(partition_path):
                continue
            for suffix in os.listdir(partition_path):
                suffix_path = os.path.join(partition_path, suffix)
                if not os.path.isdir(suffix_path):
                    continue
                for hsh in os.listdir(suffix_path):
                    hsh_path = os.path.join(suffix_path, hsh)
                    if not os.path.isdir(hsh_path):
                        continue
                    # Files are visited in reverse-sorted order; hitting a
                    # .ts file first stops processing of this hash dir.
                    for fname in sorted(os.listdir(hsh_path), reverse=True):
                        if fname.endswith('.ts'):
                            break
                        elif fname.endswith('.db'):
                            # Read by reap_account() for its 'timing' metric.
                            self.start_time = time()
                            broker = \
                                AccountBroker(os.path.join(hsh_path, fname))
                            if broker.is_status_deleted() and \
                                    not broker.empty():
                                self.reap_account(broker, partition, nodes)

    def reap_account(self, broker, partition, nodes):
        """
        Called once per pass for each account this server is the primary for
        and attempts to delete the data for the given account. The reaper will
        only delete one account at any given time. It will call
        :func:`reap_container` up to sqrt(self.concurrency) times concurrently
        while reaping the account.

        If there is any exception while deleting a single container, the
        process will continue for any other containers and the failed
        containers will be tried again the next time this function is called
        with the same parameters.

        If there is any exception while listing the containers for deletion,
        the process will stop (but will obviously be tried again the next time
        this function is called with the same parameters). This isn't likely
        since the listing comes from the local database.

        After the process completes (successfully or not) statistics about what
        was accomplished will be logged.

        This function should raise no exception but only update various
        self.stats_* values for what occurs.

        :param broker: The AccountBroker for the account to delete.
        :param partition: The partition in the account ring the account is on.
        :param nodes: The primary node dicts for the account to delete.
        :returns: False if the account was skipped because it was deleted no
                  more than ``self.delay_reaping`` seconds ago; True after a
                  reaping pass was attempted.

        .. seealso::

            :class:`swift.account.backend.AccountBroker` for the broker class.

        .. seealso::

            :func:`swift.common.ring.Ring.get_nodes` for a description
            of the node dicts.
        """
        begin = time()
        info = broker.get_info()
        delete_timestamp = float(info['delete_timestamp'])
        if time() - delete_timestamp <= self.delay_reaping:
            return False
        account = info['account']
        self.logger.info(_('Beginning pass on account %s'), account)
        # Per-account counters, updated by reap_container()/reap_object().
        self.stats_return_codes = {}
        self.stats_containers_deleted = 0
        self.stats_objects_deleted = 0
        self.stats_containers_remaining = 0
        self.stats_objects_remaining = 0
        self.stats_containers_possibly_remaining = 0
        self.stats_objects_possibly_remaining = 0
        try:
            marker = ''
            while True:
                containers = \
                    list(broker.list_containers_iter(1000, marker, None, None,
                                                     None))
                if not containers:
                    break
                try:
                    for (container, _junk, _junk, _junk) in containers:
                        self.container_pool.spawn(self.reap_container, account,
                                                  partition, nodes, container)
                    self.container_pool.waitall()
                except (Exception, Timeout):
                    self.logger.exception(
                        _('Exception with containers for account %s'), account)
                marker = containers[-1][0]
                # An empty marker cannot advance the listing; stop instead of
                # re-listing the same page forever.
                if marker == '':
                    break
            log = _('Completed pass on account %s') % account
        except (Exception, Timeout):
            self.logger.exception(
                _('Exception with account %s'), account)
            log = _('Incomplete pass on account %s') % account
        if self.stats_containers_deleted:
            log += _(', %s containers deleted') % self.stats_containers_deleted
        if self.stats_objects_deleted:
            log += _(', %s objects deleted') % self.stats_objects_deleted
        if self.stats_containers_remaining:
            log += _(', %s containers remaining') % \
                self.stats_containers_remaining
        if self.stats_objects_remaining:
            log += _(', %s objects remaining') % self.stats_objects_remaining
        if self.stats_containers_possibly_remaining:
            log += _(', %s containers possibly remaining') % \
                self.stats_containers_possibly_remaining
        if self.stats_objects_possibly_remaining:
            log += _(', %s objects possibly remaining') % \
                self.stats_objects_possibly_remaining
        if self.stats_return_codes:
            log += _(', return codes: ')
            log += ', '.join(
                '%s %sxxs' % (self.stats_return_codes[code], code)
                for code in sorted(self.stats_return_codes))
        log += _(', elapsed: %.02fs') % (time() - begin)
        self.logger.info(log)
        # start_time is normally set by reap_device() right before it opens
        # the broker; fall back to this call's start when invoked directly.
        self.logger.timing_since('timing', getattr(self, 'start_time', begin))
        if self.stats_containers_remaining and \
                begin - delete_timestamp >= self.reap_not_done_after:
            self.logger.warning(
                _('Account %s has not been reaped since %s') %
                (account, ctime(delete_timestamp)))
        return True

    def reap_container(self, account, account_partition, account_nodes,
                       container):
        """
        Deletes the data and the container itself for the given container. This
        will call :func:`reap_object` up to sqrt(self.concurrency) times
        concurrently for the objects in the container.

        If there is any exception while deleting a single object, the process
        will continue for any other objects in the container and the failed
        objects will be tried again the next time this function is called with
        the same parameters.

        If there is any exception while listing the objects for deletion, the
        process will stop (but will obviously be tried again the next time this
        function is called with the same parameters). This is a possibility
        since the listing comes from querying just the primary remote container
        server.

        Once all objects have been attempted to be deleted, the container
        itself will be attempted to be deleted by sending a delete request to
        all container nodes. The format of the delete request is such that each
        container server will update a corresponding account server, removing
        the container from the account's listing.

        This function returns nothing and should raise no exception but only
        update various self.stats_* values for what occurs.

        :param account: The name of the account for the container.
        :param account_partition: The partition for the account on the account
                                  ring.
        :param account_nodes: The primary node dicts for the account.
        :param container: The name of the container to delete.

        * See also: :func:`swift.common.ring.Ring.get_nodes` for a description
          of the account node dicts.
        """
        # Account replicas are handed out starting from the last one, and
        # wrap around when the container ring has more replicas than the
        # account ring (a plain pop() would raise IndexError there).
        account_nodes = list(account_nodes)[::-1]
        part, nodes = self.get_container_ring().get_nodes(account, container)
        # Object listings are fetched from a single primary: the last one.
        node = nodes[-1]
        pool = GreenPool(size=self.object_concurrency)
        marker = ''
        while True:
            objects = None
            # NOTE(review): only ClientException is caught around the
            # direct_client calls; anything else they raise propagates out.
            try:
                objects = direct_get_container(
                    node, part, account, container,
                    marker=marker,
                    conn_timeout=self.conn_timeout,
                    response_timeout=self.node_timeout)[1]
                self._record_return_code(2)
            except ClientException as err:
                self._log_node_exception(node)
                self._record_return_code(err.http_status // 100)
            if not objects:
                break
            try:
                for obj in objects:
                    if isinstance(obj['name'], unicode):
                        obj['name'] = obj['name'].encode('utf8')
                    pool.spawn(self.reap_object, account, container, part,
                               nodes, obj['name'])
                pool.waitall()
            except (Exception, Timeout):
                self.logger.exception(_('Exception with objects for container '
                                        '%(container)s for account %(account)s'
                                        ),
                                      {'container': container,
                                       'account': account})
            marker = objects[-1]['name']
            if marker == '':
                break
        successes = 0
        failures = 0
        for index, node in enumerate(nodes):
            anode = account_nodes[index % len(account_nodes)]
            try:
                direct_delete_container(
                    node, part, account, container,
                    conn_timeout=self.conn_timeout,
                    response_timeout=self.node_timeout,
                    headers={'X-Account-Host': '%(ip)s:%(port)s' % anode,
                             'X-Account-Partition': str(account_partition),
                             'X-Account-Device': anode['device'],
                             'X-Account-Override-Deleted': 'yes'})
                successes += 1
                self._record_return_code(2)
            except ClientException as err:
                self._log_node_exception(node)
                failures += 1
                self.logger.increment('containers_failures')
                self._record_return_code(err.http_status // 100)
        if successes > failures:
            self.stats_containers_deleted += 1
            self.logger.increment('containers_deleted')
        elif not successes:
            self.stats_containers_remaining += 1
            self.logger.increment('containers_remaining')
        else:
            self.stats_containers_possibly_remaining += 1
            self.logger.increment('containers_possibly_remaining')

    def reap_object(self, account, container, container_partition,
                    container_nodes, obj):
        """
        Deletes the given object by issuing a delete request to each node for
        the object. The format of the delete request is such that each object
        server will update a corresponding container server, removing the
        object from the container's listing.

        The object is counted exactly once in the deleted / remaining /
        possibly-remaining statistics, after every replica has been tried.

        This function returns nothing and should raise no exception but only
        update various self.stats_* values for what occurs.

        :param account: The name of the account for the object.
        :param container: The name of the container for the object.
        :param container_partition: The partition for the container on the
                                    container ring.
        :param container_nodes: The primary node dicts for the container.
        :param obj: The name of the object to delete.

        * See also: :func:`swift.common.ring.Ring.get_nodes` for a description
          of the container node dicts.
        """
        # Container replicas are handed out starting from the last one, and
        # wrap around when the object ring has more replicas than the
        # container ring (a plain pop() would raise IndexError there).
        container_nodes = list(container_nodes)[::-1]
        part, nodes = self.get_object_ring().get_nodes(account, container, obj)
        successes = 0
        failures = 0
        for index, node in enumerate(nodes):
            cnode = container_nodes[index % len(container_nodes)]
            try:
                direct_delete_object(
                    node, part, account, container, obj,
                    conn_timeout=self.conn_timeout,
                    response_timeout=self.node_timeout,
                    headers={'X-Container-Host': '%(ip)s:%(port)s' % cnode,
                             'X-Container-Partition': str(container_partition),
                             'X-Container-Device': cnode['device']})
                successes += 1
                self._record_return_code(2)
            except ClientException as err:
                self._log_node_exception(node)
                failures += 1
                self.logger.increment('objects_failures')
                self._record_return_code(err.http_status // 100)
        # Classify once per object (previously this ran inside the loop and
        # counted the object once per replica).
        if successes > failures:
            self.stats_objects_deleted += 1
            self.logger.increment('objects_deleted')
        elif not successes:
            self.stats_objects_remaining += 1
            self.logger.increment('objects_remaining')
        else:
            self.stats_objects_possibly_remaining += 1
            self.logger.increment('objects_possibly_remaining')

    def _record_return_code(self, status_class):
        """Count one response whose status is ``status_class``xx (2 = 2xx).

        Bumps both the per-account ``self.stats_return_codes`` tally logged
        by :func:`reap_account` and the ``return_codes.<class>`` counter.
        """
        self.stats_return_codes[status_class] = \
            self.stats_return_codes.get(status_class, 0) + 1
        self.logger.increment('return_codes.%d' % status_class)

    def _log_node_exception(self, node):
        """Log the exception being handled for ``node`` at DEBUG verbosity.

        Must be called from inside an ``except`` block, since
        ``logger.exception`` reports the exception currently being handled.
        """
        if self.logger.getEffectiveLevel() <= DEBUG:
            self.logger.exception(
                _('Exception with %(ip)s:%(port)s/%(device)s'), node)