--- /dev/null
+#!/usr/bin/python
+#
+# Copyright (c) 2019 Wind River Systems, Inc.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+
+
+### BEGIN INIT INFO
+# Provides: ceph/mgr RESTful API plugin
+# Required-Start: $ceph
+# Required-Stop: $ceph
+# Default-Start: 2 3 4 5
+# Default-Stop: 0 1 6
+# Short-Description: Ceph MGR RESTful API plugin
+# Description: Ceph MGR RESTful API plugin
+### END INIT INFO
+
+import argparse
+import contextlib
+import errno
+import fcntl
+import inspect
+import json
+import logging
+import multiprocessing
+import os
+import shutil
+import signal
+import socket
+import subprocess
+import sys
+import tempfile
+import time
+
+import daemon
+import psutil
+import requests
+
+# Exit status returned by the 'timeout' command when the command
+# it runs exceeds the time limit (see the timeout man page)
+GNU_TIMEOUT_EXPIRED_RETCODE = 124
+
+
+def psutil_terminate_kill(target, timeout):
+
+ """Extend psutil functionality to stop a process.
+
+ SIGINT is sent to each target then after a grace period SIGKILL
+ is sent to the ones that are still running.
+ """
+
+ if not isinstance(target, list):
+ target = [target]
+ _, target = psutil.wait_procs(target, timeout=0)
+ for action in [lambda p: p.terminate(), lambda p: p.kill()]:
+ for proc in target:
+ action(proc)
+ _, target = psutil.wait_procs(
+ target, timeout=timeout)
+
+
+class Config(object):
+
+ """ceph-mgr service wrapper configuration options.
+
+ In the future we may want to load them from a configuration file
+ (for example /etc/ceph/mgr-restful-plugin.conf )
+ """
+
+ def __init__(self):
+ self.log_level = logging.INFO
+ self.log_dir = '/var/log'
+
+ self.ceph_mgr_service = '/usr/bin/ceph-mgr'
+ self.ceph_mgr_config = '/etc/ceph/ceph.conf'
+ self.ceph_mgr_cluster = 'ceph'
+ self.ceph_mgr_rundir = '/var/run/ceph/mgr'
+ self.ceph_mgr_confdir = '/var/lib/ceph/mgr'
+ self.ceph_mgr_identity = socket.gethostname()
+
+ self.service_name = 'mgr-restful-plugin'
+ self.service_socket = os.path.join(
+ self.ceph_mgr_rundir, '{}.socket'.format(self.service_name))
+ self.service_lock = os.path.join(
+ self.ceph_mgr_rundir, '{}.lock'.format(self.service_name))
+ self.service_pid_file = os.path.join(
+ '/var/run/ceph', '{}.pid'.format(self.service_name))
+
+ self.restful_plugin_port = 5001
+
+ # maximum size of a message received/sent via
+ # service monitor control socket
+ self.service_socket_bufsize = 1024
+
+ # maximum time to wait for ceph cli to exit
+ self.ceph_cli_timeout_sec = 30
+
+ # how much time to wait after ceph cli commands fail with timeout
+ # before running any other commands
+ self.cluster_grace_period_sec = 30
+
+ # after ceph-mgr is started it goes through an internal initialization
+ # phase before; how much time to wait before querying ceph-mgr
+ self.ceph_mgr_grace_period_sec = 15
+
+ # after sending SIGTERM to ceph-mgr how much time to wait before
+ # sending SIGKILL (maximum time allowed for ceph-mgr cleanup)
+ self.ceph_mgr_kill_delay_sec = 5
+
+ # if service monitor is running a recovery procedure it reports
+ # status OK even if ceph-mgr is currently down. This sets the
+ # maximum number of consecutive ceph-mgr failures before reporting
+ # status error
+ self.ceph_mgr_fail_count_report_error = 3
+
+ # maximum number of consecutive ceph-mgr failures before
+ # stopping mgr-restful-plugin service
+ self.ceph_mgr_fail_count_exit = 5
+
+ # maximum time allowed for ceph-mgr to respond to a REST API request
+ self.rest_api_timeout_sec = 15
+
+ # interval between consecutive REST API requests (ping's). A smaller
+ # value here triggers more requests to ceph-mgr restful plugin. A
+ # higher value makes recovery slower when services become unavailable
+ self.restful_plugin_ping_delay_sec = 3
+
+ # where to save the self-signed certificate generated by ceph-mgr
+ self.restful_plugin_cert_path = os.path.join(
+ self.ceph_mgr_rundir, 'restful.crt')
+
+ # time to wait after enabling restful plugin
+ self.restful_plugin_grace_period_sec = 3
+
+ # after how many REST API ping failures to restart ceph-mgr
+ self.ping_fail_count_restart_mgr = 3
+
+ # after how many REST API ping failures to report status error.
+ # Until then service monitor reports status OK just in case
+ # restful plugin recovers
+ self.ping_fail_count_report_error = 5
+
+ @staticmethod
+ def load():
+ return Config()
+
+
+def setup_logging(name=None, cleanup_handlers=False):
+ if not name:
+ name = CONFIG.service_name
+ log = logging.getLogger(name)
+ log.setLevel(CONFIG.log_level)
+ if cleanup_handlers:
+ try:
+ for handler in log.handlers:
+ if isinstance(handler, logging.StreamHandler):
+ handler.flush()
+ if isinstance(handler, logging.FileHandler):
+ handler.close()
+ log.handlers = []
+ except Exception:
+ pass
+ elif log.handlers:
+ return log
+ handler = logging.FileHandler(
+ os.path.join(CONFIG.log_dir,
+ '{}.log'.format(CONFIG.service_name)))
+ handler.setFormatter(
+ logging.Formatter('%(asctime)s %(process)s %(levelname)s %(name)s %(message)s'))
+ log.addHandler(handler)
+ return log
+
+
+# Module-wide configuration and logger. LOG starts out as the
+# 'init-wrapper' logger; InitWrapper.start() rebinds it in the
+# daemonized child process.
+CONFIG = Config.load()
+LOG = setup_logging(name='init-wrapper')
+
+
+class ServiceException(Exception):
+
+ """Generic mgr-restful-plugin service exception.
+
+ Build exception string based on static (per exception class)
+ string plus args, keyword args passed to exception constructor.
+ """
+
+ message = ""
+
+ def __init__(self, *args, **kwargs):
+ if "message" not in kwargs:
+ try:
+ message = self.message.format(*args, **kwargs)
+ except Exception: # noqa
+ message = '{}, args:{}, kwargs: {}'.format(
+ self.message, args, kwargs)
+ else:
+ message = kwargs["message"]
+ super(ServiceException, self).__init__(message)
+
+
+class ServiceAlreadyStarted(ServiceException):
+ message = ('Service monitor already started')
+
+
+class ServiceLockFailed(ServiceException):
+ message = ('Unable to lock service monitor: '
+ 'reason={reason}')
+
+
+class ServiceNoSocket(ServiceException):
+ message = ('Unable to create service monitor socket: '
+ 'reason={reason}')
+
+
+class ServiceSocketBindFailed(ServiceException):
+ message = ('Failed to bind service monitor socket: '
+ 'path={path}, reason={reason}')
+
+
+class ServiceNoPidFile(ServiceException):
+ message = ('Failed to update pid file: '
+ 'path={path}, reason={reason}')
+
+
+class CommandFailed(ServiceException):
+ message = ('Command failed: command={command}, '
+ 'reason={reason}, out={out}')
+
+
+class CommandTimeout(ServiceException):
+ message = ('Command timeout: command={command}, '
+ 'timeout={timeout}')
+
+
+class CephMgrStartFailed(ServiceException):
+ message = ('Failed to start ceph_mgr: '
+ 'reason={reason}')
+
+
+class CephRestfulPluginFailed(ServiceException):
+ message = ('Failed to start restful plugin: '
+ 'reason={reason}')
+
+
+class RestApiPingFailed(ServiceException):
+ message = ('REST API ping failed: '
+ 'reason={reason}')
+
+
+class ServiceMonitor(object):
+
+ """Configure and monitor ceph-mgr and restful plugin (Ceph REST API)
+
+ 1. process init script service requests: status, stop. Requests are
+ received via a control socket. Stop has priority over whatever
+ the monitor is doing currently. Any ceph command that may be running
+ is terminated/killed. Note that while ceph-mgr and restful plugin
+ configuration is in progress ServiceMonitor reports status OK to
+ avoid being restarted by SM.
+
+ 2. configure ceph-mgr and mgr restful plugin: authentication, REST API
+ service port, self signed certificate. This runs as a separate
+ process so it can be stopped when init script requests it.
+
+ 3. periodically check (ping) REST API responds to HTTPS requests.
+ Recovery actions are taken if REST API fails to respond: restart
+ ceph-mgr, wait for cluster to become available again.
+ """
+
+ def __init__(self):
+ # process running configuration & REST API ping loop
+ self.monitor = None
+
+ # command socket used by init script
+ self.command = None
+
+ # ceph-mgr process
+ self.ceph_mgr = None
+
+ # consecutive ceph-mgr/restful-plugin start failures. Service monitor
+ # reports failure after CONFIG.ceph_mgr_max_failure_count
+ self.ceph_mgr_failure_count = 0
+
+ # consecutive REST API ping failures. ceph-mgr service is restarted
+ # after CONFIG.ping_fail_count_restart_mgr threshold is exceeded
+ self.ping_failure_count = 0
+
+ # REST API url reported by ceph-mgr after enabling restful plugin
+ self.restful_plugin_url = ''
+
+ # REST API self signed certificate generated by restful plugin
+ self.certificate = ''
+
+ def run(self):
+ self.disable_certificate_check()
+ with self.service_lock(), self.service_socket(), \
+ self.service_pid_file():
+ self.start_monitor()
+ self.server_loop()
+
+ def disable_certificate_check(self):
+ # ceph-mgr restful plugin is configured with a self-signed
+ # certificate. Certificate host is hard-coded to "ceph-restful"
+ # which causes HTTPS requests to fail because they don't
+ # match current host name ("controller-..."). Disable HTTPS
+ # certificates check in urllib3
+ LOG.warning('Disable urllib3 certifcates check')
+ requests.packages.urllib3.disable_warnings()
+
+ def server_loop(self):
+ self.command.listen(2)
+ while True:
+ try:
+ client, _ = self.command.accept()
+ request = client.recv(CONFIG.service_socket_bufsize)
+ LOG.debug('Monitor command socket: request=%s', str(request))
+ cmd = request.split(' ')
+ cmd, args = cmd[0], cmd[1:]
+ if cmd == 'status':
+ self.send_response(client, request, self.status())
+ elif cmd == 'stop':
+ self.stop()
+ self.send_response(client, request, 'OK')
+ break
+ elif cmd == 'restful-url':
+ try:
+ self.restful_plugin_url = args[0]
+ self.send_response(client, request, 'OK')
+ except IndexError:
+ LOG.warning('Failed to update restful plugin url: '
+ 'args=%s', str(args))
+ self.send_response(client, request, 'ERR')
+ elif cmd == 'certificate':
+ try:
+ self.certificate = args[0] if args else ''
+ self.send_response(client, request, 'OK')
+ except IndexError:
+ LOG.warning('Failed to update certificate path: '
+ 'args=%s', str(args))
+ self.send_response(client, request, 'ERR')
+ elif cmd == 'ceph-mgr-failures':
+ try:
+ self.ceph_mgr_failure_count = int(args[0])
+ self.send_response(client, request, 'OK')
+ if self.ceph_mgr_failure_count >= CONFIG.ceph_mgr_fail_count_exit:
+ self.stop()
+ break
+ except (IndexError, ValueError):
+ LOG.warning('Failed to update ceph-mgr failures: '
+ 'args=%s', str(args))
+ self.send_response(client, request, 'ERR')
+ elif cmd == 'ping-failures':
+ try:
+ self.ping_failure_count = int(args[0])
+ self.send_response(client, request, 'OK')
+ except (IndexError, ValueError):
+ LOG.warning('Failed to update ping failures: '
+ 'args=%s', str(args))
+ self.send_response(client, request, 'ERR')
+ except Exception as err:
+ LOG.exception(err)
+
+ @staticmethod
+ def send_response(client, request, response):
+ try:
+ client.send(response)
+ except socket.error as err:
+ LOG.warning('Failed to send response back. '
+ 'request=%s, response=%s, reason=%s',
+ request, response, err)
+
+ def status(self):
+ if not self.restful_plugin_url:
+ if self.ceph_mgr_failure_count < CONFIG.ceph_mgr_fail_count_report_error \
+ and self.ping_failure_count < CONFIG.ping_fail_count_report_error:
+ LOG.debug('Monitor is starting services. Report status OK')
+ return 'OK'
+ LOG.debug('Too many failures: '
+ 'ceph_mgr=%d < %d, ping=%d < %d. '
+ 'Report status ERR',
+ self.ceph_mgr_failure_count,
+ CONFIG.ceph_mgr_fail_count_report_error,
+ self.ping_failure_count,
+ CONFIG.ping_fail_count_report_error)
+ return 'ERR.down'
+ try:
+ self.restful_plugin_ping()
+ LOG.debug('Restful plugin ping successful. Report status OK')
+ return 'OK'
+ except (CommandFailed, RestApiPingFailed):
+ if self.ceph_mgr_failure_count < CONFIG.ceph_mgr_fail_count_report_error \
+ and self.ping_failure_count < CONFIG.ping_fail_count_report_error:
+ LOG.info('Restful plugin does not respond but failure '
+ 'count is within acceptable limits: '
+ ' ceph_mgr=%d < %d, ping=%d < %d. '
+ 'Report status OK',
+ self.ceph_mgr_failure_count,
+ CONFIG.ceph_mgr_fail_count_report_error,
+ self.ping_failure_count,
+ CONFIG.ping_fail_count_report_error)
+ return 'OK'
+ LOG.debug('Restful does not respond (ping failure count %d). '
+ 'Report status ERR', self.ping_failure_count)
+ return 'ERR.ping_failed'
+
+ def stop(self):
+ if not self.monitor:
+ return
+ LOG.info('Stop monitor with SIGTERM to process group %d',
+ self.monitor.pid)
+ try:
+ os.killpg(self.monitor.pid, signal.SIGTERM)
+ except OSError as err:
+ LOG.info('Stop monitor failed: reason=%s', str(err))
+ return
+ time.sleep(CONFIG.ceph_mgr_kill_delay_sec)
+ LOG.info('Stop monitor with SIGKILL to process group %d',
+ self.monitor.pid)
+ try:
+ os.killpg(self.monitor.pid, signal.SIGKILL)
+ os.waitpid(self.monitor.pid, 0)
+ except OSError as err:
+ LOG.info('Stop monitor failed: reason=%s', str(err))
+ return
+ LOG.info('Monitor stopped: pid=%d', self.monitor.pid)
+
+ @contextlib.contextmanager
+ def service_lock(self):
+ LOG.info('Take service lock: path=%s', CONFIG.service_lock)
+ try:
+ os.makedirs(os.path.dirname(CONFIG.service_lock))
+ except OSError:
+ pass
+ lock_file = open(CONFIG.service_lock, 'w')
+ try:
+ fcntl.flock(lock_file.fileno(),
+ fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except (IOError, OSError) as err:
+ if err.errno == errno.EAGAIN:
+ raise ServiceAlreadyStarted()
+ else:
+ raise ServiceLockFailed(reason=str(err))
+ # even if we have the lock here there might be another service manager
+ # running whose CONFIG.ceph_mgr_rundir was removed before starting
+ # this instance. Make sure there is only one service manager running
+ self.stop_other_service_managers()
+ try:
+ yield
+ finally:
+ os.unlink(CONFIG.service_lock)
+ lock_file.close()
+ LOG.info('Release service lock: path=%s', CONFIG.service_lock)
+
+ def stop_other_service_managers(self):
+ service = os.path.join('/etc/init.d', CONFIG.service_name)
+ for p in psutil.process_iter():
+ if p.cmdline()[:2] not in [[service], ['/usr/bin/python', service]]:
+ continue
+ if p.pid == os.getpid():
+ continue
+ p.kill()
+
+ @contextlib.contextmanager
+ def service_socket(self):
+ LOG.info('Create service socket')
+ try:
+ self.command = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+ except socket.error as err:
+ raise ServiceNoSocket(reason=str(err))
+ LOG.info('Remove existing socket files')
+ try:
+ os.unlink(CONFIG.service_socket)
+ except OSError:
+ pass
+ LOG.info('Bind service socket: path=%s', CONFIG.service_socket)
+ try:
+ self.command.bind(CONFIG.service_socket)
+ except socket.error as err:
+ raise ServiceSocketBindFailed(
+ path=CONFIG.service_socket, reason=str(err))
+ try:
+ yield
+ finally:
+ LOG.info('Close service socket and remove file: path=%s',
+ CONFIG.service_socket)
+ self.command.close()
+ os.unlink(CONFIG.service_socket)
+
+ @contextlib.contextmanager
+ def service_pid_file(self):
+ LOG.info('Update service pid file: path=%s', CONFIG.service_pid_file)
+ try:
+ pid_file = open(CONFIG.service_pid_file, 'w')
+ pid_file.write(str(os.getpid()))
+ pid_file.flush()
+ except OSError as err:
+ raise ServiceNoPidFile(
+ path=CONFIG.service_pid_file, reason=str(err))
+ try:
+ yield
+ finally:
+ LOG.info('Remove service pid file: path=%s',
+ CONFIG.service_pid_file)
+ try:
+ os.unlink(CONFIG.service_pid_file)
+ except OSError:
+ pass
+
+ def start_monitor(self):
+ LOG.info('Start monitor loop')
+ self.monitor = multiprocessing.Process(target=self.monitor_loop)
+ self.monitor.start()
+
+ def stop_unmanaged_ceph_mgr(self):
+ LOG.info('Stop unmanaged running ceph-mgr processes')
+ service_name = os.path.basename(CONFIG.ceph_mgr_service)
+ if self.ceph_mgr:
+ psutil_terminate_kill(
+ [proc for proc in psutil.process_iter()
+ if (proc.name() == service_name
+ and proc.pid != self.ceph_mgr.pid)],
+ CONFIG.ceph_mgr_kill_delay_sec)
+ else:
+ psutil_terminate_kill(
+ [proc for proc in psutil.process_iter()
+ if proc.name() == service_name],
+ CONFIG.ceph_mgr_kill_delay_sec)
+
+ def monitor_loop(self):
+
+ """Bring up and monitor ceph-mgr restful plugin.
+
+ Steps:
+ - wait for Ceph cluster to become available
+ - configure and start ceph-mgr
+ - configure and enable restful plugin
+ - send periodic requests to REST API
+ - recover from failures
+
+ Note: because this runs as a separate process it
+ must send status updates to service monitor
+ via control socket for: ping_failure_count,
+ restful_plugin_url and certificate.
+ """
+
+ # Promote to process group leader so parent (service monitor)
+ # can kill the monitor plus processes spawned by it. Otherwise
+ # children of monitor_loop() will keep running in background and
+ # will be reaped by init when they finish but by then they might
+ # interfere with any new service instance.
+ os.setpgrp()
+
+ # Ignoring SIGTERM here ensures process group is not reused by
+ # the time parent (service monitor) issues the final SIGKILL.
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+
+ while True:
+ try:
+ # steps to configure/start ceph-mgr and restful plugin
+ self.ceph_fsid_get()
+ self.ceph_mgr_auth_create()
+ self.restful_plugin_set_server_port()
+ self.restful_plugin_create_certificate()
+ self.ceph_mgr_start()
+ self.restful_plugin_enable()
+ self.restful_plugin_create_admin_key()
+ self.restful_plugin_get_url()
+ self.restful_plugin_get_certificate()
+
+ # REST API should be available now
+ # start making periodic requests (ping)
+ while True:
+ try:
+ self.restful_plugin_ping()
+ self.ping_failure_count = 0
+ self.request_update_ping_failures(
+ self.ping_failure_count)
+ self.ceph_mgr_failure_count = 0
+ self.request_update_ceph_mgr_failures(
+ self.ceph_mgr_failure_count)
+ time.sleep(CONFIG.restful_plugin_ping_delay_sec)
+ continue
+ except RestApiPingFailed as err:
+ LOG.warning(str(err))
+
+ LOG.info('REST API ping failure count=%d',
+ self.ping_failure_count)
+ self.ping_failure_count += 1
+ self.request_update_ping_failures(
+ self.ping_failure_count)
+
+ # maybe request failed because ceph-mgr is not running
+ if not self.ceph_mgr_is_running():
+ self.ceph_mgr_failure_count += 1
+ self.request_update_ceph_mgr_failures(
+ self.ceph_mgr_failure_count)
+ self.ceph_mgr_start()
+ time.sleep(CONFIG.ceph_mgr_grace_period_sec)
+ continue
+
+ # maybe request failed because cluster health is not ok
+ if not self.ceph_fsid_get():
+ LOG.info('Unable to get cluster fsid. '
+ 'Sleep for a while')
+ time.sleep(CONFIG.cluster_grace_period_sec)
+ break
+
+ # too many failures? Restart ceph-mgr and go again
+ # through configuration steps
+ if (self.ping_failure_count
+ % CONFIG.ping_fail_count_restart_mgr == 0):
+ LOG.info('Too many consecutive REST API failures. '
+ 'Restart ceph-mgr. Update service '
+ 'url and certificate')
+ self.ceph_mgr_stop()
+ self.restful_plugin_url = ''
+ self.request_update_plugin_url(self.restful_plugin_url)
+ self.certificate = ''
+ self.request_update_certificate(self.certificate)
+ break
+
+ time.sleep(CONFIG.restful_plugin_ping_delay_sec)
+
+ except CommandFailed as err:
+ LOG.warning(str(err))
+ time.sleep(CONFIG.cluster_grace_period_sec)
+ except CommandTimeout as err:
+ LOG.warning(str(err))
+ except (CephMgrStartFailed, CephRestfulPluginFailed) as err:
+ LOG.warning(str(err))
+ self.ceph_mgr_failure_count += 1
+ self.request_update_ceph_mgr_failures(
+ self.ceph_mgr_failure_count)
+ time.sleep(CONFIG.ceph_mgr_grace_period_sec)
+ except Exception as err:
+ LOG.exception(err)
+ time.sleep(CONFIG.cluster_grace_period_sec)
+
+ @staticmethod
+ def run_with_timeout(command, timeout, stderr=subprocess.STDOUT):
+ try:
+ LOG.info('Run command: %s', ' '.join(command))
+ return subprocess.check_output(
+ ['/usr/bin/timeout', str(timeout)] + command,
+ stderr=stderr, shell=False).strip()
+ except subprocess.CalledProcessError as err:
+ if err.returncode == GNU_TIMEOUT_EXPIRED_RETCODE:
+ raise CommandTimeout(command=err.cmd, timeout=timeout)
+ raise CommandFailed(command=err.cmd, reason=str(err),
+ out=err.output)
+
+ def ceph_fsid_get(self):
+ return self.run_with_timeout(['/usr/bin/ceph', 'fsid'],
+ CONFIG.ceph_cli_timeout_sec)
+
+ def ceph_mgr_has_auth(self):
+ path = '{}/ceph-{}'.format(
+ CONFIG.ceph_mgr_confdir, CONFIG.ceph_mgr_identity)
+ try:
+ os.makedirs(path)
+ except OSError as err:
+ pass
+ try:
+ self.run_with_timeout(
+ ['/usr/bin/ceph', 'auth', 'get',
+ 'mgr.{}'.format(CONFIG.ceph_mgr_identity),
+ '-o', '{}/keyring'.format(path)],
+ CONFIG.ceph_cli_timeout_sec)
+ return True
+ except CommandFailed as err:
+ if 'ENOENT' in str(err):
+ return False
+ raise
+
+ def ceph_mgr_auth_create(self):
+ if self.ceph_mgr_has_auth():
+ return
+ LOG.info('Create ceph-mgr authentication')
+ self.run_with_timeout(
+ ['/usr/bin/ceph', 'auth', 'get-or-create',
+ 'mgr.{}'.format(CONFIG.ceph_mgr_identity),
+ 'mon', 'allow *', 'osd', 'allow *'],
+ CONFIG.ceph_cli_timeout_sec)
+
+ def ceph_mgr_is_running(self):
+ if not self.ceph_mgr:
+ return None
+ try:
+ self.ceph_mgr.wait(timeout=0)
+ except psutil.TimeoutExpired:
+ return True
+ return False
+
+ def ceph_mgr_start(self):
+ if self.ceph_mgr_is_running():
+ return
+ self.stop_unmanaged_ceph_mgr()
+ LOG.info('Start ceph-mgr daemon')
+ try:
+ with open(os.devnull, 'wb') as null:
+ self.ceph_mgr = psutil.Popen(
+ [CONFIG.ceph_mgr_service,
+ '--cluster', CONFIG.ceph_mgr_cluster,
+ '--conf', CONFIG.ceph_mgr_config,
+ '--id', CONFIG.ceph_mgr_identity,
+ '-f'],
+ close_fds=True,
+ stdout=null,
+ stderr=null,
+ shell=False)
+ except (OSError, ValueError) as err:
+ raise CephMgrStartFailed(reason=str(err))
+ time.sleep(CONFIG.ceph_mgr_grace_period_sec)
+
+ def ceph_mgr_stop(self):
+ if not self.ceph_mgr:
+ return
+ LOG.info('Stop ceph-mgr')
+ psutil_terminate_kill(self.ceph_mgr, CONFIG.ceph_mgr_kill_delay_sec)
+
+ def restful_plugin_has_server_port(self):
+ try:
+ with open(os.devnull, 'wb') as null:
+ out = self.run_with_timeout(
+ ['/usr/bin/ceph', 'config-key', 'get',
+ 'mgr/restful/server_port'],
+ CONFIG.ceph_cli_timeout_sec, stderr=null)
+ if out == str(CONFIG.restful_plugin_port):
+ return True
+ LOG.warning('Restful plugin port mismatch: '
+ 'current=%d, expected=%d', out,
+ CONFIG.restful_plugin_port)
+ except CommandFailed as err:
+ LOG.warning('Failed to get restful plugin port: '
+ 'reason=%s', str(err))
+ return False
+
+ def restful_plugin_set_server_port(self):
+ if self.restful_plugin_has_server_port():
+ return
+ LOG.info('Set restful plugin port=%d', CONFIG.restful_plugin_port)
+ self.run_with_timeout(
+ ['/usr/bin/ceph', 'config-key', 'set',
+ 'mgr/restful/server_port', str(CONFIG.restful_plugin_port)],
+ CONFIG.ceph_cli_timeout_sec)
+
+ def restful_plugin_has_admin_key(self):
+ try:
+ self.run_with_timeout(
+ ['/usr/bin/ceph', 'config-key', 'get',
+ 'mgr/restful/keys/admin'],
+ CONFIG.ceph_cli_timeout_sec)
+ return True
+ except CommandFailed:
+ pass
+ return False
+
+ def restful_plugin_create_admin_key(self):
+ if self.restful_plugin_has_admin_key():
+ return
+ LOG.info('Create restful plugin admin key')
+ self.run_with_timeout(
+ ['/usr/bin/ceph', 'restful',
+ 'create-key', 'admin'],
+ CONFIG.ceph_cli_timeout_sec)
+
+ def restful_plugin_has_certificate(self):
+ try:
+ self.run_with_timeout(
+ ['/usr/bin/ceph', 'config-key', 'get',
+ 'config/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)],
+ CONFIG.ceph_cli_timeout_sec)
+ self.run_with_timeout(
+ ['/usr/bin/ceph', 'config-key', 'get',
+ 'mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)],
+ CONFIG.ceph_cli_timeout_sec)
+ self.run_with_timeout(
+ ['/usr/bin/ceph', 'config-key', 'get',
+ 'config/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity)],
+ CONFIG.ceph_cli_timeout_sec)
+ self.run_with_timeout(
+ ['/usr/bin/ceph', 'config-key', 'get',
+ '/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity)],
+ CONFIG.ceph_cli_timeout_sec)
+ return True
+ except CommandFailed:
+ pass
+ return False
+
+ def restful_plugin_create_certificate(self):
+ if self.restful_plugin_has_certificate():
+ return
+ LOG.info('Create restful plugin self signed certificate')
+ path = tempfile.mkdtemp()
+ try:
+ try:
+ with tempfile.NamedTemporaryFile() as restful_cnf:
+ restful_cnf.write((
+ '[req]\n'
+ 'req_extensions = v3_ca\n'
+ 'distinguished_name = req_distinguished_name\n'
+ '[v3_ca]\n'
+ 'subjectAltName=DNS:{}\n'
+ 'basicConstraints = CA:true\n'
+ '[ req_distinguished_name ]\n'
+ '0.organizationName = IT\n'
+ 'commonName = ceph-restful\n').format(
+ CONFIG.ceph_mgr_identity))
+ restful_cnf.flush()
+ subprocess.check_call([
+ '/usr/bin/openssl', 'req', '-new', '-nodes', '-x509',
+ '-subj', '/O=IT/CN=' + CONFIG.ceph_mgr_identity,
+ '-days', '3650',
+ '-config', restful_cnf.name,
+ '-out', os.path.join(path, 'crt'),
+ '-keyout', os.path.join(path, 'key'),
+ '-extensions', 'v3_ca'])
+ except subprocess.CalledProcessError as err:
+ raise CommandFailed(
+ command=' '.join(err.cmd),
+ reason='failed to generate self-signed certificate: {}'.format(str(err)),
+ out=err.output)
+ self.run_with_timeout(
+ ['/usr/bin/ceph', 'config-key', 'set',
+ 'config/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity),
+ '-i', os.path.join(path, 'crt')],
+ CONFIG.ceph_cli_timeout_sec)
+ self.run_with_timeout(
+ ['/usr/bin/ceph', 'config-key', 'set',
+ 'mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity),
+ '-i', os.path.join(path, 'crt')],
+ CONFIG.ceph_cli_timeout_sec)
+ self.run_with_timeout(
+ ['/usr/bin/ceph', 'config-key', 'set',
+ 'config/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity),
+ '-i', os.path.join(path, 'key')],
+ CONFIG.ceph_cli_timeout_sec)
+ self.run_with_timeout(
+ ['/usr/bin/ceph', 'config-key', 'set',
+ 'mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity),
+ '-i', os.path.join(path, 'key')],
+ CONFIG.ceph_cli_timeout_sec)
+ finally:
+ shutil.rmtree(path)
+
+ def restful_plugin_is_enabled(self):
+ command = ['/usr/bin/ceph', 'mgr', 'module', 'ls',
+ '--format', 'json']
+ with open(os.devnull, 'wb') as null:
+ out = self.run_with_timeout(
+ command, CONFIG.ceph_cli_timeout_sec, stderr=null)
+ try:
+ if 'restful' in json.loads(out)['enabled_modules']:
+ return True
+ except ValueError as err:
+ raise CommandFailed(
+ command=' '.join(command),
+ reason='unable to decode json: {}'.format(err), out=out)
+ except KeyError as err:
+ raise CommandFailed(
+ command=' '.join(command),
+ reason='missing expected key: {}'.format(err), out=out)
+ return False
+
+ def restful_plugin_enable(self):
+ if not self.restful_plugin_is_enabled():
+ LOG.info('Enable restful plugin')
+ self.run_with_timeout(
+ ['/usr/bin/ceph', 'mgr',
+ 'module', 'enable', 'restful'],
+ CONFIG.ceph_cli_timeout_sec)
+ time.sleep(CONFIG.restful_plugin_grace_period_sec)
+
+ def restful_plugin_get_url(self):
+ command = ['/usr/bin/ceph', 'mgr', 'services',
+ '--format', 'json']
+ with open(os.devnull, 'wb') as null:
+ out = self.run_with_timeout(
+ command, CONFIG.ceph_cli_timeout_sec, stderr=null)
+ try:
+ self.restful_plugin_url = json.loads(out)['restful']
+ except ValueError as err:
+ raise CephRestfulPluginFailed(
+ reason='unable to decode json: {} output={}'.format(err, out))
+ except KeyError as err:
+ raise CephRestfulPluginFailed(
+ reason='missing expected key: {} in ouput={}'.format(err, out))
+ self.request_update_plugin_url(self.restful_plugin_url)
+
+ def restful_plugin_get_certificate(self):
+ command = ['/usr/bin/ceph', 'config-key', 'get',
+ 'config/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)]
+ with open(os.devnull, 'wb') as null:
+ certificate = self.run_with_timeout(
+ command, CONFIG.ceph_cli_timeout_sec, stderr=null)
+ with open(CONFIG.restful_plugin_cert_path, 'wb') as cert_file:
+ cert_file.write(certificate)
+ self.certificate = CONFIG.restful_plugin_cert_path
+ self.request_update_certificate(
+ self.certificate)
+
+ def restful_plugin_ping(self):
+ if not self.restful_plugin_url:
+ raise RestApiPingFailed(reason='missing service url')
+ if not self.certificate:
+ raise RestApiPingFailed(reason='missing certificate')
+ LOG.debug('Ping restful plugin: url=%d', self.restful_plugin_url)
+ try:
+ response = requests.request(
+ 'GET', self.restful_plugin_url, verify=False,
+ timeout=CONFIG.rest_api_timeout_sec)
+ if not response.ok:
+ raise RestApiPingFailed(
+ reason='response not ok ({})'.format(response))
+ LOG.debug('Ping restful plugin OK')
+ except (requests.ConnectionError,
+ requests.Timeout,
+ requests.HTTPError) as err:
+ raise RestApiPingFailed(reason=str(err))
+
+ @staticmethod
+ def _make_client_socket():
+ sock = socket.socket(
+ socket.AF_UNIX, socket.SOCK_SEQPACKET)
+ sock.settimeout(2 * CONFIG.rest_api_timeout_sec)
+ sock.connect(CONFIG.service_socket)
+ return sock
+
+ @staticmethod
+ def request_status():
+ try:
+ with contextlib.closing(
+ ServiceMonitor._make_client_socket()) as sock:
+ sock.send('status')
+ status = sock.recv(CONFIG.service_socket_bufsize)
+ LOG.debug('Status %s', status)
+ return status.startswith('OK')
+ except socket.error as err:
+ LOG.error('Status error: reason=%s', err)
+ return False
+
+ @staticmethod
+ def request_stop():
+ try:
+ with contextlib.closing(
+ ServiceMonitor._make_client_socket()) as sock:
+ sock.send('stop')
+ response = sock.recv(CONFIG.service_socket_bufsize)
+ LOG.debug('Stop response: %s', response)
+ return True
+ except socket.error as err:
+ LOG.error('Stop error: reason=%s', err)
+ return False
+
+ @staticmethod
+ def request_update_ceph_mgr_failures(count):
+ try:
+ with contextlib.closing(
+ ServiceMonitor._make_client_socket()) as sock:
+ sock.send('ceph-mgr-failures {}'.format(count))
+ sock.recv(CONFIG.service_socket_bufsize)
+ return True
+ except socket.error as err:
+ LOG.error('Stop error: reason=%s', err)
+ return False
+
+ @staticmethod
+ def request_update_ping_failures(count):
+ try:
+ with contextlib.closing(
+ ServiceMonitor._make_client_socket()) as sock:
+ sock.send('ping-failures {}'.format(count))
+ sock.recv(CONFIG.service_socket_bufsize)
+ return True
+ except socket.error as err:
+ LOG.error('Stop error: reason=%s', err)
+ return False
+
+ @staticmethod
+ def request_update_plugin_url(url):
+ try:
+ with contextlib.closing(
+ ServiceMonitor._make_client_socket()) as sock:
+ sock.send('restful-url {}'.format(url))
+ sock.recv(CONFIG.service_socket_bufsize)
+ return True
+ except socket.error as err:
+ LOG.error('Stop error: reason=%s', err)
+ return False
+
+ @staticmethod
+ def request_update_certificate(path):
+ try:
+ with contextlib.closing(
+ ServiceMonitor._make_client_socket()) as sock:
+ sock.send('certificate {}'.format(path))
+ sock.recv(CONFIG.service_socket_bufsize)
+ return True
+ except socket.error as err:
+ LOG.error('Stop error: reason=%s', err)
+ return False
+
+
+class InitWrapper(object):
+
+ """Handle System V init script actions: start, stop, restart, etc. """
+
+ def __init__(self):
+
+ """Dispatch command line action to the corresponding function.
+
+ Candidate action functions are all class methods except ones
+ that start with an underscore.
+ """
+
+ parser = argparse.ArgumentParser()
+ actions = [m[0]
+ for m in inspect.getmembers(self)
+ if (inspect.ismethod(m[1])
+ and not m[0].startswith('_'))]
+ parser.add_argument(
+ 'action',
+ choices=actions)
+ self.args = parser.parse_args()
+ getattr(self, self.args.action)()
+
+ def start(self):
+
+ """Start ServiceMonitor as a daemon unless one is already running.
+
+ Use a pipe to report monitor status back to this process.
+ """
+
+ pipe = os.pipe()
+ child = os.fork()
+ if child == 0:
+ os.close(pipe[0])
+ with daemon.DaemonContext(files_preserve=[pipe[1]]):
+ # prevent duplication of messages in log
+ global LOG
+ LOG = setup_logging(cleanup_handlers=True)
+ try:
+ monitor = ServiceMonitor()
+ status = 'OK'
+ except ServiceAlreadyStarted:
+ os.write(pipe[1], 'OK')
+ os.close(pipe[1])
+ return
+ except Exception as err:
+ status = str(err)
+ os.write(pipe[1], status)
+ os.close(pipe[1])
+ if status == 'OK':
+ try:
+ monitor.run()
+ except ServiceException as err:
+ LOG.warning(str(err))
+ except Exception as err:
+ LOG.exception('Service monitor error: reason=%s', err)
+ else:
+ os.close(pipe[1])
+ try:
+ status = os.read(pipe[0], CONFIG.service_socket_bufsize)
+ if status == 'OK':
+ sys.exit(0)
+ else:
+ LOG.warning('Service monitor failed to start: '
+ 'status=%s', status)
+ except IOError as err:
+ LOG.warning('Failed to read monitor status: reason=%s', err)
+ os.close(pipe[0])
+ os.waitpid(child, 0)
+ sys.exit(1)
+
+ def stop(self):
+
+ """Tell ServiceMonitor daemon to stop running.
+
+ In case request fails stop ServiceMonitor and ceph_mgr proecsses
+ using SIGTERM followed by SIGKILL.
+ """
+
+ result = ServiceMonitor.request_stop()
+ if not result:
+ ceph_mgr = os.path.basename(CONFIG.ceph_mgr_service)
+ procs = []
+ for proc in psutil.process_iter():
+ name = proc.name()
+ if name == CONFIG.service_name:
+ procs.append(proc)
+ if name == ceph_mgr:
+ procs.append(proc)
+ psutil_terminate_kill(procs, CONFIG.ceph_mgr_kill_delay_sec)
+
+ def restart(self):
+ self.stop()
+ self.start()
+
+ def force_reload(self):
+ self.stop()
+ self.start()
+
+ def reload(self):
+ self.stop()
+ self.start()
+
+ def status(self):
+
+ """Report status from ServiceMonitor.
+
+ We don't just try to access REST API here because ServiceMonitor may
+ be in the process of starting/configuring ceph-mgr and restful
+ plugin in which case we report OK to avoid being restarted by SM.
+ """
+
+ status = ServiceMonitor.request_status()
+ sys.exit(0 if status is True else 1)
+
+
+# Script entry point: InitWrapper constructor parses the command
+# line and runs the requested init script action
+if __name__ == '__main__':
+    InitWrapper()