¡@

Home 

OpenStack Study: manager.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright (c) 2010-2012 OpenStack Foundation

#

# Licensed under the Apache License, Version 2.0 (the "License");

# you may not use this file except in compliance with the License.

# You may obtain a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or

# implied.

# See the License for the specific language governing permissions and

# limitations under the License.

import errno
import fnmatch
import functools
import os
import re
import resource
import signal
import subprocess
import time

from swift import gettext_ as _
from swift.common.utils import search_tree, remove_file, write_file

# Where server configuration files live and where pid files are written.
SWIFT_DIR = '/etc/swift'
RUN_DIR = '/var/run/swift'

# auth-server has been removed from ALL_SERVERS, start it explicitly
ALL_SERVERS = [
    'account-auditor', 'account-server', 'container-auditor',
    'container-replicator', 'container-server', 'container-sync',
    'container-updater', 'object-auditor', 'object-server',
    'object-expirer', 'object-replicator', 'object-updater',
    'proxy-server', 'account-replicator', 'account-reaper',
]
# Servers selected by the 'main' alias in Manager.
MAIN_SERVERS = [
    'proxy-server', 'account-server', 'container-server', 'object-server',
]
# Everything in ALL_SERVERS that is not a main server, in ALL_SERVERS order
# (selected by the 'rest' alias in Manager).
REST_SERVERS = [s for s in ALL_SERVERS if s not in MAIN_SERVERS]
# Servers that receive SIGHUP rather than SIGTERM on a graceful stop.
GRACEFUL_SHUTDOWN_SERVERS = MAIN_SERVERS + ['auth-server']
# Only these servers keep the 'once' flag when launched (same list object
# as REST_SERVERS).
START_ONCE_SERVERS = REST_SERVERS
# These are servers that match a type (account-*, container-*, object-*) but
# don't use that type-server.conf file and instead use their own.
STANDALONE_SERVERS = ['object-expirer']

KILL_WAIT = 15  # seconds to wait for servers to die (by default)
WARNING_WAIT = 3  # seconds to wait after message that may just be a warning

MAX_DESCRIPTORS = 32768
MAX_MEMORY = 2 * 1024 ** 3  # 2 GB
MAX_PROCS = 8192  # workers * disks * threads_per_disk, can get high

**** CubicPower OpenStack Study ****

def setup_env():
    """Best-effort raise of this process's resource limits.

    Sets both the soft and hard limit of RLIMIT_NOFILE, RLIMIT_DATA and
    RLIMIT_NPROC to MAX_DESCRIPTORS, MAX_MEMORY and MAX_PROCS respectively.
    If ``resource.setrlimit`` rejects a value with ``ValueError``, a warning is
    printed and the remaining limits are still attempted (presumably the usual
    cause is running without root privileges).  Finally PYTHON_EGG_CACHE is
    set to /tmp unless the environment already defines it.
    """
    descriptor_limits = (MAX_DESCRIPTORS, MAX_DESCRIPTORS)
    try:
        resource.setrlimit(resource.RLIMIT_NOFILE, descriptor_limits)
    except ValueError:
        print(_("WARNING: Unable to modify file descriptor limit.  "
                "Running as non-root?"))

    memory_limits = (MAX_MEMORY, MAX_MEMORY)
    try:
        resource.setrlimit(resource.RLIMIT_DATA, memory_limits)
    except ValueError:
        print(_("WARNING: Unable to modify memory limit.  "
                "Running as non-root?"))

    process_limits = (MAX_PROCS, MAX_PROCS)
    try:
        resource.setrlimit(resource.RLIMIT_NPROC, process_limits)
    except ValueError:
        print(_("WARNING: Unable to modify max process limit.  "
                "Running as non-root?"))

    # keep any PYTHON_EGG_CACHE the caller already configured
    os.environ.setdefault('PYTHON_EGG_CACHE', '/tmp')

**** CubicPower OpenStack Study ****

