+++ /dev/null
-#!/usr/bin/python
-#
-# Copyright (c) 2019 Wind River Systems, Inc.
-#
-# SPDX-License-Identifier: Apache-2.0
-#
-
-
-### BEGIN INIT INFO
-# Provides: ceph/mgr RESTful API plugin
-# Required-Start: $ceph
-# Required-Stop: $ceph
-# Default-Start: 2 3 4 5
-# Default-Stop: 0 1 6
-# Short-Description: Ceph MGR RESTful API plugin
-# Description: Ceph MGR RESTful API plugin
-### END INIT INFO
-
-import argparse
-import contextlib
-import errno
-import fcntl
-import inspect
-import json
-import logging
-import multiprocessing
-import os
-import shutil
-import signal
-import socket
-import subprocess
-import sys
-import tempfile
-import time
-
-import daemon
-import psutil
-import requests
-
-# 'timeout' command returns exit status 124
-# if command times out (see man page)
-GNU_TIMEOUT_EXPIRED_RETCODE = 124
-
-
-def psutil_terminate_kill(target, timeout):
-
- """Extend psutil functionality to stop a process.
-
- SIGINT is sent to each target then after a grace period SIGKILL
- is sent to the ones that are still running.
- """
-
- if not isinstance(target, list):
- target = [target]
- _, target = psutil.wait_procs(target, timeout=0)
- for action in [lambda p: p.terminate(), lambda p: p.kill()]:
- for proc in target:
- action(proc)
- _, target = psutil.wait_procs(
- target, timeout=timeout)
-
-
-class Config(object):
-
- """ceph-mgr service wrapper configuration options.
-
- In the future we may want to load them from a configuration file
- (for example /etc/ceph/mgr-restful-plugin.conf )
- """
-
- def __init__(self):
- self.log_level = logging.INFO
- self.log_dir = '/var/log'
-
- self.ceph_mgr_service = '/usr/bin/ceph-mgr'
- self.ceph_mgr_config = '/etc/ceph/ceph.conf'
- self.ceph_mgr_cluster = 'ceph'
- self.ceph_mgr_rundir = '/var/run/ceph/mgr'
- self.ceph_mgr_confdir = '/var/lib/ceph/mgr'
- self.ceph_mgr_identity = socket.gethostname()
-
- self.service_name = 'mgr-restful-plugin'
- self.service_socket = os.path.join(
- self.ceph_mgr_rundir, '{}.socket'.format(self.service_name))
- self.service_lock = os.path.join(
- self.ceph_mgr_rundir, '{}.lock'.format(self.service_name))
- self.service_pid_file = os.path.join(
- '/var/run/ceph', '{}.pid'.format(self.service_name))
-
- self.restful_plugin_port = 5001
-
- # maximum size of a message received/sent via
- # service monitor control socket
- self.service_socket_bufsize = 1024
-
- # maximum time to wait for ceph cli to exit
- self.ceph_cli_timeout_sec = 30
-
- # how much time to wait after ceph cli commands fail with timeout
- # before running any other commands
- self.cluster_grace_period_sec = 30
-
- # after ceph-mgr is started it goes through an internal initialization
- # phase before; how much time to wait before querying ceph-mgr
- self.ceph_mgr_grace_period_sec = 15
-
- # after sending SIGTERM to ceph-mgr how much time to wait before
- # sending SIGKILL (maximum time allowed for ceph-mgr cleanup)
- self.ceph_mgr_kill_delay_sec = 5
-
- # if service monitor is running a recovery procedure it reports
- # status OK even if ceph-mgr is currently down. This sets the
- # maximum number of consecutive ceph-mgr failures before reporting
- # status error
- self.ceph_mgr_fail_count_report_error = 3
-
- # maximum number of consecutive ceph-mgr failures before
- # stopping mgr-restful-plugin service
- self.ceph_mgr_fail_count_exit = 5
-
- # maximum time allowed for ceph-mgr to respond to a REST API request
- self.rest_api_timeout_sec = 15
-
- # interval between consecutive REST API requests (ping's). A smaller
- # value here triggers more requests to ceph-mgr restful plugin. A
- # higher value makes recovery slower when services become unavailable
- self.restful_plugin_ping_delay_sec = 3
-
- # where to save the self-signed certificate generated by ceph-mgr
- self.restful_plugin_cert_path = os.path.join(
- self.ceph_mgr_rundir, 'restful.crt')
-
- # time to wait after enabling restful plugin
- self.restful_plugin_grace_period_sec = 3
-
- # after how many REST API ping failures to restart ceph-mgr
- self.ping_fail_count_restart_mgr = 3
-
- # after how many REST API ping failures to report status error.
- # Until then service monitor reports status OK just in case
- # restful plugin recovers
- self.ping_fail_count_report_error = 5
-
- @staticmethod
- def load():
- return Config()
-
-
-def setup_logging(name=None, cleanup_handlers=False):
- if not name:
- name = CONFIG.service_name
- log = logging.getLogger(name)
- log.setLevel(CONFIG.log_level)
- if cleanup_handlers:
- try:
- for handler in log.handlers:
- if isinstance(handler, logging.StreamHandler):
- handler.flush()
- if isinstance(handler, logging.FileHandler):
- handler.close()
- log.handlers = []
- except Exception:
- pass
- elif log.handlers:
- return log
- handler = logging.FileHandler(
- os.path.join(CONFIG.log_dir,
- '{}.log'.format(CONFIG.service_name)))
- handler.setFormatter(
- logging.Formatter('%(asctime)s %(process)s %(levelname)s %(name)s %(message)s'))
- log.addHandler(handler)
- return log
-
-
-CONFIG = Config.load()
-LOG = setup_logging(name='init-wrapper')
-
-
-class ServiceException(Exception):
-
- """Generic mgr-restful-plugin service exception.
-
- Build exception string based on static (per exception class)
- string plus args, keyword args passed to exception constructor.
- """
-
- message = ""
-
- def __init__(self, *args, **kwargs):
- if "message" not in kwargs:
- try:
- message = self.message.format(*args, **kwargs)
- except Exception: # noqa
- message = '{}, args:{}, kwargs: {}'.format(
- self.message, args, kwargs)
- else:
- message = kwargs["message"]
- super(ServiceException, self).__init__(message)
-
-
-class ServiceAlreadyStarted(ServiceException):
- message = ('Service monitor already started')
-
-
-class ServiceLockFailed(ServiceException):
- message = ('Unable to lock service monitor: '
- 'reason={reason}')
-
-
-class ServiceNoSocket(ServiceException):
- message = ('Unable to create service monitor socket: '
- 'reason={reason}')
-
-
-class ServiceSocketBindFailed(ServiceException):
- message = ('Failed to bind service monitor socket: '
- 'path={path}, reason={reason}')
-
-
-class ServiceNoPidFile(ServiceException):
- message = ('Failed to update pid file: '
- 'path={path}, reason={reason}')
-
-
-class CommandFailed(ServiceException):
- message = ('Command failed: command={command}, '
- 'reason={reason}, out={out}')
-
-
-class CommandTimeout(ServiceException):
- message = ('Command timeout: command={command}, '
- 'timeout={timeout}')
-
-
-class CephMgrStartFailed(ServiceException):
- message = ('Failed to start ceph_mgr: '
- 'reason={reason}')
-
-
-class CephRestfulPluginFailed(ServiceException):
- message = ('Failed to start restful plugin: '
- 'reason={reason}')
-
-
-class RestApiPingFailed(ServiceException):
- message = ('REST API ping failed: '
- 'reason={reason}')
-
-
-class ServiceMonitor(object):
-
- """Configure and monitor ceph-mgr and restful plugin (Ceph REST API)
-
- 1. process init script service requests: status, stop. Requests are
- received via a control socket. Stop has priority over whatever
- the monitor is doing currently. Any ceph command that may be running
- is terminated/killed. Note that while ceph-mgr and restful plugin
- configuration is in progress ServiceMonitor reports status OK to
- avoid being restarted by SM.
-
- 2. configure ceph-mgr and mgr restful plugin: authentication, REST API
- service port, self signed certificate. This runs as a separate
- process so it can be stopped when init script requests it.
-
- 3. periodically check (ping) REST API responds to HTTPS requests.
- Recovery actions are taken if REST API fails to respond: restart
- ceph-mgr, wait for cluster to become available again.
- """
-
- def __init__(self):
- # process running configuration & REST API ping loop
- self.monitor = None
-
- # command socket used by init script
- self.command = None
-
- # ceph-mgr process
- self.ceph_mgr = None
-
- # consecutive ceph-mgr/restful-plugin start failures. Service monitor
- # reports failure after CONFIG.ceph_mgr_max_failure_count
- self.ceph_mgr_failure_count = 0
-
- # consecutive REST API ping failures. ceph-mgr service is restarted
- # after CONFIG.ping_fail_count_restart_mgr threshold is exceeded
- self.ping_failure_count = 0
-
- # REST API url reported by ceph-mgr after enabling restful plugin
- self.restful_plugin_url = ''
-
- # REST API self signed certificate generated by restful plugin
- self.certificate = ''
-
- def run(self):
- self.disable_certificate_check()
- with self.service_lock(), self.service_socket(), \
- self.service_pid_file():
- self.start_monitor()
- self.server_loop()
-
- def disable_certificate_check(self):
- # ceph-mgr restful plugin is configured with a self-signed
- # certificate. Certificate host is hard-coded to "ceph-restful"
- # which causes HTTPS requests to fail because they don't
- # match current host name ("controller-..."). Disable HTTPS
- # certificates check in urllib3
- LOG.warning('Disable urllib3 certifcates check')
- requests.packages.urllib3.disable_warnings()
-
- def server_loop(self):
- self.command.listen(2)
- while True:
- try:
- client, _ = self.command.accept()
- request = client.recv(CONFIG.service_socket_bufsize)
- LOG.debug('Monitor command socket: request=%s', str(request))
- cmd = request.split(' ')
- cmd, args = cmd[0], cmd[1:]
- if cmd == 'status':
- self.send_response(client, request, self.status())
- elif cmd == 'stop':
- self.stop()
- self.send_response(client, request, 'OK')
- break
- elif cmd == 'restful-url':
- try:
- self.restful_plugin_url = args[0]
- self.send_response(client, request, 'OK')
- except IndexError:
- LOG.warning('Failed to update restful plugin url: '
- 'args=%s', str(args))
- self.send_response(client, request, 'ERR')
- elif cmd == 'certificate':
- try:
- self.certificate = args[0] if args else ''
- self.send_response(client, request, 'OK')
- except IndexError:
- LOG.warning('Failed to update certificate path: '
- 'args=%s', str(args))
- self.send_response(client, request, 'ERR')
- elif cmd == 'ceph-mgr-failures':
- try:
- self.ceph_mgr_failure_count = int(args[0])
- self.send_response(client, request, 'OK')
- if self.ceph_mgr_failure_count >= CONFIG.ceph_mgr_fail_count_exit:
- self.stop()
- break
- except (IndexError, ValueError):
- LOG.warning('Failed to update ceph-mgr failures: '
- 'args=%s', str(args))
- self.send_response(client, request, 'ERR')
- elif cmd == 'ping-failures':
- try:
- self.ping_failure_count = int(args[0])
- self.send_response(client, request, 'OK')
- except (IndexError, ValueError):
- LOG.warning('Failed to update ping failures: '
- 'args=%s', str(args))
- self.send_response(client, request, 'ERR')
- except Exception as err:
- LOG.exception(err)
-
- @staticmethod
- def send_response(client, request, response):
- try:
- client.send(response)
- except socket.error as err:
- LOG.warning('Failed to send response back. '
- 'request=%s, response=%s, reason=%s',
- request, response, err)
-
- def status(self):
- if not self.restful_plugin_url:
- if self.ceph_mgr_failure_count < CONFIG.ceph_mgr_fail_count_report_error \
- and self.ping_failure_count < CONFIG.ping_fail_count_report_error:
- LOG.debug('Monitor is starting services. Report status OK')
- return 'OK'
- LOG.debug('Too many failures: '
- 'ceph_mgr=%d < %d, ping=%d < %d. '
- 'Report status ERR',
- self.ceph_mgr_failure_count,
- CONFIG.ceph_mgr_fail_count_report_error,
- self.ping_failure_count,
- CONFIG.ping_fail_count_report_error)
- return 'ERR.down'
- try:
- self.restful_plugin_ping()
- LOG.debug('Restful plugin ping successful. Report status OK')
- return 'OK'
- except (CommandFailed, RestApiPingFailed):
- if self.ceph_mgr_failure_count < CONFIG.ceph_mgr_fail_count_report_error \
- and self.ping_failure_count < CONFIG.ping_fail_count_report_error:
- LOG.info('Restful plugin does not respond but failure '
- 'count is within acceptable limits: '
- ' ceph_mgr=%d < %d, ping=%d < %d. '
- 'Report status OK',
- self.ceph_mgr_failure_count,
- CONFIG.ceph_mgr_fail_count_report_error,
- self.ping_failure_count,
- CONFIG.ping_fail_count_report_error)
- return 'OK'
- LOG.debug('Restful does not respond (ping failure count %d). '
- 'Report status ERR', self.ping_failure_count)
- return 'ERR.ping_failed'
-
- def stop(self):
- if not self.monitor:
- return
- LOG.info('Stop monitor with SIGTERM to process group %d',
- self.monitor.pid)
- try:
- os.killpg(self.monitor.pid, signal.SIGTERM)
- except OSError as err:
- LOG.info('Stop monitor failed: reason=%s', str(err))
- return
- time.sleep(CONFIG.ceph_mgr_kill_delay_sec)
- LOG.info('Stop monitor with SIGKILL to process group %d',
- self.monitor.pid)
- try:
- os.killpg(self.monitor.pid, signal.SIGKILL)
- os.waitpid(self.monitor.pid, 0)
- except OSError as err:
- LOG.info('Stop monitor failed: reason=%s', str(err))
- return
- LOG.info('Monitor stopped: pid=%d', self.monitor.pid)
-
- @contextlib.contextmanager
- def service_lock(self):
- LOG.info('Take service lock: path=%s', CONFIG.service_lock)
- try:
- os.makedirs(os.path.dirname(CONFIG.service_lock))
- except OSError:
- pass
- lock_file = open(CONFIG.service_lock, 'w')
- try:
- fcntl.flock(lock_file.fileno(),
- fcntl.LOCK_EX | fcntl.LOCK_NB)
- except (IOError, OSError) as err:
- if err.errno == errno.EAGAIN:
- raise ServiceAlreadyStarted()
- else:
- raise ServiceLockFailed(reason=str(err))
- # even if we have the lock here there might be another service manager
- # running whose CONFIG.ceph_mgr_rundir was removed before starting
- # this instance. Make sure there is only one service manager running
- self.stop_other_service_managers()
- try:
- yield
- finally:
- os.unlink(CONFIG.service_lock)
- lock_file.close()
- LOG.info('Release service lock: path=%s', CONFIG.service_lock)
-
- def stop_other_service_managers(self):
- service = os.path.join('/etc/init.d', CONFIG.service_name)
- for p in psutil.process_iter():
- if p.cmdline()[:2] not in [[service], ['/usr/bin/python', service]]:
- continue
- if p.pid == os.getpid():
- continue
- p.kill()
-
- @contextlib.contextmanager
- def service_socket(self):
- LOG.info('Create service socket')
- try:
- self.command = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
- except socket.error as err:
- raise ServiceNoSocket(reason=str(err))
- LOG.info('Remove existing socket files')
- try:
- os.unlink(CONFIG.service_socket)
- except OSError:
- pass
- LOG.info('Bind service socket: path=%s', CONFIG.service_socket)
- try:
- self.command.bind(CONFIG.service_socket)
- except socket.error as err:
- raise ServiceSocketBindFailed(
- path=CONFIG.service_socket, reason=str(err))
- try:
- yield
- finally:
- LOG.info('Close service socket and remove file: path=%s',
- CONFIG.service_socket)
- self.command.close()
- os.unlink(CONFIG.service_socket)
-
- @contextlib.contextmanager
- def service_pid_file(self):
- LOG.info('Update service pid file: path=%s', CONFIG.service_pid_file)
- try:
- pid_file = open(CONFIG.service_pid_file, 'w')
- pid_file.write(str(os.getpid()))
- pid_file.flush()
- except OSError as err:
- raise ServiceNoPidFile(
- path=CONFIG.service_pid_file, reason=str(err))
- try:
- yield
- finally:
- LOG.info('Remove service pid file: path=%s',
- CONFIG.service_pid_file)
- try:
- os.unlink(CONFIG.service_pid_file)
- except OSError:
- pass
-
- def start_monitor(self):
- LOG.info('Start monitor loop')
- self.monitor = multiprocessing.Process(target=self.monitor_loop)
- self.monitor.start()
-
- def stop_unmanaged_ceph_mgr(self):
- LOG.info('Stop unmanaged running ceph-mgr processes')
- service_name = os.path.basename(CONFIG.ceph_mgr_service)
- if self.ceph_mgr:
- psutil_terminate_kill(
- [proc for proc in psutil.process_iter()
- if (proc.name() == service_name
- and proc.pid != self.ceph_mgr.pid)],
- CONFIG.ceph_mgr_kill_delay_sec)
- else:
- psutil_terminate_kill(
- [proc for proc in psutil.process_iter()
- if proc.name() == service_name],
- CONFIG.ceph_mgr_kill_delay_sec)
-
- def monitor_loop(self):
-
- """Bring up and monitor ceph-mgr restful plugin.
-
- Steps:
- - wait for Ceph cluster to become available
- - configure and start ceph-mgr
- - configure and enable restful plugin
- - send periodic requests to REST API
- - recover from failures
-
- Note: because this runs as a separate process it
- must send status updates to service monitor
- via control socket for: ping_failure_count,
- restful_plugin_url and certificate.
- """
-
- # Promote to process group leader so parent (service monitor)
- # can kill the monitor plus processes spawned by it. Otherwise
- # children of monitor_loop() will keep running in background and
- # will be reaped by init when they finish but by then they might
- # interfere with any new service instance.
- os.setpgrp()
-
- # Ignoring SIGTERM here ensures process group is not reused by
- # the time parent (service monitor) issues the final SIGKILL.
- signal.signal(signal.SIGTERM, signal.SIG_IGN)
-
- while True:
- try:
- # steps to configure/start ceph-mgr and restful plugin
- self.ceph_fsid_get()
- self.ceph_mgr_auth_create()
- self.restful_plugin_set_server_port()
- self.restful_plugin_create_certificate()
- self.ceph_mgr_start()
- self.restful_plugin_enable()
- self.restful_plugin_create_admin_key()
- self.restful_plugin_get_url()
- self.restful_plugin_get_certificate()
-
- # REST API should be available now
- # start making periodic requests (ping)
- while True:
- try:
- self.restful_plugin_ping()
- self.ping_failure_count = 0
- self.request_update_ping_failures(
- self.ping_failure_count)
- self.ceph_mgr_failure_count = 0
- self.request_update_ceph_mgr_failures(
- self.ceph_mgr_failure_count)
- time.sleep(CONFIG.restful_plugin_ping_delay_sec)
- continue
- except RestApiPingFailed as err:
- LOG.warning(str(err))
-
- LOG.info('REST API ping failure count=%d',
- self.ping_failure_count)
- self.ping_failure_count += 1
- self.request_update_ping_failures(
- self.ping_failure_count)
-
- # maybe request failed because ceph-mgr is not running
- if not self.ceph_mgr_is_running():
- self.ceph_mgr_failure_count += 1
- self.request_update_ceph_mgr_failures(
- self.ceph_mgr_failure_count)
- self.ceph_mgr_start()
- time.sleep(CONFIG.ceph_mgr_grace_period_sec)
- continue
-
- # maybe request failed because cluster health is not ok
- if not self.ceph_fsid_get():
- LOG.info('Unable to get cluster fsid. '
- 'Sleep for a while')
- time.sleep(CONFIG.cluster_grace_period_sec)
- break
-
- # too many failures? Restart ceph-mgr and go again
- # through configuration steps
- if (self.ping_failure_count
- % CONFIG.ping_fail_count_restart_mgr == 0):
- LOG.info('Too many consecutive REST API failures. '
- 'Restart ceph-mgr. Update service '
- 'url and certificate')
- self.ceph_mgr_stop()
- self.restful_plugin_url = ''
- self.request_update_plugin_url(self.restful_plugin_url)
- self.certificate = ''
- self.request_update_certificate(self.certificate)
- break
-
- time.sleep(CONFIG.restful_plugin_ping_delay_sec)
-
- except CommandFailed as err:
- LOG.warning(str(err))
- time.sleep(CONFIG.cluster_grace_period_sec)
- except CommandTimeout as err:
- LOG.warning(str(err))
- except (CephMgrStartFailed, CephRestfulPluginFailed) as err:
- LOG.warning(str(err))
- self.ceph_mgr_failure_count += 1
- self.request_update_ceph_mgr_failures(
- self.ceph_mgr_failure_count)
- time.sleep(CONFIG.ceph_mgr_grace_period_sec)
- except Exception as err:
- LOG.exception(err)
- time.sleep(CONFIG.cluster_grace_period_sec)
-
- @staticmethod
- def run_with_timeout(command, timeout, stderr=subprocess.STDOUT):
- try:
- LOG.info('Run command: %s', ' '.join(command))
- return subprocess.check_output(
- ['/usr/bin/timeout', str(timeout)] + command,
- stderr=stderr, shell=False).strip()
- except subprocess.CalledProcessError as err:
- if err.returncode == GNU_TIMEOUT_EXPIRED_RETCODE:
- raise CommandTimeout(command=err.cmd, timeout=timeout)
- raise CommandFailed(command=err.cmd, reason=str(err),
- out=err.output)
-
- def ceph_fsid_get(self):
- return self.run_with_timeout(['/usr/bin/ceph', 'fsid'],
- CONFIG.ceph_cli_timeout_sec)
-
- def ceph_mgr_has_auth(self):
- path = '{}/ceph-{}'.format(
- CONFIG.ceph_mgr_confdir, CONFIG.ceph_mgr_identity)
- try:
- os.makedirs(path)
- except OSError as err:
- pass
- try:
- self.run_with_timeout(
- ['/usr/bin/ceph', 'auth', 'get',
- 'mgr.{}'.format(CONFIG.ceph_mgr_identity),
- '-o', '{}/keyring'.format(path)],
- CONFIG.ceph_cli_timeout_sec)
- return True
- except CommandFailed as err:
- if 'ENOENT' in str(err):
- return False
- raise
-
- def ceph_mgr_auth_create(self):
- if self.ceph_mgr_has_auth():
- return
- LOG.info('Create ceph-mgr authentication')
- self.run_with_timeout(
- ['/usr/bin/ceph', 'auth', 'get-or-create',
- 'mgr.{}'.format(CONFIG.ceph_mgr_identity),
- 'mon', 'allow *', 'osd', 'allow *'],
- CONFIG.ceph_cli_timeout_sec)
-
- def ceph_mgr_is_running(self):
- if not self.ceph_mgr:
- return None
- try:
- self.ceph_mgr.wait(timeout=0)
- except psutil.TimeoutExpired:
- return True
- return False
-
- def ceph_mgr_start(self):
- if self.ceph_mgr_is_running():
- return
- self.stop_unmanaged_ceph_mgr()
- LOG.info('Start ceph-mgr daemon')
- try:
- with open(os.devnull, 'wb') as null:
- self.ceph_mgr = psutil.Popen(
- [CONFIG.ceph_mgr_service,
- '--cluster', CONFIG.ceph_mgr_cluster,
- '--conf', CONFIG.ceph_mgr_config,
- '--id', CONFIG.ceph_mgr_identity,
- '-f'],
- close_fds=True,
- stdout=null,
- stderr=null,
- shell=False)
- except (OSError, ValueError) as err:
- raise CephMgrStartFailed(reason=str(err))
- time.sleep(CONFIG.ceph_mgr_grace_period_sec)
-
- def ceph_mgr_stop(self):
- if not self.ceph_mgr:
- return
- LOG.info('Stop ceph-mgr')
- psutil_terminate_kill(self.ceph_mgr, CONFIG.ceph_mgr_kill_delay_sec)
-
- def restful_plugin_has_server_port(self):
- try:
- with open(os.devnull, 'wb') as null:
- out = self.run_with_timeout(
- ['/usr/bin/ceph', 'config-key', 'get',
- 'mgr/restful/server_port'],
- CONFIG.ceph_cli_timeout_sec, stderr=null)
- if out == str(CONFIG.restful_plugin_port):
- return True
- LOG.warning('Restful plugin port mismatch: '
- 'current=%d, expected=%d', out,
- CONFIG.restful_plugin_port)
- except CommandFailed as err:
- LOG.warning('Failed to get restful plugin port: '
- 'reason=%s', str(err))
- return False
-
- def restful_plugin_set_server_port(self):
- if self.restful_plugin_has_server_port():
- return
- LOG.info('Set restful plugin port=%d', CONFIG.restful_plugin_port)
- self.run_with_timeout(
- ['/usr/bin/ceph', 'config-key', 'set',
- 'mgr/restful/server_port', str(CONFIG.restful_plugin_port)],
- CONFIG.ceph_cli_timeout_sec)
-
- def restful_plugin_has_admin_key(self):
- try:
- self.run_with_timeout(
- ['/usr/bin/ceph', 'config-key', 'get',
- 'mgr/restful/keys/admin'],
- CONFIG.ceph_cli_timeout_sec)
- return True
- except CommandFailed:
- pass
- return False
-
- def restful_plugin_create_admin_key(self):
- if self.restful_plugin_has_admin_key():
- return
- LOG.info('Create restful plugin admin key')
- self.run_with_timeout(
- ['/usr/bin/ceph', 'restful',
- 'create-key', 'admin'],
- CONFIG.ceph_cli_timeout_sec)
-
- def restful_plugin_has_certificate(self):
- try:
- self.run_with_timeout(
- ['/usr/bin/ceph', 'config-key', 'get',
- 'config/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)],
- CONFIG.ceph_cli_timeout_sec)
- self.run_with_timeout(
- ['/usr/bin/ceph', 'config-key', 'get',
- 'mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)],
- CONFIG.ceph_cli_timeout_sec)
- self.run_with_timeout(
- ['/usr/bin/ceph', 'config-key', 'get',
- 'config/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity)],
- CONFIG.ceph_cli_timeout_sec)
- self.run_with_timeout(
- ['/usr/bin/ceph', 'config-key', 'get',
- '/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity)],
- CONFIG.ceph_cli_timeout_sec)
- return True
- except CommandFailed:
- pass
- return False
-
- def restful_plugin_create_certificate(self):
- if self.restful_plugin_has_certificate():
- return
- LOG.info('Create restful plugin self signed certificate')
- path = tempfile.mkdtemp()
- try:
- try:
- with tempfile.NamedTemporaryFile() as restful_cnf:
- restful_cnf.write((
- '[req]\n'
- 'req_extensions = v3_ca\n'
- 'distinguished_name = req_distinguished_name\n'
- '[v3_ca]\n'
- 'subjectAltName=DNS:{}\n'
- 'basicConstraints = CA:true\n'
- '[ req_distinguished_name ]\n'
- '0.organizationName = IT\n'
- 'commonName = ceph-restful\n').format(
- CONFIG.ceph_mgr_identity))
- restful_cnf.flush()
- subprocess.check_call([
- '/usr/bin/openssl', 'req', '-new', '-nodes', '-x509',
- '-subj', '/O=IT/CN=' + CONFIG.ceph_mgr_identity,
- '-days', '3650',
- '-config', restful_cnf.name,
- '-out', os.path.join(path, 'crt'),
- '-keyout', os.path.join(path, 'key'),
- '-extensions', 'v3_ca'])
- except subprocess.CalledProcessError as err:
- raise CommandFailed(
- command=' '.join(err.cmd),
- reason='failed to generate self-signed certificate: {}'.format(str(err)),
- out=err.output)
- self.run_with_timeout(
- ['/usr/bin/ceph', 'config-key', 'set',
- 'config/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity),
- '-i', os.path.join(path, 'crt')],
- CONFIG.ceph_cli_timeout_sec)
- self.run_with_timeout(
- ['/usr/bin/ceph', 'config-key', 'set',
- 'mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity),
- '-i', os.path.join(path, 'crt')],
- CONFIG.ceph_cli_timeout_sec)
- self.run_with_timeout(
- ['/usr/bin/ceph', 'config-key', 'set',
- 'config/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity),
- '-i', os.path.join(path, 'key')],
- CONFIG.ceph_cli_timeout_sec)
- self.run_with_timeout(
- ['/usr/bin/ceph', 'config-key', 'set',
- 'mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity),
- '-i', os.path.join(path, 'key')],
- CONFIG.ceph_cli_timeout_sec)
- finally:
- shutil.rmtree(path)
-
- def restful_plugin_is_enabled(self):
- command = ['/usr/bin/ceph', 'mgr', 'module', 'ls',
- '--format', 'json']
- with open(os.devnull, 'wb') as null:
- out = self.run_with_timeout(
- command, CONFIG.ceph_cli_timeout_sec, stderr=null)
- try:
- if 'restful' in json.loads(out)['enabled_modules']:
- return True
- except ValueError as err:
- raise CommandFailed(
- command=' '.join(command),
- reason='unable to decode json: {}'.format(err), out=out)
- except KeyError as err:
- raise CommandFailed(
- command=' '.join(command),
- reason='missing expected key: {}'.format(err), out=out)
- return False
-
- def restful_plugin_enable(self):
- if not self.restful_plugin_is_enabled():
- LOG.info('Enable restful plugin')
- self.run_with_timeout(
- ['/usr/bin/ceph', 'mgr',
- 'module', 'enable', 'restful'],
- CONFIG.ceph_cli_timeout_sec)
- time.sleep(CONFIG.restful_plugin_grace_period_sec)
-
- def restful_plugin_get_url(self):
- command = ['/usr/bin/ceph', 'mgr', 'services',
- '--format', 'json']
- with open(os.devnull, 'wb') as null:
- out = self.run_with_timeout(
- command, CONFIG.ceph_cli_timeout_sec, stderr=null)
- try:
- self.restful_plugin_url = json.loads(out)['restful']
- except ValueError as err:
- raise CephRestfulPluginFailed(
- reason='unable to decode json: {} output={}'.format(err, out))
- except KeyError as err:
- raise CephRestfulPluginFailed(
- reason='missing expected key: {} in ouput={}'.format(err, out))
- self.request_update_plugin_url(self.restful_plugin_url)
-
- def restful_plugin_get_certificate(self):
- command = ['/usr/bin/ceph', 'config-key', 'get',
- 'config/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)]
- with open(os.devnull, 'wb') as null:
- certificate = self.run_with_timeout(
- command, CONFIG.ceph_cli_timeout_sec, stderr=null)
- with open(CONFIG.restful_plugin_cert_path, 'wb') as cert_file:
- cert_file.write(certificate)
- self.certificate = CONFIG.restful_plugin_cert_path
- self.request_update_certificate(
- self.certificate)
-
- def restful_plugin_ping(self):
- if not self.restful_plugin_url:
- raise RestApiPingFailed(reason='missing service url')
- if not self.certificate:
- raise RestApiPingFailed(reason='missing certificate')
- LOG.debug('Ping restful plugin: url=%d', self.restful_plugin_url)
- try:
- response = requests.request(
- 'GET', self.restful_plugin_url, verify=False,
- timeout=CONFIG.rest_api_timeout_sec)
- if not response.ok:
- raise RestApiPingFailed(
- reason='response not ok ({})'.format(response))
- LOG.debug('Ping restful plugin OK')
- except (requests.ConnectionError,
- requests.Timeout,
- requests.HTTPError) as err:
- raise RestApiPingFailed(reason=str(err))
-
- @staticmethod
- def _make_client_socket():
- sock = socket.socket(
- socket.AF_UNIX, socket.SOCK_SEQPACKET)
- sock.settimeout(2 * CONFIG.rest_api_timeout_sec)
- sock.connect(CONFIG.service_socket)
- return sock
-
- @staticmethod
- def request_status():
- try:
- with contextlib.closing(
- ServiceMonitor._make_client_socket()) as sock:
- sock.send('status')
- status = sock.recv(CONFIG.service_socket_bufsize)
- LOG.debug('Status %s', status)
- return status.startswith('OK')
- except socket.error as err:
- LOG.error('Status error: reason=%s', err)
- return False
-
- @staticmethod
- def request_stop():
- try:
- with contextlib.closing(
- ServiceMonitor._make_client_socket()) as sock:
- sock.send('stop')
- response = sock.recv(CONFIG.service_socket_bufsize)
- LOG.debug('Stop response: %s', response)
- return True
- except socket.error as err:
- LOG.error('Stop error: reason=%s', err)
- return False
-
- @staticmethod
- def request_update_ceph_mgr_failures(count):
- try:
- with contextlib.closing(
- ServiceMonitor._make_client_socket()) as sock:
- sock.send('ceph-mgr-failures {}'.format(count))
- sock.recv(CONFIG.service_socket_bufsize)
- return True
- except socket.error as err:
- LOG.error('Stop error: reason=%s', err)
- return False
-
- @staticmethod
- def request_update_ping_failures(count):
- try:
- with contextlib.closing(
- ServiceMonitor._make_client_socket()) as sock:
- sock.send('ping-failures {}'.format(count))
- sock.recv(CONFIG.service_socket_bufsize)
- return True
- except socket.error as err:
- LOG.error('Stop error: reason=%s', err)
- return False
-
- @staticmethod
- def request_update_plugin_url(url):
- try:
- with contextlib.closing(
- ServiceMonitor._make_client_socket()) as sock:
- sock.send('restful-url {}'.format(url))
- sock.recv(CONFIG.service_socket_bufsize)
- return True
- except socket.error as err:
- LOG.error('Stop error: reason=%s', err)
- return False
-
- @staticmethod
- def request_update_certificate(path):
- try:
- with contextlib.closing(
- ServiceMonitor._make_client_socket()) as sock:
- sock.send('certificate {}'.format(path))
- sock.recv(CONFIG.service_socket_bufsize)
- return True
- except socket.error as err:
- LOG.error('Stop error: reason=%s', err)
- return False
-
-
-class InitWrapper(object):
-
- """Handle System V init script actions: start, stop, restart, etc. """
-
- def __init__(self):
-
- """Dispatch command line action to the corresponding function.
-
- Candidate action functions are all class methods except ones
- that start with an underscore.
- """
-
- parser = argparse.ArgumentParser()
- actions = [m[0]
- for m in inspect.getmembers(self)
- if (inspect.ismethod(m[1])
- and not m[0].startswith('_'))]
- parser.add_argument(
- 'action',
- choices=actions)
- self.args = parser.parse_args()
- getattr(self, self.args.action)()
-
- def start(self):
-
- """Start ServiceMonitor as a daemon unless one is already running.
-
- Use a pipe to report monitor status back to this process.
- """
-
- pipe = os.pipe()
- child = os.fork()
- if child == 0:
- os.close(pipe[0])
- with daemon.DaemonContext(files_preserve=[pipe[1]]):
- # prevent duplication of messages in log
- global LOG
- LOG = setup_logging(cleanup_handlers=True)
- try:
- monitor = ServiceMonitor()
- status = 'OK'
- except ServiceAlreadyStarted:
- os.write(pipe[1], 'OK')
- os.close(pipe[1])
- return
- except Exception as err:
- status = str(err)
- os.write(pipe[1], status)
- os.close(pipe[1])
- if status == 'OK':
- try:
- monitor.run()
- except ServiceException as err:
- LOG.warning(str(err))
- except Exception as err:
- LOG.exception('Service monitor error: reason=%s', err)
- else:
- os.close(pipe[1])
- try:
- status = os.read(pipe[0], CONFIG.service_socket_bufsize)
- if status == 'OK':
- sys.exit(0)
- else:
- LOG.warning('Service monitor failed to start: '
- 'status=%s', status)
- except IOError as err:
- LOG.warning('Failed to read monitor status: reason=%s', err)
- os.close(pipe[0])
- os.waitpid(child, 0)
- sys.exit(1)
-
- def stop(self):
-
- """Tell ServiceMonitor daemon to stop running.
-
- In case request fails stop ServiceMonitor and ceph_mgr proecsses
- using SIGTERM followed by SIGKILL.
- """
-
- result = ServiceMonitor.request_stop()
- if not result:
- ceph_mgr = os.path.basename(CONFIG.ceph_mgr_service)
- procs = []
- for proc in psutil.process_iter():
- name = proc.name()
- if name == CONFIG.service_name:
- procs.append(proc)
- if name == ceph_mgr:
- procs.append(proc)
- psutil_terminate_kill(procs, CONFIG.ceph_mgr_kill_delay_sec)
-
- def restart(self):
- self.stop()
- self.start()
-
- def force_reload(self):
- self.stop()
- self.start()
-
- def reload(self):
- self.stop()
- self.start()
-
- def status(self):
-
- """Report status from ServiceMonitor.
-
- We don't just try to access REST API here because ServiceMonitor may
- be in the process of starting/configuring ceph-mgr and restful
- plugin in which case we report OK to avoid being restarted by SM.
- """
-
- status = ServiceMonitor.request_status()
- sys.exit(0 if status is True else 1)
-
-
-if __name__ == '__main__':
- InitWrapper()