Add initial meta-stx to support StarlingX build
[pti/rtp.git] / meta-stx / recipes-extended / ceph / files / mgr-restful-plugin.py
diff --git a/meta-stx/recipes-extended/ceph/files/mgr-restful-plugin.py b/meta-stx/recipes-extended/ceph/files/mgr-restful-plugin.py
new file mode 100644 (file)
index 0000000..d1f14b8
--- /dev/null
@@ -0,0 +1,1121 @@
+#!/usr/bin/python
+#
+# Copyright (c) 2019 Wind River Systems, Inc.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+
+
+### BEGIN INIT INFO
+# Provides:          ceph/mgr RESTful API plugin
+# Required-Start:    $ceph
+# Required-Stop:     $ceph
+# Default-Start:     2 3 4 5
+# Default-Stop:      0 1 6
+# Short-Description: Ceph MGR RESTful API plugin
+# Description:       Ceph MGR RESTful API plugin
+### END INIT INFO
+
+import argparse
+import contextlib
+import errno
+import fcntl
+import inspect
+import json
+import logging
+import multiprocessing
+import os
+import shutil
+import signal
+import socket
+import subprocess
+import sys
+import tempfile
+import time
+
+import daemon
+import psutil
+import requests
+
+# 'timeout' command returns exit status 124
+# if command times out (see man page)
+GNU_TIMEOUT_EXPIRED_RETCODE = 124
+
+
+def psutil_terminate_kill(target, timeout):
+
+    """Extend psutil functionality to stop a process.
+
+       SIGINT is sent to each target then after a grace period SIGKILL
+       is sent to the ones that are still running.
+    """
+
+    if not isinstance(target, list):
+        target = [target]
+    _, target = psutil.wait_procs(target, timeout=0)
+    for action in [lambda p: p.terminate(), lambda p: p.kill()]:
+        for proc in target:
+            action(proc)
+        _, target = psutil.wait_procs(
+            target, timeout=timeout)
+
+
+class Config(object):
+
+    """ceph-mgr service wrapper configuration options.
+
+        In the future we may want to load them from a configuration file
+        (for example /etc/ceph/mgr-restful-plugin.conf )
+    """
+
+    def __init__(self):
+        self.log_level = logging.INFO
+        self.log_dir = '/var/log'
+
+        self.ceph_mgr_service = '/usr/bin/ceph-mgr'
+        self.ceph_mgr_config = '/etc/ceph/ceph.conf'
+        self.ceph_mgr_cluster = 'ceph'
+        self.ceph_mgr_rundir = '/var/run/ceph/mgr'
+        self.ceph_mgr_confdir = '/var/lib/ceph/mgr'
+        self.ceph_mgr_identity = socket.gethostname()
+
+        self.service_name = 'mgr-restful-plugin'
+        self.service_socket = os.path.join(
+            self.ceph_mgr_rundir, '{}.socket'.format(self.service_name))
+        self.service_lock = os.path.join(
+            self.ceph_mgr_rundir, '{}.lock'.format(self.service_name))
+        self.service_pid_file = os.path.join(
+            '/var/run/ceph', '{}.pid'.format(self.service_name))
+
+        self.restful_plugin_port = 5001
+
+        # maximum size of a message received/sent via
+        # service monitor control socket
+        self.service_socket_bufsize = 1024
+
+        # maximum time to wait for ceph cli to exit
+        self.ceph_cli_timeout_sec = 30
+
+        # how much time to wait after ceph cli commands fail with timeout
+        # before running any other commands
+        self.cluster_grace_period_sec = 30
+
+        # after ceph-mgr is started it goes through an internal initialization
+        # phase before; how much time to wait before querying ceph-mgr
+        self.ceph_mgr_grace_period_sec = 15
+
+        # after sending SIGTERM to ceph-mgr how much time to wait before
+        # sending SIGKILL (maximum time allowed for ceph-mgr cleanup)
+        self.ceph_mgr_kill_delay_sec = 5
+
+        # if service monitor is running a recovery procedure it reports
+        # status OK even if ceph-mgr is currently down. This sets the
+        # maximum number of consecutive ceph-mgr failures before reporting
+        # status error
+        self.ceph_mgr_fail_count_report_error = 3
+
+        # maximum number of consecutive ceph-mgr failures before
+        # stopping mgr-restful-plugin service
+        self.ceph_mgr_fail_count_exit = 5
+
+        # maximum time allowed for ceph-mgr to respond to a REST API request
+        self.rest_api_timeout_sec = 15
+
+        # interval between consecutive REST API requests (ping's). A smaller
+        # value here triggers more requests to ceph-mgr restful plugin. A
+        # higher value makes recovery slower when services become unavailable
+        self.restful_plugin_ping_delay_sec = 3
+
+        # where to save the self-signed certificate generated by ceph-mgr
+        self.restful_plugin_cert_path = os.path.join(
+            self.ceph_mgr_rundir, 'restful.crt')
+
+        # time to wait after enabling restful plugin
+        self.restful_plugin_grace_period_sec = 3
+
+        # after how many REST API ping failures to restart ceph-mgr
+        self.ping_fail_count_restart_mgr = 3
+
+        # after how many REST API ping failures to report status error.
+        # Until then service monitor reports status OK just in case
+        # restful plugin recovers
+        self.ping_fail_count_report_error = 5
+
+    @staticmethod
+    def load():
+        return Config()
+
+
+def setup_logging(name=None, cleanup_handlers=False):
+    if not name:
+        name = CONFIG.service_name
+    log = logging.getLogger(name)
+    log.setLevel(CONFIG.log_level)
+    if cleanup_handlers:
+        try:
+            for handler in log.handlers:
+                if isinstance(handler, logging.StreamHandler):
+                    handler.flush()
+                if isinstance(handler, logging.FileHandler):
+                    handler.close()
+            log.handlers = []
+        except Exception:
+            pass
+    elif log.handlers:
+        return log
+    handler = logging.FileHandler(
+        os.path.join(CONFIG.log_dir,
+                     '{}.log'.format(CONFIG.service_name)))
+    handler.setFormatter(
+        logging.Formatter('%(asctime)s %(process)s %(levelname)s %(name)s %(message)s'))
+    log.addHandler(handler)
+    return log
+
+
+CONFIG = Config.load()
+LOG = setup_logging(name='init-wrapper')
+
+
+class ServiceException(Exception):
+
+    """Generic mgr-restful-plugin service exception.
+
+       Build exception string based on static (per exception class)
+       string plus args, keyword args passed to exception constructor.
+    """
+
+    message = ""
+
+    def __init__(self, *args, **kwargs):
+        if "message" not in kwargs:
+            try:
+                message = self.message.format(*args, **kwargs)
+            except Exception:   # noqa
+                message = '{}, args:{}, kwargs: {}'.format(
+                    self.message, args, kwargs)
+        else:
+            message = kwargs["message"]
+        super(ServiceException, self).__init__(message)
+
+
+class ServiceAlreadyStarted(ServiceException):
+    message = ('Service monitor already started')
+
+
+class ServiceLockFailed(ServiceException):
+    message = ('Unable to lock service monitor: '
+               'reason={reason}')
+
+
+class ServiceNoSocket(ServiceException):
+    message = ('Unable to create service monitor socket: '
+               'reason={reason}')
+
+
+class ServiceSocketBindFailed(ServiceException):
+    message = ('Failed to bind service monitor socket: '
+               'path={path}, reason={reason}')
+
+
+class ServiceNoPidFile(ServiceException):
+    message = ('Failed to update pid file: '
+               'path={path}, reason={reason}')
+
+
+class CommandFailed(ServiceException):
+    message = ('Command failed: command={command}, '
+               'reason={reason}, out={out}')
+
+
+class CommandTimeout(ServiceException):
+    message = ('Command timeout: command={command}, '
+               'timeout={timeout}')
+
+
+class CephMgrStartFailed(ServiceException):
+    message = ('Failed to start ceph_mgr: '
+               'reason={reason}')
+
+
+class CephRestfulPluginFailed(ServiceException):
+    message = ('Failed to start restful plugin: '
+               'reason={reason}')
+
+
+class RestApiPingFailed(ServiceException):
+    message = ('REST API ping failed: '
+               'reason={reason}')
+
+
+class ServiceMonitor(object):
+
+    """Configure and monitor ceph-mgr and restful plugin (Ceph REST API)
+
+       1. process init script service requests: status, stop. Requests are
+          received via a control socket. Stop has priority over whatever
+          the monitor is doing currently. Any ceph command that may be running
+          is terminated/killed. Note that while ceph-mgr and restful plugin
+          configuration is in progress ServiceMonitor reports status OK to
+          avoid being restarted by SM.
+
+       2. configure ceph-mgr and mgr restful plugin: authentication, REST API
+          service port, self signed certificate. This runs as a separate
+          process so it can be stopped when init script requests it.
+
+       3. periodically check (ping) REST API responds to HTTPS requests.
+          Recovery actions are taken if REST API fails to respond: restart
+          ceph-mgr, wait for cluster to become available again.
+    """
+
+    def __init__(self):
+        # process running configuration & REST API ping loop
+        self.monitor = None
+
+        # command socket used by init script
+        self.command = None
+
+        # ceph-mgr process
+        self.ceph_mgr = None
+
+        # consecutive ceph-mgr/restful-plugin start failures. Service monitor
+        # reports failure after CONFIG.ceph_mgr_max_failure_count
+        self.ceph_mgr_failure_count = 0
+
+        # consecutive REST API ping failures. ceph-mgr service is restarted
+        # after CONFIG.ping_fail_count_restart_mgr threshold is exceeded
+        self.ping_failure_count = 0
+
+        # REST API url reported by ceph-mgr after enabling restful plugin
+        self.restful_plugin_url = ''
+
+        # REST API self signed certificate generated by restful plugin
+        self.certificate = ''
+
+    def run(self):
+        self.disable_certificate_check()
+        with self.service_lock(), self.service_socket(), \
+                self.service_pid_file():
+            self.start_monitor()
+            self.server_loop()
+
+    def disable_certificate_check(self):
+        # ceph-mgr restful plugin is configured with a self-signed
+        # certificate. Certificate host is hard-coded to "ceph-restful"
+        # which causes HTTPS requests to fail because they don't
+        # match current host name ("controller-..."). Disable HTTPS
+        # certificates check in urllib3
+        LOG.warning('Disable urllib3 certifcates check')
+        requests.packages.urllib3.disable_warnings()
+
+    def server_loop(self):
+        self.command.listen(2)
+        while True:
+            try:
+                client, _ = self.command.accept()
+                request = client.recv(CONFIG.service_socket_bufsize)
+                LOG.debug('Monitor command socket: request=%s', str(request))
+                cmd = request.split(' ')
+                cmd, args = cmd[0], cmd[1:]
+                if cmd == 'status':
+                    self.send_response(client, request, self.status())
+                elif cmd == 'stop':
+                    self.stop()
+                    self.send_response(client, request, 'OK')
+                    break
+                elif cmd == 'restful-url':
+                    try:
+                        self.restful_plugin_url = args[0]
+                        self.send_response(client, request, 'OK')
+                    except IndexError:
+                        LOG.warning('Failed to update restful plugin url: '
+                                    'args=%s', str(args))
+                        self.send_response(client, request, 'ERR')
+                elif cmd == 'certificate':
+                    try:
+                        self.certificate = args[0] if args else ''
+                        self.send_response(client, request, 'OK')
+                    except IndexError:
+                        LOG.warning('Failed to update certificate path: '
+                                    'args=%s', str(args))
+                        self.send_response(client, request, 'ERR')
+                elif cmd == 'ceph-mgr-failures':
+                    try:
+                        self.ceph_mgr_failure_count = int(args[0])
+                        self.send_response(client, request, 'OK')
+                        if self.ceph_mgr_failure_count >= CONFIG.ceph_mgr_fail_count_exit:
+                            self.stop()
+                            break
+                    except (IndexError, ValueError):
+                        LOG.warning('Failed to update ceph-mgr failures: '
+                                    'args=%s', str(args))
+                        self.send_response(client, request, 'ERR')
+                elif cmd == 'ping-failures':
+                    try:
+                        self.ping_failure_count = int(args[0])
+                        self.send_response(client, request, 'OK')
+                    except (IndexError, ValueError):
+                        LOG.warning('Failed to update ping failures: '
+                                    'args=%s', str(args))
+                        self.send_response(client, request, 'ERR')
+            except Exception as err:
+                LOG.exception(err)
+
+    @staticmethod
+    def send_response(client, request, response):
+        try:
+            client.send(response)
+        except socket.error as err:
+            LOG.warning('Failed to send response back. '
+                        'request=%s, response=%s, reason=%s',
+                        request, response, err)
+
+    def status(self):
+        if not self.restful_plugin_url:
+            if self.ceph_mgr_failure_count < CONFIG.ceph_mgr_fail_count_report_error \
+               and self.ping_failure_count < CONFIG.ping_fail_count_report_error:
+                LOG.debug('Monitor is starting services. Report status OK')
+                return 'OK'
+            LOG.debug('Too many failures: '
+                      'ceph_mgr=%d < %d, ping=%d < %d. '
+                      'Report status ERR',
+                      self.ceph_mgr_failure_count,
+                      CONFIG.ceph_mgr_fail_count_report_error,
+                      self.ping_failure_count,
+                      CONFIG.ping_fail_count_report_error)
+            return 'ERR.down'
+        try:
+            self.restful_plugin_ping()
+            LOG.debug('Restful plugin ping successful. Report status OK')
+            return 'OK'
+        except (CommandFailed, RestApiPingFailed):
+            if self.ceph_mgr_failure_count < CONFIG.ceph_mgr_fail_count_report_error \
+               and self.ping_failure_count < CONFIG.ping_fail_count_report_error:
+                LOG.info('Restful plugin does not respond but failure '
+                         'count is within acceptable limits: '
+                         ' ceph_mgr=%d < %d, ping=%d < %d. '
+                         'Report status OK',
+                         self.ceph_mgr_failure_count,
+                         CONFIG.ceph_mgr_fail_count_report_error,
+                         self.ping_failure_count,
+                         CONFIG.ping_fail_count_report_error)
+                return 'OK'
+            LOG.debug('Restful does not respond (ping failure count %d). '
+                      'Report status ERR', self.ping_failure_count)
+            return 'ERR.ping_failed'
+
+    def stop(self):
+        if not self.monitor:
+            return
+        LOG.info('Stop monitor with SIGTERM to process group %d',
+                 self.monitor.pid)
+        try:
+            os.killpg(self.monitor.pid, signal.SIGTERM)
+        except OSError as err:
+            LOG.info('Stop monitor failed: reason=%s', str(err))
+            return
+        time.sleep(CONFIG.ceph_mgr_kill_delay_sec)
+        LOG.info('Stop monitor with SIGKILL to process group %d',
+                 self.monitor.pid)
+        try:
+            os.killpg(self.monitor.pid, signal.SIGKILL)
+            os.waitpid(self.monitor.pid, 0)
+        except OSError as err:
+            LOG.info('Stop monitor failed: reason=%s', str(err))
+            return
+        LOG.info('Monitor stopped: pid=%d', self.monitor.pid)
+
+    @contextlib.contextmanager
+    def service_lock(self):
+        LOG.info('Take service lock: path=%s', CONFIG.service_lock)
+        try:
+            os.makedirs(os.path.dirname(CONFIG.service_lock))
+        except OSError:
+            pass
+        lock_file = open(CONFIG.service_lock, 'w')
+        try:
+            fcntl.flock(lock_file.fileno(),
+                        fcntl.LOCK_EX | fcntl.LOCK_NB)
+        except (IOError, OSError) as err:
+            if err.errno == errno.EAGAIN:
+                raise ServiceAlreadyStarted()
+            else:
+                raise ServiceLockFailed(reason=str(err))
+        # even if we have the lock here there might be another service manager
+        # running whose CONFIG.ceph_mgr_rundir was removed before starting
+        # this instance. Make sure there is only one service manager running
+        self.stop_other_service_managers()
+        try:
+            yield
+        finally:
+            os.unlink(CONFIG.service_lock)
+            lock_file.close()
+            LOG.info('Release service lock: path=%s', CONFIG.service_lock)
+
+    def stop_other_service_managers(self):
+        service = os.path.join('/etc/init.d', CONFIG.service_name)
+        for p in psutil.process_iter():
+            if p.cmdline()[:2] not in [[service], ['/usr/bin/python', service]]:
+                continue
+            if p.pid == os.getpid():
+                continue
+            p.kill()
+
+    @contextlib.contextmanager
+    def service_socket(self):
+        LOG.info('Create service socket')
+        try:
+            self.command = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+        except socket.error as err:
+            raise ServiceNoSocket(reason=str(err))
+        LOG.info('Remove existing socket files')
+        try:
+            os.unlink(CONFIG.service_socket)
+        except OSError:
+            pass
+        LOG.info('Bind service socket: path=%s', CONFIG.service_socket)
+        try:
+            self.command.bind(CONFIG.service_socket)
+        except socket.error as err:
+            raise ServiceSocketBindFailed(
+                path=CONFIG.service_socket, reason=str(err))
+        try:
+            yield
+        finally:
+            LOG.info('Close service socket and remove file: path=%s',
+                     CONFIG.service_socket)
+            self.command.close()
+            os.unlink(CONFIG.service_socket)
+
+    @contextlib.contextmanager
+    def service_pid_file(self):
+        LOG.info('Update service pid file: path=%s', CONFIG.service_pid_file)
+        try:
+            pid_file = open(CONFIG.service_pid_file, 'w')
+            pid_file.write(str(os.getpid()))
+            pid_file.flush()
+        except OSError as err:
+            raise ServiceNoPidFile(
+                path=CONFIG.service_pid_file, reason=str(err))
+        try:
+            yield
+        finally:
+            LOG.info('Remove service pid file: path=%s',
+                     CONFIG.service_pid_file)
+            try:
+                os.unlink(CONFIG.service_pid_file)
+            except OSError:
+                pass
+
+    def start_monitor(self):
+        LOG.info('Start monitor loop')
+        self.monitor = multiprocessing.Process(target=self.monitor_loop)
+        self.monitor.start()
+
+    def stop_unmanaged_ceph_mgr(self):
+        LOG.info('Stop unmanaged running ceph-mgr processes')
+        service_name = os.path.basename(CONFIG.ceph_mgr_service)
+        if self.ceph_mgr:
+            psutil_terminate_kill(
+                [proc for proc in psutil.process_iter()
+                 if (proc.name() == service_name
+                     and proc.pid != self.ceph_mgr.pid)],
+                CONFIG.ceph_mgr_kill_delay_sec)
+        else:
+            psutil_terminate_kill(
+                [proc for proc in psutil.process_iter()
+                 if proc.name() == service_name],
+                CONFIG.ceph_mgr_kill_delay_sec)
+
+    def monitor_loop(self):
+
+        """Bring up and monitor ceph-mgr restful plugin.
+
+           Steps:
+           - wait for Ceph cluster to become available
+           - configure and start ceph-mgr
+           - configure and enable restful plugin
+           - send periodic requests to REST API
+           - recover from failures
+
+           Note: because this runs as a separate process it
+               must send status updates to service monitor
+               via control socket for: ping_failure_count,
+               restful_plugin_url and certificate.
+        """
+
+        # Promote to process group leader so parent (service monitor)
+        # can kill the monitor plus processes spawned by it. Otherwise
+        # children of monitor_loop() will keep running in background and
+        # will be reaped by init when they finish but by then they might
+        # interfere with any new service instance.
+        os.setpgrp()
+
+        # Ignoring SIGTERM here ensures process group is not reused by
+        # the time parent (service monitor) issues the final SIGKILL.
+        signal.signal(signal.SIGTERM, signal.SIG_IGN)
+
+        while True:
+            try:
+                # steps to configure/start ceph-mgr and restful plugin
+                self.ceph_fsid_get()
+                self.ceph_mgr_auth_create()
+                self.restful_plugin_set_server_port()
+                self.restful_plugin_create_certificate()
+                self.ceph_mgr_start()
+                self.restful_plugin_enable()
+                self.restful_plugin_create_admin_key()
+                self.restful_plugin_get_url()
+                self.restful_plugin_get_certificate()
+
+                # REST API should be available now
+                # start making periodic requests (ping)
+                while True:
+                    try:
+                        self.restful_plugin_ping()
+                        self.ping_failure_count = 0
+                        self.request_update_ping_failures(
+                            self.ping_failure_count)
+                        self.ceph_mgr_failure_count = 0
+                        self.request_update_ceph_mgr_failures(
+                            self.ceph_mgr_failure_count)
+                        time.sleep(CONFIG.restful_plugin_ping_delay_sec)
+                        continue
+                    except RestApiPingFailed as err:
+                        LOG.warning(str(err))
+
+                    LOG.info('REST API ping failure count=%d',
+                             self.ping_failure_count)
+                    self.ping_failure_count += 1
+                    self.request_update_ping_failures(
+                        self.ping_failure_count)
+
+                    # maybe request failed because ceph-mgr is not running
+                    if not self.ceph_mgr_is_running():
+                        self.ceph_mgr_failure_count += 1
+                        self.request_update_ceph_mgr_failures(
+                            self.ceph_mgr_failure_count)
+                        self.ceph_mgr_start()
+                        time.sleep(CONFIG.ceph_mgr_grace_period_sec)
+                        continue
+
+                    # maybe request failed because cluster health is not ok
+                    if not self.ceph_fsid_get():
+                        LOG.info('Unable to get cluster fsid. '
+                                 'Sleep for a while')
+                        time.sleep(CONFIG.cluster_grace_period_sec)
+                        break
+
+                    # too many failures? Restart ceph-mgr and go again
+                    # through configuration steps
+                    if (self.ping_failure_count
+                            % CONFIG.ping_fail_count_restart_mgr == 0):
+                        LOG.info('Too many consecutive REST API failures. '
+                                 'Restart ceph-mgr. Update service '
+                                 'url and certificate')
+                        self.ceph_mgr_stop()
+                        self.restful_plugin_url = ''
+                        self.request_update_plugin_url(self.restful_plugin_url)
+                        self.certificate = ''
+                        self.request_update_certificate(self.certificate)
+                        break
+
+                    time.sleep(CONFIG.restful_plugin_ping_delay_sec)
+
+            except CommandFailed as err:
+                LOG.warning(str(err))
+                time.sleep(CONFIG.cluster_grace_period_sec)
+            except CommandTimeout as err:
+                LOG.warning(str(err))
+            except (CephMgrStartFailed, CephRestfulPluginFailed) as err:
+                LOG.warning(str(err))
+                self.ceph_mgr_failure_count += 1
+                self.request_update_ceph_mgr_failures(
+                    self.ceph_mgr_failure_count)
+                time.sleep(CONFIG.ceph_mgr_grace_period_sec)
+            except Exception as err:
+                LOG.exception(err)
+                time.sleep(CONFIG.cluster_grace_period_sec)
+
+    @staticmethod
+    def run_with_timeout(command, timeout, stderr=subprocess.STDOUT):
+        try:
+            LOG.info('Run command: %s', ' '.join(command))
+            return subprocess.check_output(
+                ['/usr/bin/timeout', str(timeout)] + command,
+                stderr=stderr, shell=False).strip()
+        except subprocess.CalledProcessError as err:
+            if err.returncode == GNU_TIMEOUT_EXPIRED_RETCODE:
+                raise CommandTimeout(command=err.cmd, timeout=timeout)
+            raise CommandFailed(command=err.cmd, reason=str(err),
+                                out=err.output)
+
+    def ceph_fsid_get(self):
+        return self.run_with_timeout(['/usr/bin/ceph', 'fsid'],
+                                     CONFIG.ceph_cli_timeout_sec)
+
+    def ceph_mgr_has_auth(self):
+        path = '{}/ceph-{}'.format(
+            CONFIG.ceph_mgr_confdir, CONFIG.ceph_mgr_identity)
+        try:
+            os.makedirs(path)
+        except OSError as err:
+            pass
+        try:
+            self.run_with_timeout(
+                ['/usr/bin/ceph', 'auth', 'get',
+                 'mgr.{}'.format(CONFIG.ceph_mgr_identity),
+                 '-o', '{}/keyring'.format(path)],
+                CONFIG.ceph_cli_timeout_sec)
+            return True
+        except CommandFailed as err:
+            if 'ENOENT' in str(err):
+                return False
+            raise
+
+    def ceph_mgr_auth_create(self):
+        if self.ceph_mgr_has_auth():
+            return
+        LOG.info('Create ceph-mgr authentication')
+        self.run_with_timeout(
+            ['/usr/bin/ceph', 'auth', 'get-or-create',
+             'mgr.{}'.format(CONFIG.ceph_mgr_identity),
+             'mon', 'allow *', 'osd', 'allow *'],
+            CONFIG.ceph_cli_timeout_sec)
+
+    def ceph_mgr_is_running(self):
+        if not self.ceph_mgr:
+            return None
+        try:
+            self.ceph_mgr.wait(timeout=0)
+        except psutil.TimeoutExpired:
+            return True
+        return False
+
+    def ceph_mgr_start(self):
+        if self.ceph_mgr_is_running():
+            return
+        self.stop_unmanaged_ceph_mgr()
+        LOG.info('Start ceph-mgr daemon')
+        try:
+            with open(os.devnull, 'wb') as null:
+                self.ceph_mgr = psutil.Popen(
+                    [CONFIG.ceph_mgr_service,
+                     '--cluster', CONFIG.ceph_mgr_cluster,
+                     '--conf', CONFIG.ceph_mgr_config,
+                     '--id', CONFIG.ceph_mgr_identity,
+                     '-f'],
+                    close_fds=True,
+                    stdout=null,
+                    stderr=null,
+                    shell=False)
+        except (OSError, ValueError) as err:
+            raise CephMgrStartFailed(reason=str(err))
+        time.sleep(CONFIG.ceph_mgr_grace_period_sec)
+
+    def ceph_mgr_stop(self):
+        if not self.ceph_mgr:
+            return
+        LOG.info('Stop ceph-mgr')
+        psutil_terminate_kill(self.ceph_mgr, CONFIG.ceph_mgr_kill_delay_sec)
+
+    def restful_plugin_has_server_port(self):
+        try:
+            with open(os.devnull, 'wb') as null:
+                out = self.run_with_timeout(
+                    ['/usr/bin/ceph', 'config-key', 'get',
+                     'mgr/restful/server_port'],
+                    CONFIG.ceph_cli_timeout_sec, stderr=null)
+            if out == str(CONFIG.restful_plugin_port):
+                return True
+            LOG.warning('Restful plugin port mismatch: '
+                        'current=%d, expected=%d', out,
+                        CONFIG.restful_plugin_port)
+        except CommandFailed as err:
+            LOG.warning('Failed to get restful plugin port: '
+                        'reason=%s', str(err))
+        return False
+
+    def restful_plugin_set_server_port(self):
+        if self.restful_plugin_has_server_port():
+            return
+        LOG.info('Set restful plugin port=%d', CONFIG.restful_plugin_port)
+        self.run_with_timeout(
+            ['/usr/bin/ceph', 'config-key', 'set',
+             'mgr/restful/server_port', str(CONFIG.restful_plugin_port)],
+            CONFIG.ceph_cli_timeout_sec)
+
+    def restful_plugin_has_admin_key(self):
+        try:
+            self.run_with_timeout(
+                ['/usr/bin/ceph', 'config-key', 'get',
+                 'mgr/restful/keys/admin'],
+                CONFIG.ceph_cli_timeout_sec)
+            return True
+        except CommandFailed:
+            pass
+        return False
+
+    def restful_plugin_create_admin_key(self):
+        if self.restful_plugin_has_admin_key():
+            return
+        LOG.info('Create restful plugin admin key')
+        self.run_with_timeout(
+            ['/usr/bin/ceph', 'restful',
+             'create-key', 'admin'],
+            CONFIG.ceph_cli_timeout_sec)
+
+    def restful_plugin_has_certificate(self):
+        try:
+            self.run_with_timeout(
+                ['/usr/bin/ceph', 'config-key', 'get',
+                 'config/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)],
+                CONFIG.ceph_cli_timeout_sec)
+            self.run_with_timeout(
+                ['/usr/bin/ceph', 'config-key', 'get',
+                 'mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)],
+                CONFIG.ceph_cli_timeout_sec)
+            self.run_with_timeout(
+                ['/usr/bin/ceph', 'config-key', 'get',
+                 'config/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity)],
+                CONFIG.ceph_cli_timeout_sec)
+            self.run_with_timeout(
+                ['/usr/bin/ceph', 'config-key', 'get',
+                 '/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity)],
+                CONFIG.ceph_cli_timeout_sec)
+            return True
+        except CommandFailed:
+            pass
+        return False
+
+    def restful_plugin_create_certificate(self):
+        if self.restful_plugin_has_certificate():
+            return
+        LOG.info('Create restful plugin self signed certificate')
+        path = tempfile.mkdtemp()
+        try:
+            try:
+                with tempfile.NamedTemporaryFile() as restful_cnf:
+                    restful_cnf.write((
+                        '[req]\n'
+                        'req_extensions = v3_ca\n'
+                        'distinguished_name = req_distinguished_name\n'
+                        '[v3_ca]\n'
+                        'subjectAltName=DNS:{}\n'
+                        'basicConstraints = CA:true\n'
+                        '[ req_distinguished_name ]\n'
+                        '0.organizationName = IT\n'
+                        'commonName = ceph-restful\n').format(
+                            CONFIG.ceph_mgr_identity))
+                    restful_cnf.flush()
+                    subprocess.check_call([
+                        '/usr/bin/openssl', 'req', '-new', '-nodes', '-x509',
+                        '-subj', '/O=IT/CN=' + CONFIG.ceph_mgr_identity,
+                        '-days', '3650',
+                        '-config', restful_cnf.name,
+                        '-out', os.path.join(path, 'crt'),
+                        '-keyout', os.path.join(path, 'key'),
+                        '-extensions', 'v3_ca'])
+            except subprocess.CalledProcessError as err:
+                raise CommandFailed(
+                    command=' '.join(err.cmd),
+                    reason='failed to generate self-signed certificate: {}'.format(str(err)),
+                    out=err.output)
+            self.run_with_timeout(
+                ['/usr/bin/ceph', 'config-key', 'set',
+                 'config/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity),
+                 '-i', os.path.join(path, 'crt')],
+                CONFIG.ceph_cli_timeout_sec)
+            self.run_with_timeout(
+                ['/usr/bin/ceph', 'config-key', 'set',
+                 'mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity),
+                 '-i', os.path.join(path, 'crt')],
+                CONFIG.ceph_cli_timeout_sec)
+            self.run_with_timeout(
+                ['/usr/bin/ceph', 'config-key', 'set',
+                 'config/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity),
+                 '-i', os.path.join(path, 'key')],
+                CONFIG.ceph_cli_timeout_sec)
+            self.run_with_timeout(
+                ['/usr/bin/ceph', 'config-key', 'set',
+                 'mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity),
+                 '-i', os.path.join(path, 'key')],
+                CONFIG.ceph_cli_timeout_sec)
+        finally:
+            shutil.rmtree(path)
+
+    def restful_plugin_is_enabled(self):
+        command = ['/usr/bin/ceph', 'mgr', 'module', 'ls',
+                   '--format', 'json']
+        with open(os.devnull, 'wb') as null:
+            out = self.run_with_timeout(
+                command, CONFIG.ceph_cli_timeout_sec, stderr=null)
+        try:
+            if 'restful' in json.loads(out)['enabled_modules']:
+                return True
+        except ValueError as err:
+            raise CommandFailed(
+                command=' '.join(command),
+                reason='unable to decode json: {}'.format(err), out=out)
+        except KeyError as err:
+            raise CommandFailed(
+                command=' '.join(command),
+                reason='missing expected key: {}'.format(err), out=out)
+        return False
+
+    def restful_plugin_enable(self):
+        if not self.restful_plugin_is_enabled():
+            LOG.info('Enable restful plugin')
+            self.run_with_timeout(
+                ['/usr/bin/ceph', 'mgr',
+                 'module', 'enable', 'restful'],
+                CONFIG.ceph_cli_timeout_sec)
+        time.sleep(CONFIG.restful_plugin_grace_period_sec)
+
+    def restful_plugin_get_url(self):
+        command = ['/usr/bin/ceph', 'mgr', 'services',
+                   '--format', 'json']
+        with open(os.devnull, 'wb') as null:
+            out = self.run_with_timeout(
+                command, CONFIG.ceph_cli_timeout_sec, stderr=null)
+        try:
+            self.restful_plugin_url = json.loads(out)['restful']
+        except ValueError as err:
+            raise CephRestfulPluginFailed(
+                reason='unable to decode json: {} output={}'.format(err, out))
+        except KeyError as err:
+            raise CephRestfulPluginFailed(
+                reason='missing expected key: {} in ouput={}'.format(err, out))
+        self.request_update_plugin_url(self.restful_plugin_url)
+
+    def restful_plugin_get_certificate(self):
+        command = ['/usr/bin/ceph', 'config-key', 'get',
+                   'config/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)]
+        with open(os.devnull, 'wb') as null:
+            certificate = self.run_with_timeout(
+                command, CONFIG.ceph_cli_timeout_sec, stderr=null)
+            with open(CONFIG.restful_plugin_cert_path, 'wb') as cert_file:
+                cert_file.write(certificate)
+            self.certificate = CONFIG.restful_plugin_cert_path
+            self.request_update_certificate(
+                self.certificate)
+
+    def restful_plugin_ping(self):
+        if not self.restful_plugin_url:
+            raise RestApiPingFailed(reason='missing service url')
+        if not self.certificate:
+            raise RestApiPingFailed(reason='missing certificate')
+        LOG.debug('Ping restful plugin: url=%d', self.restful_plugin_url)
+        try:
+            response = requests.request(
+                'GET', self.restful_plugin_url, verify=False,
+                timeout=CONFIG.rest_api_timeout_sec)
+            if not response.ok:
+                raise RestApiPingFailed(
+                    reason='response not ok ({})'.format(response))
+            LOG.debug('Ping restful plugin OK')
+        except (requests.ConnectionError,
+                requests.Timeout,
+                requests.HTTPError) as err:
+            raise RestApiPingFailed(reason=str(err))
+
+    @staticmethod
+    def _make_client_socket():
+        sock = socket.socket(
+            socket.AF_UNIX, socket.SOCK_SEQPACKET)
+        sock.settimeout(2 * CONFIG.rest_api_timeout_sec)
+        sock.connect(CONFIG.service_socket)
+        return sock
+
+    @staticmethod
+    def request_status():
+        try:
+            with contextlib.closing(
+                    ServiceMonitor._make_client_socket()) as sock:
+                sock.send('status')
+                status = sock.recv(CONFIG.service_socket_bufsize)
+                LOG.debug('Status %s', status)
+                return status.startswith('OK')
+        except socket.error as err:
+            LOG.error('Status error: reason=%s', err)
+            return False
+
+    @staticmethod
+    def request_stop():
+        try:
+            with contextlib.closing(
+                    ServiceMonitor._make_client_socket()) as sock:
+                sock.send('stop')
+                response = sock.recv(CONFIG.service_socket_bufsize)
+                LOG.debug('Stop response: %s', response)
+                return True
+        except socket.error as err:
+            LOG.error('Stop error: reason=%s', err)
+            return False
+
+    @staticmethod
+    def request_update_ceph_mgr_failures(count):
+        try:
+            with contextlib.closing(
+                    ServiceMonitor._make_client_socket()) as sock:
+                sock.send('ceph-mgr-failures {}'.format(count))
+                sock.recv(CONFIG.service_socket_bufsize)
+                return True
+        except socket.error as err:
+            LOG.error('Stop error: reason=%s', err)
+            return False
+
+    @staticmethod
+    def request_update_ping_failures(count):
+        try:
+            with contextlib.closing(
+                    ServiceMonitor._make_client_socket()) as sock:
+                sock.send('ping-failures {}'.format(count))
+                sock.recv(CONFIG.service_socket_bufsize)
+                return True
+        except socket.error as err:
+            LOG.error('Stop error: reason=%s', err)
+            return False
+
+    @staticmethod
+    def request_update_plugin_url(url):
+        try:
+            with contextlib.closing(
+                    ServiceMonitor._make_client_socket()) as sock:
+                sock.send('restful-url {}'.format(url))
+                sock.recv(CONFIG.service_socket_bufsize)
+                return True
+        except socket.error as err:
+            LOG.error('Stop error: reason=%s', err)
+            return False
+
+    @staticmethod
+    def request_update_certificate(path):
+        try:
+            with contextlib.closing(
+                    ServiceMonitor._make_client_socket()) as sock:
+                sock.send('certificate {}'.format(path))
+                sock.recv(CONFIG.service_socket_bufsize)
+                return True
+        except socket.error as err:
+            LOG.error('Stop error: reason=%s', err)
+            return False
+
+
+class InitWrapper(object):
+
+    """Handle System V init script actions: start, stop, restart, etc. """
+
+    def __init__(self):
+
+        """Dispatch command line action to the corresponding function.
+
+           Candidate action functions are all class methods except ones
+           that start with an underscore.
+        """
+
+        parser = argparse.ArgumentParser()
+        actions = [m[0]
+                   for m in inspect.getmembers(self)
+                   if (inspect.ismethod(m[1])
+                       and not m[0].startswith('_'))]
+        parser.add_argument(
+            'action',
+            choices=actions)
+        self.args = parser.parse_args()
+        getattr(self, self.args.action)()
+
+    def start(self):
+
+        """Start ServiceMonitor as a daemon unless one is already running.
+
+           Use a pipe to report monitor status back to this process.
+        """
+
+        pipe = os.pipe()
+        child = os.fork()
+        if child == 0:
+            os.close(pipe[0])
+            with daemon.DaemonContext(files_preserve=[pipe[1]]):
+                # prevent duplication of messages in log
+                global LOG
+                LOG = setup_logging(cleanup_handlers=True)
+                try:
+                    monitor = ServiceMonitor()
+                    status = 'OK'
+                except ServiceAlreadyStarted:
+                    os.write(pipe[1], 'OK')
+                    os.close(pipe[1])
+                    return
+                except Exception as err:
+                    status = str(err)
+                os.write(pipe[1], status)
+                os.close(pipe[1])
+                if status == 'OK':
+                    try:
+                        monitor.run()
+                    except ServiceException as err:
+                        LOG.warning(str(err))
+                    except Exception as err:
+                        LOG.exception('Service monitor error: reason=%s', err)
+        else:
+            os.close(pipe[1])
+            try:
+                status = os.read(pipe[0], CONFIG.service_socket_bufsize)
+                if status == 'OK':
+                    sys.exit(0)
+                else:
+                    LOG.warning('Service monitor failed to start: '
+                                'status=%s', status)
+            except IOError as err:
+                LOG.warning('Failed to read monitor status: reason=%s', err)
+            os.close(pipe[0])
+            os.waitpid(child, 0)
+            sys.exit(1)
+
+    def stop(self):
+
+        """Tell ServiceMonitor daemon to stop running.
+
+           In case request fails stop ServiceMonitor and ceph_mgr proecsses
+           using SIGTERM followed by SIGKILL.
+        """
+
+        result = ServiceMonitor.request_stop()
+        if not result:
+            ceph_mgr = os.path.basename(CONFIG.ceph_mgr_service)
+            procs = []
+            for proc in psutil.process_iter():
+                name = proc.name()
+                if name == CONFIG.service_name:
+                    procs.append(proc)
+                if name == ceph_mgr:
+                    procs.append(proc)
+            psutil_terminate_kill(procs, CONFIG.ceph_mgr_kill_delay_sec)
+
+    def restart(self):
+        self.stop()
+        self.start()
+
+    def force_reload(self):
+        self.stop()
+        self.start()
+
+    def reload(self):
+        self.stop()
+        self.start()
+
+    def status(self):
+
+        """Report status from ServiceMonitor.
+
+           We don't just try to access REST API here because ServiceMonitor may
+           be in the process of starting/configuring ceph-mgr and restful
+           plugin in which case we report OK to avoid being restarted by SM.
+        """
+
+        status = ServiceMonitor.request_status()
+        sys.exit(0 if status is True else 1)
+
+
+if __name__ == '__main__':
+    InitWrapper()