def command(func):
    """Mark *func* as a publicly accessible command.

    The function is tagged with ``publicly_accessible = True``; the tag is
    also copied onto the wrapper by ``functools.wraps``.  The returned wrapper
    normalises the function's result to an exit status: 1 when the result is
    truthy and 0 otherwise, where 0 indicates success.

    :param func: function to make public
    :returns: the wrapping function
    """
    func.publicly_accessible = True

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        return int(bool(func(*args, **kwargs)))

    return wrapped

**** CubicPower OpenStack Study ****

 # NOTE(review): this looks like a stray copy of the ``wrapped`` closure that
 # ``command`` defines (presumably an extraction/paste artifact).  At this
 # indentation it does not line up with any enclosing block, so Python rejects
 # it, and it refers to ``func``, which is not defined at module scope in the
 # visible file.  It should most likely be deleted -- confirm and remove.
 def wrapped(*a, **kw):

        rv = func(*a, **kw)

        return 1 if rv else 0

    return wrapped

**** CubicPower OpenStack Study ****

def watch_server_pids(server_pids, interval=1, **kwargs):
    """Poll servers and yield each watched pid once it is no longer running.

    On every pass each server's pids are first reaped with a non-blocking
    ``os.waitpid`` (ECHILD/ESRCH are ignored, so non-children are fine).
    Then ``server.get_running_pids(**kwargs)`` is consulted, and every
    watched pid missing from that result is yielded as ``(server, pid)``.
    The watched set for the server then becomes its current running pids.
    Polling stops when no server reports any running pid, or once more than
    ``interval`` seconds have elapsed (checked after each full pass, with a
    0.1 second sleep between passes).

    :param server_pids: a dict, lists of pids [int,...] keyed on
                        Server objects
    :param interval: seconds to keep polling
    """
    deadline = time.time() + interval
    watching = dict(server_pids)  # private copy, mutated below
    latest = {}
    while True:
        for srv, watched in watching.items():
            for pid in watched:
                # give an exited child the chance to be reaped
                try:
                    os.waitpid(pid, os.WNOHANG)
                except OSError as err:
                    if err.errno not in (errno.ECHILD, errno.ESRCH):
                        raise
            running = srv.get_running_pids(**kwargs)
            latest[srv] = running
            for pid in watched:
                if pid not in running:
                    # one of the originally watched pids has gone away
                    yield srv, pid
            # from now on only watch what is still running
            watching[srv] = running
        if not any(True for still in latest.values() for _pid in still):
            break  # nothing left running anywhere
        if time.time() > deadline:
            break
        time.sleep(0.1)

**** CubicPower OpenStack Study ****

class UnknownCommandError(Exception):
    """Raised by Manager.get_command when *cmd* is not a public command.

    The offending (normalised) command name is the exception's sole argument.
    """

**** CubicPower OpenStack Study ****

class Manager(object):
    """Main class for performing commands on groups of servers.

    :param servers: list of server names as strings.  'all', 'main' and
                    'rest' expand to ALL_SERVERS, MAIN_SERVERS and
                    REST_SERVERS; a name containing '*' is a shell-style glob
                    matched against ALL_SERVERS; any other name is used as is.
    :param run_dir: directory holding pid files, handed to every Server
    """

    def __init__(self, servers, run_dir=RUN_DIR):
        server_names = set()
        for server in servers:
            if server == 'all':
                server_names.update(ALL_SERVERS)
            elif server == 'main':
                server_names.update(MAIN_SERVERS)
            elif server == 'rest':
                server_names.update(REST_SERVERS)
            elif '*' in server:
                # Glob-match against the known servers.  fnmatchcase compares
                # the whole name; the former unanchored re.match() also took
                # prefix matches (e.g. 'c*r' selected 'container-sync').
                server_names.update(s for s in ALL_SERVERS
                                    if fnmatch.fnmatchcase(s, server))
            else:
                server_names.add(server)
        self.servers = set()
        for name in server_names:
            self.servers.add(Server(name, run_dir))

    def __iter__(self):
        return iter(self.servers)

    # NOTE: docstrings of @command methods are user-visible -- list_commands()
    # returns them as help text -- so extra notes live in comments instead.

    @command
    def status(self, **kwargs):
        """display status of tracked pids for server
        """
        # Server.status returns 1 for each server without running pids;
        # @command collapses the sum to 1 (something not running) or 0.
        status = 0
        for server in self.servers:
            status += server.status(**kwargs)
        return status

    @command
    def start(self, **kwargs):
        """starts a server
        """
        setup_env()
        status = 0

        for server in self.servers:
            server.launch(**kwargs)
        if not kwargs.get('daemon', True):
            # interactive: block until each spawned process exits
            for server in self.servers:
                try:
                    status += server.interact(**kwargs)
                except KeyboardInterrupt:
                    print(_('\nuser quit'))
                    self.stop(**kwargs)
                    break
        elif kwargs.get('wait', True):
            for server in self.servers:
                status += server.wait(**kwargs)

        return status

    @command
    def no_wait(self, **kwargs):
        """spawn server and return immediately
        """
        kwargs['wait'] = False
        return self.start(**kwargs)

    @command
    def no_daemon(self, **kwargs):
        """start a server interactively
        """
        kwargs['daemon'] = False
        return self.start(**kwargs)

    @command
    def once(self, **kwargs):
        """start server and run one pass on supporting daemons
        """
        kwargs['once'] = True
        return self.start(**kwargs)

    @command
    def stop(self, **kwargs):
        """stops a server
        """
        # Returns 0 only when every signaled pid is seen to exit within
        # kill_wait seconds; 1 otherwise (including when nothing was running).
        server_pids = {}
        for server in self.servers:
            signaled_pids = server.stop(**kwargs)
            if not signaled_pids:
                print(_('No %s running') % server)
            else:
                server_pids[server] = signaled_pids

        # all signaled_pids, i.e. list(itertools.chain(*server_pids.values()))
        signaled_pids = [p for server, pids in server_pids.items()
                         for p in pids]
        # keep track of the pids yielded back as killed for all servers
        killed_pids = set()
        kill_wait = kwargs.get('kill_wait', KILL_WAIT)
        for server, killed_pid in watch_server_pids(server_pids,
                                                    interval=kill_wait,
                                                    **kwargs):
            print(_("%s (%s) appears to have stopped") % (server, killed_pid))
            killed_pids.add(killed_pid)
            if not killed_pids.symmetric_difference(signaled_pids):
                # all processes have been stopped
                return 0

        # reached the watch interval without every server stopping
        for server, pids in server_pids.items():
            if not killed_pids.issuperset(pids):
                # some pids of this server were not killed
                print(_('Waited %s seconds for %s to die; giving up') % (
                    kill_wait, server))
        return 1

    @command
    def kill(self, **kwargs):
        """stop a server (no error if not running)
        """
        status = self.stop(**kwargs)
        kwargs['quiet'] = True
        # self.status() is 0 only when every server still has running pids;
        # only then is the stop failure reported.
        if status and not self.status(**kwargs):
            return status
        return 0

    @command
    def shutdown(self, **kwargs):
        """allow current requests to finish on supporting servers
        """
        kwargs['graceful'] = True
        status = 0
        status += self.stop(**kwargs)
        return status

    @command
    def restart(self, **kwargs):
        """stops then restarts server
        """
        status = 0
        status += self.stop(**kwargs)
        status += self.start(**kwargs)
        return status

    @command
    def reload(self, **kwargs):
        """graceful shutdown then restart on supporting servers
        """
        kwargs['graceful'] = True
        status = 0
        for server in self.servers:
            # Cycle one server at a time, keeping its run_dir: without it the
            # temporary Manager fell back to RUN_DIR and looked for pid files
            # in the wrong place when a custom run_dir was in use.
            m = Manager([server.server], run_dir=server.run_dir)
            status += m.stop(**kwargs)
            status += m.start(**kwargs)
        return status

    @command
    def force_reload(self, **kwargs):
        """alias for reload
        """
        return self.reload(**kwargs)

    def get_command(self, cmd):
        """Find and return the decorated method named like cmd

        :param cmd: the command to get, a string; dashes map to underscores
                    and case is ignored
        :raises UnknownCommandError: if no public command has that name
        """
        cmd = cmd.lower().replace('-', '_')
        try:
            f = getattr(self, cmd)
        except AttributeError:
            raise UnknownCommandError(cmd)
        if not hasattr(f, 'publicly_accessible'):
            raise UnknownCommandError(cmd)
        return f

    @classmethod
    def list_commands(cls):
        """Get all publicly accessible commands

        :returns: a sorted list of string tuples (cmd, help) for the methods
                  decorated as commands; help is the stripped docstring
        """
        get_method = lambda cmd: getattr(cls, cmd)
        return sorted([(x.replace('_', '-'), get_method(x).__doc__.strip())
                       for x in dir(cls) if
                       getattr(get_method(x), 'publicly_accessible', False)])

    def run_command(self, cmd, **kwargs):
        """Find the named command and run it

        :param cmd: the command name to run
        :raises UnknownCommandError: if cmd is not a public command
        """
        f = self.get_command(cmd)
        return f(**kwargs)

**** CubicPower OpenStack Study ****

class Server(object):
    """Manage operations on a server or group of servers of similar type

    :param server: name of server; a name without '-' (e.g. 'object') is
                   expanded to '<name>-server'
    :param run_dir: directory in which pid files are written and searched
    """

    def __init__(self, server, run_dir=RUN_DIR):
        if '-' not in server:
            server = '%s-server' % server
        self.server = server.lower()
        # Derive type and command from the normalised name so all three agree
        # (previously 'Object-Server' gave cmd 'swift-Object-Server').
        self.type = self.server.rsplit('-', 1)[0]
        self.cmd = 'swift-%s' % self.server
        self.procs = []
        self.run_dir = run_dir

    def __str__(self):
        return self.server

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, repr(str(self)))

    def __hash__(self):
        return hash(str(self))

    def __eq__(self, other):
        try:
            return self.server == other.server
        except AttributeError:
            return False

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__; keep them consistent.
        return not self.__eq__(other)

    def get_pid_file_name(self, conf_file):
        """Translate conf_file to a corresponding pid_file

        :param conf_file: an conf_file for this server, a string
        :returns: the pid_file for this conf_file
        """
        return conf_file.replace(
            os.path.normpath(SWIFT_DIR), self.run_dir, 1).replace(
                '%s-server' % self.type, self.server, 1).replace(
                    '.conf', '.pid', 1)

    def get_conf_file_name(self, pid_file):
        """Translate pid_file to a corresponding conf_file

        :param pid_file: a pid_file for this server, a string
        :returns: the conf_file for this pid_file
        """
        if self.server in STANDALONE_SERVERS:
            return pid_file.replace(
                os.path.normpath(self.run_dir), SWIFT_DIR, 1).replace(
                    '.pid', '.conf', 1)
        else:
            return pid_file.replace(
                os.path.normpath(self.run_dir), SWIFT_DIR, 1).replace(
                    self.server, '%s-server' % self.type, 1).replace(
                        '.pid', '.conf', 1)

    def conf_files(self, **kwargs):
        """Get conf files for this server

        :param number: if supplied (1-based), only look up the nth server
        :param quiet: suppress the "Unable to locate config" message
        :param verbose: also list the config files that were found
        :returns: list of conf files
        """
        if self.server in STANDALONE_SERVERS:
            found_conf_files = search_tree(SWIFT_DIR, self.server + '*',
                                           '.conf', dir_ext='.conf.d')
        else:
            found_conf_files = search_tree(SWIFT_DIR, '%s-server*' % self.type,
                                           '.conf', dir_ext='.conf.d')
        number = kwargs.get('number')
        if number:
            # Only 1..len is valid; a negative number used to index from the
            # end of the list and silently pick the wrong config.
            if 0 < number <= len(found_conf_files):
                conf_files = [found_conf_files[number - 1]]
            else:
                conf_files = []
        else:
            conf_files = found_conf_files
        if not conf_files:
            # maybe there's a config file(s) out there, but I couldn't find it!
            if not kwargs.get('quiet'):
                print(_('Unable to locate config %sfor %s') % (
                    ('number %s ' % number if number else ''), self.server))
            if kwargs.get('verbose') and not kwargs.get('quiet'):
                if found_conf_files:
                    print(_('Found configs:'))
                for i, conf_file in enumerate(found_conf_files):
                    print('  %d) %s' % (i + 1, conf_file))

        return conf_files

    def pid_files(self, **kwargs):
        """Get pid files for this server

        :param number: if supplied, only keep pid files whose conf file is
                       the nth config of this server
        :returns: list of pid files
        """
        pid_files = search_tree(self.run_dir, '%s*' % self.server)
        if kwargs.get('number', 0):
            conf_files = self.conf_files(**kwargs)
            # filter pid_files to match the index of numbered conf_file
            pid_files = [pid_file for pid_file in pid_files if
                         self.get_conf_file_name(pid_file) in conf_files]
        return pid_files

    def iter_pid_files(self, **kwargs):
        """Generator, yields (pid_file, pid)

        :raises ValueError: if a pid file does not contain an integer
        """
        for pid_file in self.pid_files(**kwargs):
            # close the handle deterministically instead of relying on GC
            with open(pid_file) as f:
                pid = int(f.read().strip())
            yield pid_file, pid

    def signal_pids(self, sig, **kwargs):
        """Send a signal to pids for this server

        Pid files whose process no longer exists (ESRCH) are removed, as are
        pid files holding a non-positive pid.

        :param sig: signal to send
        :returns: a dict mapping pids (ints) to pid_files (paths)
        """
        pids = {}
        for pid_file, pid in self.iter_pid_files(**kwargs):
            if pid <= 0:
                # os.kill(0, sig) targets our own process group and
                # os.kill(-1, sig) every process we may signal; a corrupt
                # pid file must never trigger either.
                print(_('Removing pid file %s with invalid pid') % pid_file)
                remove_file(pid_file)
                continue
            try:
                if sig != signal.SIG_DFL:
                    print(_('Signal %s  pid: %s  signal: %s') % (self.server,
                                                                 pid, sig))
                os.kill(pid, sig)
            except OSError as e:
                if e.errno == errno.ESRCH:
                    # pid does not exist
                    if kwargs.get('verbose'):
                        print(_("Removing stale pid file %s") % pid_file)
                    remove_file(pid_file)
                elif e.errno == errno.EPERM:
                    print(_("No permission to signal PID %d") % pid)
            else:
                # process exists
                pids[pid] = pid_file
        return pids

    def get_running_pids(self, **kwargs):
        """Get running pids

        :returns: a dict mapping pids (ints) to pid_files (paths)
        """
        # signal.SIG_DFL (0) as a signal number only checks existence
        return self.signal_pids(signal.SIG_DFL, **kwargs)

    def kill_running_pids(self, **kwargs):
        """Kill running pids

        :param graceful: if True, attempt SIGHUP on supporting servers
        :returns: a dict mapping pids (ints) to pid_files (paths)
        """
        graceful = kwargs.get('graceful')
        if graceful and self.server in GRACEFUL_SHUTDOWN_SERVERS:
            sig = signal.SIGHUP
        else:
            sig = signal.SIGTERM
        return self.signal_pids(sig, **kwargs)

    def status(self, pids=None, **kwargs):
        """Display status of server

        :param pids: if not supplied pids will be populated automatically
        :param number: if supplied will only lookup the nth server
        :returns: 1 if server is not running, 0 otherwise
        """
        if pids is None:
            pids = self.get_running_pids(**kwargs)
        if not pids:
            number = kwargs.get('number', 0)
            if number:
                kwargs['quiet'] = True
                conf_files = self.conf_files(**kwargs)
                if conf_files:
                    print(_("%s #%d not running (%s)") % (self.server, number,
                                                          conf_files[0]))
            else:
                print(_("No %s running") % self.server)
            return 1
        for pid, pid_file in pids.items():
            conf_file = self.get_conf_file_name(pid_file)
            print(_("%s running (%s - %s)") % (self.server, pid, conf_file))
        return 0

    def spawn(self, conf_file, once=False, wait=True, daemon=True, **kwargs):
        """Launch a subprocess for this server.

        The child's pid is written to the pid file derived from conf_file
        and the Popen object is kept in self.procs.

        :param conf_file: path to conf_file to use as first arg
        :param once: boolean, add once argument to command
        :param wait: boolean, if true capture stdout with a pipe
        :param daemon: boolean, if false ask server to log to console
        :returns : the pid of the spawned process
        """
        args = [self.cmd, conf_file]
        if once:
            args.append('once')
        if not daemon:
            # ask the server to log to console
            args.append('verbose')

        # figure out what we're going to do with stdio
        devnull = None
        if not daemon:
            # do nothing, this process is open until the spawns close anyway
            re_out = None
            re_err = None
        else:
            re_err = subprocess.STDOUT
            if wait:
                # we're going to need to block on this...
                re_out = subprocess.PIPE
            else:
                devnull = re_out = open(os.devnull, 'w+b')
        try:
            proc = subprocess.Popen(args, stdout=re_out, stderr=re_err)
        finally:
            # The child holds its own copy of the descriptor; close ours so
            # each no-wait spawn does not leak an open file in this process.
            if devnull is not None:
                devnull.close()
        pid_file = self.get_pid_file_name(conf_file)
        write_file(pid_file, proc.pid)
        self.procs.append(proc)
        return proc.pid

    def wait(self, **kwargs):
        """
        wait on spawned procs to start

        Reads each proc's stdout to EOF (procs must have been spawned with
        wait=True so stdout is a pipe).  Any output is printed, and if the
        process then exits within WARNING_WAIT seconds its return code is
        added to the returned status.
        """
        status = 0
        for proc in self.procs:
            # wait for process to close its stdout
            output = proc.stdout.read()
            if kwargs.get('once', False):
                # if you don't want once to wait you can send it to the
                # background on the command line, I generally just run with
                # no-daemon anyway, but this is quieter
                proc.wait()
            if output:
                print(output)
                start = time.time()
                # wait for process to die (output may just be a warning)
                while time.time() - start < WARNING_WAIT:
                    time.sleep(0.1)
                    if proc.poll() is not None:
                        status += proc.returncode
                        break
        return status

    def interact(self, **kwargs):
        """
        wait on spawned procs to terminate

        :returns: the number of procs that exited with a non-zero code
        """
        status = 0
        for proc in self.procs:
            # wait for process to terminate
            proc.communicate()
            if proc.returncode:
                status += 1
        return status

    def launch(self, **kwargs):
        """
        Collect conf files and attempt to spawn the processes for this server

        :returns: a dict mapping spawned pids (ints) to conf files; empty when
                  no config was found or the server is already running
        """
        conf_files = self.conf_files(**kwargs)
        if not conf_files:
            # empty dict (not list) so every return path has the same type
            return {}

        pids = self.get_running_pids(**kwargs)

        already_started = False
        for pid, pid_file in pids.items():
            conf_file = self.get_conf_file_name(pid_file)
            # for legacy compat you can't start other servers if one server is
            # already running (unless -n specifies which one you want), this
            # restriction could potentially be lifted, and launch could start
            # any unstarted instances
            if conf_file in conf_files:
                already_started = True
                print(_("%s running (%s - %s)") % (self.server, pid, conf_file))
            elif not kwargs.get('number', 0):
                already_started = True
                print(_("%s running (%s - %s)") % (self.server, pid, pid_file))

        if already_started:
            print(_("%s already started...") % self.server)
            return {}

        if self.server not in START_ONCE_SERVERS:
            kwargs['once'] = False

        pids = {}
        for conf_file in conf_files:
            if kwargs.get('once'):
                msg = _('Running %s once') % self.server
            else:
                msg = _('Starting %s') % self.server
            print('%s...(%s)' % (msg, conf_file))
            try:
                pid = self.spawn(conf_file, **kwargs)
            except OSError as e:
                if e.errno == errno.ENOENT:
                    # TODO(clayg): should I check if self.cmd exists earlier?
                    print(_("%s does not exist") % self.cmd)
                    break
                else:
                    raise
            pids[pid] = conf_file

        return pids

    def stop(self, **kwargs):
        """Send stop signals to pids for this server

        :returns: a dict mapping pids (ints) to pid_files (paths)
        """
        return self.kill_running_pids(**kwargs)