#!/usr/bin/python
#
# Copyright (c) 2019 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#


### BEGIN INIT INFO
# Provides:          ceph/mgr RESTful API plugin
# Required-Start:    $ceph
# Required-Stop:     $ceph
# Default-Start:     2 3 4 5
# Default-Stop:      0 1 6
# Short-Description: Ceph MGR RESTful API plugin
# Description:       Ceph MGR RESTful API plugin
### END INIT INFO

import argparse
import contextlib
import errno
import fcntl
import inspect
import json
import logging
import multiprocessing
import os
import shutil
import signal
import socket
import subprocess
import sys
import tempfile
import time

import daemon
import psutil
import requests

# 'timeout' command returns exit status 124
# if command times out (see man page)
GNU_TIMEOUT_EXPIRED_RETCODE = 124


def psutil_terminate_kill(target, timeout):

    """Extend psutil functionality to stop a process.

    terminate() (SIGTERM) is sent to each target, then after a grace
    period kill() (SIGKILL) is sent to the ones that are still running.

    :param target: a psutil process object or a list of them
    :param timeout: seconds to wait after each signal for targets to exit
    """

    if not isinstance(target, list):
        target = [target]
    # drop the targets that are already gone
    _, target = psutil.wait_procs(target, timeout=0)
    for action in [lambda p: p.terminate(), lambda p: p.kill()]:
        for proc in target:
            try:
                action(proc)
            except psutil.NoSuchProcess:
                # process exited between wait_procs() and the signal
                pass
        _, target = psutil.wait_procs(
            target, timeout=timeout)


class Config(object):

    """ceph-mgr service wrapper configuration options.

    In the future we may want to load them from a configuration file
    (for example /etc/ceph/mgr-restful-plugin.conf )
    """

    def __init__(self):
        self.log_level = logging.INFO
        self.log_dir = '/var/log'

        self.ceph_mgr_service = '/usr/bin/ceph-mgr'
        self.ceph_mgr_config = '/etc/ceph/ceph.conf'
        self.ceph_mgr_cluster = 'ceph'
        self.ceph_mgr_rundir = '/var/run/ceph/mgr'
        self.ceph_mgr_confdir = '/var/lib/ceph/mgr'
        self.ceph_mgr_identity = socket.gethostname()

        self.service_name = 'mgr-restful-plugin'
        self.service_socket = os.path.join(
            self.ceph_mgr_rundir, '{}.socket'.format(self.service_name))
        self.service_lock = os.path.join(
            self.ceph_mgr_rundir, '{}.lock'.format(self.service_name))
        self.service_pid_file = os.path.join(
            '/var/run/ceph', '{}.pid'.format(self.service_name))

        self.restful_plugin_port = 5001

        # maximum size of a message received/sent via
        # service monitor control socket
        self.service_socket_bufsize = 1024

        # maximum time to wait for ceph cli to exit
        self.ceph_cli_timeout_sec = 30

        # how much time to wait after ceph cli commands fail with timeout
        # before running any other commands
        self.cluster_grace_period_sec = 30

        # after ceph-mgr is started it goes through an internal
        # initialization phase; how much time to wait before
        # querying ceph-mgr
        self.ceph_mgr_grace_period_sec = 15

        # after sending SIGTERM to ceph-mgr how much time to wait before
        # sending SIGKILL (maximum time allowed for ceph-mgr cleanup)
        self.ceph_mgr_kill_delay_sec = 5

        # if service monitor is running a recovery procedure it reports
        # status OK even if ceph-mgr is currently down. This sets the
        # maximum number of consecutive ceph-mgr failures before reporting
        # status error
        self.ceph_mgr_fail_count_report_error = 3

        # maximum number of consecutive ceph-mgr failures before
        # stopping mgr-restful-plugin service
        self.ceph_mgr_fail_count_exit = 5

        # maximum time allowed for ceph-mgr to respond to a REST API request
        self.rest_api_timeout_sec = 15

        # interval between consecutive REST API requests (pings). A smaller
        # value here triggers more requests to ceph-mgr restful plugin. A
        # higher value makes recovery slower when services become unavailable
        self.restful_plugin_ping_delay_sec = 3

        # where to save the self-signed certificate generated by ceph-mgr
        self.restful_plugin_cert_path = os.path.join(
            self.ceph_mgr_rundir, 'restful.crt')

        # time to wait after enabling restful plugin
        self.restful_plugin_grace_period_sec = 3

        # after how many REST API ping failures to restart ceph-mgr
        self.ping_fail_count_restart_mgr = 3

        # after how many REST API ping failures to report status error.
        # Until then service monitor reports status OK just in case
        # restful plugin recovers
        self.ping_fail_count_report_error = 5

    @staticmethod
    def load():
        """Return a Config instance populated with built-in defaults."""
        return Config()


def setup_logging(name=None, cleanup_handlers=False):
    """Return a logger writing to the service log file.

    :param name: logger name; defaults to CONFIG.service_name
    :param cleanup_handlers: when True flush/close and drop the handlers
        already attached to the logger before adding a new file handler;
        when False an already configured logger is returned unchanged
    """
    if not name:
        name = CONFIG.service_name
    log = logging.getLogger(name)
    log.setLevel(CONFIG.log_level)
    if cleanup_handlers:
        try:
            for handler in log.handlers:
                if isinstance(handler, logging.StreamHandler):
                    handler.flush()
                if isinstance(handler, logging.FileHandler):
                    handler.close()
            log.handlers = []
        except Exception:
            # best effort: a handler that can not be flushed or closed
            # must not prevent the service from starting
            pass
    elif log.handlers:
        return log
    handler = logging.FileHandler(
        os.path.join(CONFIG.log_dir,
                     '{}.log'.format(CONFIG.service_name)))
    handler.setFormatter(
        logging.Formatter('%(asctime)s %(process)s %(levelname)s '
                          '%(name)s %(message)s'))
    log.addHandler(handler)
    return log


CONFIG = Config.load()
LOG = setup_logging(name='init-wrapper')


class ServiceException(Exception):

    """Generic mgr-restful-plugin service exception.

    Build exception string based on static (per exception class)
    string plus args, keyword args passed to exception constructor.
    A "message" keyword argument overrides the class level template.
    """

    message = ""

    def __init__(self, *args, **kwargs):
        if "message" not in kwargs:
            try:
                message = self.message.format(*args, **kwargs)
            except Exception:   # noqa
                # template placeholders do not match the arguments;
                # fall back to a dump of everything we were given
                message = '{}, args:{}, kwargs: {}'.format(
                    self.message, args, kwargs)
        else:
            message = kwargs["message"]
        super(ServiceException, self).__init__(message)


class ServiceAlreadyStarted(ServiceException):
    message = ('Service monitor already started')


class ServiceLockFailed(ServiceException):
    message = ('Unable to lock service monitor: '
               'reason={reason}')


class ServiceNoSocket(ServiceException):
    message = ('Unable to create service monitor socket: '
               'reason={reason}')


class ServiceSocketBindFailed(ServiceException):
    message = ('Failed to bind service monitor socket: '
               'path={path}, reason={reason}')


class ServiceNoPidFile(ServiceException):
    message = ('Failed to update pid file: '
               'path={path}, reason={reason}')


class CommandFailed(ServiceException):
    message = ('Command failed: command={command}, '
               'reason={reason}, out={out}')


class CommandTimeout(ServiceException):
    message = ('Command timeout: command={command}, '
               'timeout={timeout}')


class CephMgrStartFailed(ServiceException):
    message = ('Failed to start ceph_mgr: '
               'reason={reason}')


class CephRestfulPluginFailed(ServiceException):
    message = ('Failed to start restful plugin: '
               'reason={reason}')


class RestApiPingFailed(ServiceException):
    message = ('REST API ping failed: '
               'reason={reason}')


class ServiceMonitor(object):

    """Configure and monitor ceph-mgr and restful plugin (Ceph REST API)

    1. process init script service requests: status, stop. Requests are
       received via a control socket. Stop has priority over whatever
       the monitor is doing currently. Any ceph command that may be running
       is terminated/killed. Note that while ceph-mgr and restful plugin
       configuration is in progress ServiceMonitor reports status OK to
       avoid being restarted by SM.

    2. configure ceph-mgr and mgr restful plugin: authentication, REST API
       service port, self signed certificate. This runs as a separate
       process so it can be stopped when init script requests it.

    3. periodically check (ping) REST API responds to HTTPS requests.
       Recovery actions are taken if REST API fails to respond: restart
       ceph-mgr, wait for cluster to become available again.
    """

    def __init__(self):
        # process running configuration & REST API ping loop
        self.monitor = None

        # command socket used by init script
        self.command = None

        # ceph-mgr process
        self.ceph_mgr = None

        # consecutive ceph-mgr/restful-plugin start failures. Service monitor
        # reports failure after CONFIG.ceph_mgr_fail_count_report_error and
        # stops after CONFIG.ceph_mgr_fail_count_exit
        self.ceph_mgr_failure_count = 0

        # consecutive REST API ping failures. ceph-mgr service is restarted
        # after CONFIG.ping_fail_count_restart_mgr threshold is exceeded
        self.ping_failure_count = 0

        # REST API url reported by ceph-mgr after enabling restful plugin
        self.restful_plugin_url = ''

        # REST API self signed certificate generated by restful plugin
        self.certificate = ''

    def run(self):
        """Take the service resources, start the monitor and serve requests.

        Raises the ServiceException subclasses of service_lock(),
        service_socket() and service_pid_file() when a resource can not
        be acquired.
        """
        self.disable_certificate_check()
        with self.service_lock(), self.service_socket(), \
                self.service_pid_file():
            self.start_monitor()
            self.server_loop()

    def disable_certificate_check(self):
        # ceph-mgr restful plugin is configured with a self-signed
        # certificate. Certificate host is hard-coded to "ceph-restful"
        # which causes HTTPS requests to fail because they don't
        # match current host name ("controller-..."). Disable HTTPS
        # certificates check in urllib3
        LOG.warning('Disable urllib3 certifcates check')
        requests.packages.urllib3.disable_warnings()

    def server_loop(self):
        """Serve control socket requests until a stop is requested.

        Each accepted connection carries a single request and is closed
        once the request was handled. Errors are logged and the loop
        keeps running.
        """
        self.command.listen(2)
        while True:
            try:
                client, _ = self.command.accept()
                # close the connection as soon as the request is handled
                # instead of keeping it open until the next accept()
                with contextlib.closing(client):
                    if self._handle_request(client):
                        break
            except Exception as err:
                LOG.exception(err)

    def _handle_request(self, client):
        """Process one control socket request.

        :param client: connected socket the request is read from and
            the response is written to
        :returns: True when the server loop must exit, False otherwise
        """
        request = client.recv(CONFIG.service_socket_bufsize)
        LOG.debug('Monitor command socket: request=%s', str(request))
        cmd = request.split(' ')
        cmd, args = cmd[0], cmd[1:]
        if cmd == 'status':
            self.send_response(client, request, self.status())
        elif cmd == 'stop':
            self.stop()
            self.send_response(client, request, 'OK')
            return True
        elif cmd == 'restful-url':
            try:
                self.restful_plugin_url = args[0]
                self.send_response(client, request, 'OK')
            except IndexError:
                LOG.warning('Failed to update restful plugin url: '
                            'args=%s', str(args))
                self.send_response(client, request, 'ERR')
        elif cmd == 'certificate':
            self.certificate = args[0] if args else ''
            self.send_response(client, request, 'OK')
        elif cmd == 'ceph-mgr-failures':
            try:
                self.ceph_mgr_failure_count = int(args[0])
                self.send_response(client, request, 'OK')
                if (self.ceph_mgr_failure_count
                        >= CONFIG.ceph_mgr_fail_count_exit):
                    self.stop()
                    return True
            except (IndexError, ValueError):
                LOG.warning('Failed to update ceph-mgr failures: '
                            'args=%s', str(args))
                self.send_response(client, request, 'ERR')
        elif cmd == 'ping-failures':
            try:
                self.ping_failure_count = int(args[0])
                self.send_response(client, request, 'OK')
            except (IndexError, ValueError):
                LOG.warning('Failed to update ping failures: '
                            'args=%s', str(args))
                self.send_response(client, request, 'ERR')
        else:
            LOG.warning('Ignore unknown request: request=%s', str(request))
        return False

    @staticmethod
    def send_response(client, request, response):
        """Send response to client; log (do not raise) socket errors."""
        try:
            client.send(response)
        except socket.error as err:
            LOG.warning('Failed to send response back. '
                        'request=%s, response=%s, reason=%s',
                        request, response, err)

    def _failures_within_limits(self):
        """Return True while both failure counters are below the
        thresholds at which status error must be reported."""
        return (self.ceph_mgr_failure_count
                < CONFIG.ceph_mgr_fail_count_report_error
                and self.ping_failure_count
                < CONFIG.ping_fail_count_report_error)

    def status(self):
        """Return service status: 'OK', 'ERR.down' or 'ERR.ping_failed'.

        Status OK is reported while services are being started or
        recovered as long as failure counters are within limits.
        """
        if not self.restful_plugin_url:
            if self._failures_within_limits():
                LOG.debug('Monitor is starting services. Report status OK')
                return 'OK'
            LOG.debug('Too many failures: '
                      'ceph_mgr=%d < %d, ping=%d < %d. '
                      'Report status ERR',
                      self.ceph_mgr_failure_count,
                      CONFIG.ceph_mgr_fail_count_report_error,
                      self.ping_failure_count,
                      CONFIG.ping_fail_count_report_error)
            return 'ERR.down'
        try:
            self.restful_plugin_ping()
            LOG.debug('Restful plugin ping successful. Report status OK')
            return 'OK'
        except (CommandFailed, RestApiPingFailed):
            if self._failures_within_limits():
                LOG.info('Restful plugin does not respond but failure '
                         'count is within acceptable limits: '
                         ' ceph_mgr=%d < %d, ping=%d < %d. '
                         'Report status OK',
                         self.ceph_mgr_failure_count,
                         CONFIG.ceph_mgr_fail_count_report_error,
                         self.ping_failure_count,
                         CONFIG.ping_fail_count_report_error)
                return 'OK'
            LOG.debug('Restful does not respond (ping failure count %d). '
                      'Report status ERR', self.ping_failure_count)
            return 'ERR.ping_failed'

    def stop(self):
        """Stop the monitor process group: SIGTERM, grace period, SIGKILL."""
        if not self.monitor:
            return
        LOG.info('Stop monitor with SIGTERM to process group %d',
                 self.monitor.pid)
        try:
            os.killpg(self.monitor.pid, signal.SIGTERM)
        except OSError as err:
            LOG.info('Stop monitor failed: reason=%s', str(err))
            return
        time.sleep(CONFIG.ceph_mgr_kill_delay_sec)
        LOG.info('Stop monitor with SIGKILL to process group %d',
                 self.monitor.pid)
        try:
            os.killpg(self.monitor.pid, signal.SIGKILL)
            os.waitpid(self.monitor.pid, 0)
        except OSError as err:
            LOG.info('Stop monitor failed: reason=%s', str(err))
            return
        LOG.info('Monitor stopped: pid=%d', self.monitor.pid)

    @contextlib.contextmanager
    def service_lock(self):
        """Hold an exclusive, non-blocking lock on CONFIG.service_lock.

        :raises ServiceAlreadyStarted: lock is held by another process
        :raises ServiceLockFailed: lock could not be taken for any
            other reason
        """
        LOG.info('Take service lock: path=%s', CONFIG.service_lock)
        try:
            os.makedirs(os.path.dirname(CONFIG.service_lock))
        except OSError:
            # directory most likely exists; if it is something else
            # open() below reports the actual problem
            pass
        lock_file = open(CONFIG.service_lock, 'w')
        try:
            fcntl.flock(lock_file.fileno(),
                        fcntl.LOCK_EX | fcntl.LOCK_NB)
        except (IOError, OSError) as err:
            # do not leak the descriptor when the lock is not acquired
            lock_file.close()
            if err.errno == errno.EAGAIN:
                raise ServiceAlreadyStarted()
            raise ServiceLockFailed(reason=str(err))
        # even if we have the lock here there might be another service manager
        # running whose CONFIG.ceph_mgr_rundir was removed before starting
        # this instance. Make sure there is only one service manager running
        self.stop_other_service_managers()
        try:
            yield
        finally:
            os.unlink(CONFIG.service_lock)
            lock_file.close()
            LOG.info('Release service lock: path=%s', CONFIG.service_lock)

    def stop_other_service_managers(self):
        """Kill every other process running this init script."""
        service = os.path.join('/etc/init.d', CONFIG.service_name)
        candidates = [[service], ['/usr/bin/python', service]]
        own_pid = os.getpid()
        for p in psutil.process_iter():
            try:
                if p.pid == own_pid:
                    continue
                if p.cmdline()[:2] not in candidates:
                    continue
                p.kill()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # process vanished or can not be inspected: it can not
                # be handled here, keep scanning the others
                continue

    @contextlib.contextmanager
    def service_socket(self):
        """Create the control socket bound to CONFIG.service_socket.

        :raises ServiceNoSocket: socket could not be created
        :raises ServiceSocketBindFailed: socket could not be bound
        """
        LOG.info('Create service socket')
        try:
            self.command = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
        except socket.error as err:
            raise ServiceNoSocket(reason=str(err))
        LOG.info('Remove existing socket files')
        try:
            os.unlink(CONFIG.service_socket)
        except OSError:
            # nothing to remove
            pass
        LOG.info('Bind service socket: path=%s', CONFIG.service_socket)
        try:
            self.command.bind(CONFIG.service_socket)
        except socket.error as err:
            # do not leak the descriptor when bind fails
            self.command.close()
            raise ServiceSocketBindFailed(
                path=CONFIG.service_socket, reason=str(err))
        try:
            yield
        finally:
            LOG.info('Close service socket and remove file: path=%s',
                     CONFIG.service_socket)
            self.command.close()
            os.unlink(CONFIG.service_socket)

    @contextlib.contextmanager
    def service_pid_file(self):
        """Write current pid to CONFIG.service_pid_file; remove it on exit.

        :raises ServiceNoPidFile: pid file could not be written
        """
        LOG.info('Update service pid file: path=%s', CONFIG.service_pid_file)
        try:
            # open() raises IOError on Python 2 and OSError on Python 3
            with open(CONFIG.service_pid_file, 'w') as pid_file:
                pid_file.write(str(os.getpid()))
        except (IOError, OSError) as err:
            raise ServiceNoPidFile(
                path=CONFIG.service_pid_file, reason=str(err))
        try:
            yield
        finally:
            LOG.info('Remove service pid file: path=%s',
                     CONFIG.service_pid_file)
            try:
                os.unlink(CONFIG.service_pid_file)
            except OSError:
                pass

    def start_monitor(self):
        """Run monitor_loop() in a separate process."""
        LOG.info('Start monitor loop')
        self.monitor = multiprocessing.Process(target=self.monitor_loop)
        self.monitor.start()

    def stop_unmanaged_ceph_mgr(self):
        """Stop every ceph-mgr process except the one started by us."""
        LOG.info('Stop unmanaged running ceph-mgr processes')
        service_name = os.path.basename(CONFIG.ceph_mgr_service)
        managed_pid = self.ceph_mgr.pid if self.ceph_mgr else None
        unmanaged = []
        for proc in psutil.process_iter():
            try:
                if proc.name() == service_name and proc.pid != managed_pid:
                    unmanaged.append(proc)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # process vanished or can not be inspected
                continue
        psutil_terminate_kill(unmanaged, CONFIG.ceph_mgr_kill_delay_sec)

    def monitor_loop(self):

        """Bring up and monitor ceph-mgr restful plugin.

        Steps:
        - wait for Ceph cluster to become available
        - configure and start ceph-mgr
        - configure and enable restful plugin
        - send periodic requests to REST API
        - recover from failures

        Note: because this runs as a separate process it
            must send status updates to service monitor
            via control socket for: ping_failure_count,
            restful_plugin_url and certificate.
        """

        # Promote to process group leader so parent (service monitor)
        # can kill the monitor plus processes spawned by it. Otherwise
        # children of monitor_loop() will keep running in background and
        # will be reaped by init when they finish but by then they might
        # interfere with any new service instance.
        os.setpgrp()

        # Ignoring SIGTERM here ensures process group is not reused by
        # the time parent (service monitor) issues the final SIGKILL.
        signal.signal(signal.SIGTERM, signal.SIG_IGN)

        while True:
            try:
                # steps to configure/start ceph-mgr and restful plugin
                self.ceph_fsid_get()
                self.ceph_mgr_auth_create()
                self.restful_plugin_set_server_port()
                self.restful_plugin_create_certificate()
                self.ceph_mgr_start()
                self.restful_plugin_enable()
                self.restful_plugin_create_admin_key()
                self.restful_plugin_get_url()
                self.restful_plugin_get_certificate()

                # REST API should be available now
                # start making periodic requests (ping)
                while True:
                    try:
                        self.restful_plugin_ping()
                        self.ping_failure_count = 0
                        self.request_update_ping_failures(
                            self.ping_failure_count)
                        self.ceph_mgr_failure_count = 0
                        self.request_update_ceph_mgr_failures(
                            self.ceph_mgr_failure_count)
                        time.sleep(CONFIG.restful_plugin_ping_delay_sec)
                        continue
                    except RestApiPingFailed as err:
                        LOG.warning(str(err))

                    LOG.info('REST API ping failure count=%d',
                             self.ping_failure_count)
                    self.ping_failure_count += 1
                    self.request_update_ping_failures(
                        self.ping_failure_count)

                    # maybe request failed because ceph-mgr is not running
                    if not self.ceph_mgr_is_running():
                        self.ceph_mgr_failure_count += 1
                        self.request_update_ceph_mgr_failures(
                            self.ceph_mgr_failure_count)
                        self.ceph_mgr_start()
                        time.sleep(CONFIG.ceph_mgr_grace_period_sec)
                        continue

                    # maybe request failed because cluster health is not ok
                    if not self.ceph_fsid_get():
                        LOG.info('Unable to get cluster fsid. '
                                 'Sleep for a while')
                        time.sleep(CONFIG.cluster_grace_period_sec)
                        break

                    # too many failures? Restart ceph-mgr and go again
                    # through configuration steps
                    if (self.ping_failure_count
                            % CONFIG.ping_fail_count_restart_mgr == 0):
                        LOG.info('Too many consecutive REST API failures. '
                                 'Restart ceph-mgr. Update service '
                                 'url and certificate')
                        self.ceph_mgr_stop()
                        self.restful_plugin_url = ''
                        self.request_update_plugin_url(
                            self.restful_plugin_url)
                        self.certificate = ''
                        self.request_update_certificate(self.certificate)
                        break

                    time.sleep(CONFIG.restful_plugin_ping_delay_sec)

            except CommandFailed as err:
                LOG.warning(str(err))
                time.sleep(CONFIG.cluster_grace_period_sec)
            except CommandTimeout as err:
                LOG.warning(str(err))
            except (CephMgrStartFailed, CephRestfulPluginFailed) as err:
                LOG.warning(str(err))
                self.ceph_mgr_failure_count += 1
                self.request_update_ceph_mgr_failures(
                    self.ceph_mgr_failure_count)
                time.sleep(CONFIG.ceph_mgr_grace_period_sec)
            except Exception as err:
                LOG.exception(err)
                time.sleep(CONFIG.cluster_grace_period_sec)

    @staticmethod
    def run_with_timeout(command, timeout, stderr=subprocess.STDOUT):
        """Run command under /usr/bin/timeout and return stripped output.

        :param command: argument list of the command to run
        :param timeout: seconds passed to the 'timeout' utility
        :param stderr: where to send the command's stderr
        :raises CommandTimeout: 'timeout' reported the command expired
        :raises CommandFailed: command exited with any other error
        """
        try:
            LOG.info('Run command: %s', ' '.join(command))
            return subprocess.check_output(
                ['/usr/bin/timeout', str(timeout)] + command,
                stderr=stderr, shell=False).strip()
        except subprocess.CalledProcessError as err:
            if err.returncode == GNU_TIMEOUT_EXPIRED_RETCODE:
                raise CommandTimeout(command=err.cmd, timeout=timeout)
            raise CommandFailed(command=err.cmd, reason=str(err),
                                out=err.output)

    def ceph_fsid_get(self):
        """Return the output of 'ceph fsid'."""
        return self.run_with_timeout(['/usr/bin/ceph', 'fsid'],
                                     CONFIG.ceph_cli_timeout_sec)

    def ceph_mgr_has_auth(self):
        """Export the mgr keyring; return False if the identity is unknown.

        :raises CommandFailed: 'ceph auth get' failed for a reason other
            than ENOENT
        """
        path = '{}/ceph-{}'.format(
            CONFIG.ceph_mgr_confdir, CONFIG.ceph_mgr_identity)
        try:
            os.makedirs(path)
        except OSError:
            # directory most likely exists; if not, the ceph command
            # below fails when writing the keyring
            pass
        try:
            self.run_with_timeout(
                ['/usr/bin/ceph', 'auth', 'get',
                 'mgr.{}'.format(CONFIG.ceph_mgr_identity),
                 '-o', '{}/keyring'.format(path)],
                CONFIG.ceph_cli_timeout_sec)
            return True
        except CommandFailed as err:
            if 'ENOENT' in str(err):
                return False
            raise

    def ceph_mgr_auth_create(self):
        """Create ceph-mgr authentication unless it already exists."""
        if self.ceph_mgr_has_auth():
            return
        LOG.info('Create ceph-mgr authentication')
        self.run_with_timeout(
            ['/usr/bin/ceph', 'auth', 'get-or-create',
             'mgr.{}'.format(CONFIG.ceph_mgr_identity),
             'mon', 'allow *', 'osd', 'allow *'],
            CONFIG.ceph_cli_timeout_sec)

    def ceph_mgr_is_running(self):
        """Return True if our ceph-mgr is running, False if it exited
        and None if it was never started."""
        if not self.ceph_mgr:
            return None
        try:
            self.ceph_mgr.wait(timeout=0)
        except psutil.TimeoutExpired:
            # still running
            return True
        return False

    def ceph_mgr_start(self):
        """Start ceph-mgr in foreground mode unless already running.

        :raises CephMgrStartFailed: process could not be spawned
        """
        if self.ceph_mgr_is_running():
            return
        self.stop_unmanaged_ceph_mgr()
        LOG.info('Start ceph-mgr daemon')
        try:
            with open(os.devnull, 'wb') as null:
                self.ceph_mgr = psutil.Popen(
                    [CONFIG.ceph_mgr_service,
                     '--cluster', CONFIG.ceph_mgr_cluster,
                     '--conf', CONFIG.ceph_mgr_config,
                     '--id', CONFIG.ceph_mgr_identity,
                     '-f'],
                    close_fds=True,
                    stdout=null,
                    stderr=null,
                    shell=False)
        except (OSError, ValueError) as err:
            raise CephMgrStartFailed(reason=str(err))
        time.sleep(CONFIG.ceph_mgr_grace_period_sec)

    def ceph_mgr_stop(self):
        """Terminate (then kill) the ceph-mgr process started by us."""
        if not self.ceph_mgr:
            return
        LOG.info('Stop ceph-mgr')
        psutil_terminate_kill(self.ceph_mgr, CONFIG.ceph_mgr_kill_delay_sec)

    def restful_plugin_has_server_port(self):
        """Return True if the configured port matches the expected one."""
        try:
            with open(os.devnull, 'wb') as null:
                out = self.run_with_timeout(
                    ['/usr/bin/ceph', 'config-key', 'get',
                     'mgr/restful/server_port'],
                    CONFIG.ceph_cli_timeout_sec, stderr=null)
            if out == str(CONFIG.restful_plugin_port):
                return True
            # 'out' is command output (text), not a number
            LOG.warning('Restful plugin port mismatch: '
                        'current=%s, expected=%d', out,
                        CONFIG.restful_plugin_port)
        except CommandFailed as err:
            LOG.warning('Failed to get restful plugin port: '
                        'reason=%s', str(err))
        return False

    def restful_plugin_set_server_port(self):
        """Configure the restful plugin port unless already set."""
        if self.restful_plugin_has_server_port():
            return
        LOG.info('Set restful plugin port=%d', CONFIG.restful_plugin_port)
        self.run_with_timeout(
            ['/usr/bin/ceph', 'config-key', 'set',
             'mgr/restful/server_port', str(CONFIG.restful_plugin_port)],
            CONFIG.ceph_cli_timeout_sec)

    def restful_plugin_has_admin_key(self):
        """Return True if the restful plugin admin key exists."""
        try:
            self.run_with_timeout(
                ['/usr/bin/ceph', 'config-key', 'get',
                 'mgr/restful/keys/admin'],
                CONFIG.ceph_cli_timeout_sec)
            return True
        except CommandFailed:
            pass
        return False

    def restful_plugin_create_admin_key(self):
        """Create the restful plugin admin key unless it already exists."""
        if self.restful_plugin_has_admin_key():
            return
        LOG.info('Create restful plugin admin key')
        self.run_with_timeout(
            ['/usr/bin/ceph', 'restful',
             'create-key', 'admin'],
            CONFIG.ceph_cli_timeout_sec)

    @staticmethod
    def _restful_plugin_certificate_keys():
        """Return (config-key name, file name) pairs under which the
        certificate and private key are stored."""
        identity = CONFIG.ceph_mgr_identity
        return [
            ('config/mgr/restful/{}/crt'.format(identity), 'crt'),
            ('mgr/restful/{}/crt'.format(identity), 'crt'),
            ('config/mgr/restful/{}/key'.format(identity), 'key'),
            ('mgr/restful/{}/key'.format(identity), 'key'),
        ]

    def restful_plugin_has_certificate(self):
        """Return True if certificate and key are stored in config-key.

        The key names checked here are the ones written by
        restful_plugin_create_certificate().
        """
        try:
            for key_name, _ in self._restful_plugin_certificate_keys():
                self.run_with_timeout(
                    ['/usr/bin/ceph', 'config-key', 'get', key_name],
                    CONFIG.ceph_cli_timeout_sec)
            return True
        except CommandFailed:
            pass
        return False

    def restful_plugin_create_certificate(self):
        """Generate and store a self-signed certificate unless present.

        :raises CommandFailed: openssl or a ceph command failed
        :raises CommandTimeout: a ceph command timed out
        """
        if self.restful_plugin_has_certificate():
            return
        LOG.info('Create restful plugin self signed certificate')
        path = tempfile.mkdtemp()
        try:
            try:
                with tempfile.NamedTemporaryFile() as restful_cnf:
                    restful_cnf.write((
                        '[req]\n'
                        'req_extensions = v3_ca\n'
                        'distinguished_name = req_distinguished_name\n'
                        '[v3_ca]\n'
                        'subjectAltName=DNS:{}\n'
                        'basicConstraints = CA:true\n'
                        '[ req_distinguished_name ]\n'
                        '0.organizationName = IT\n'
                        'commonName = ceph-restful\n').format(
                            CONFIG.ceph_mgr_identity))
                    # openssl reads the file by name: make sure the
                    # content is on disk before running it
                    restful_cnf.flush()
                    subprocess.check_call([
                        '/usr/bin/openssl', 'req', '-new', '-nodes', '-x509',
                        '-subj', '/O=IT/CN=' + CONFIG.ceph_mgr_identity,
                        '-days', '3650',
                        '-config', restful_cnf.name,
                        '-out', os.path.join(path, 'crt'),
                        '-keyout', os.path.join(path, 'key'),
                        '-extensions', 'v3_ca'])
            except subprocess.CalledProcessError as err:
                raise CommandFailed(
                    command=' '.join(err.cmd),
                    reason='failed to generate self-signed certificate: '
                           '{}'.format(str(err)),
                    out=err.output)
            for key_name, file_name in \
                    self._restful_plugin_certificate_keys():
                self.run_with_timeout(
                    ['/usr/bin/ceph', 'config-key', 'set', key_name,
                     '-i', os.path.join(path, file_name)],
                    CONFIG.ceph_cli_timeout_sec)
        finally:
            shutil.rmtree(path)

    def restful_plugin_is_enabled(self):
        """Return True if 'restful' is listed among enabled mgr modules.

        :raises CommandFailed: command failed or its output could not
            be interpreted
        """
        command = ['/usr/bin/ceph', 'mgr', 'module', 'ls',
                   '--format', 'json']
        with open(os.devnull, 'wb') as null:
            out = self.run_with_timeout(
                command, CONFIG.ceph_cli_timeout_sec, stderr=null)
        try:
            if 'restful' in json.loads(out)['enabled_modules']:
                return True
        except ValueError as err:
            raise CommandFailed(
                command=' '.join(command),
                reason='unable to decode json: {}'.format(err), out=out)
        except KeyError as err:
            raise CommandFailed(
                command=' '.join(command),
                reason='missing expected key: {}'.format(err), out=out)
        return False

    def restful_plugin_enable(self):
        """Enable the restful plugin if needed, then wait a grace period."""
        if not self.restful_plugin_is_enabled():
            LOG.info('Enable restful plugin')
            self.run_with_timeout(
                ['/usr/bin/ceph', 'mgr',
                 'module', 'enable', 'restful'],
                CONFIG.ceph_cli_timeout_sec)
        time.sleep(CONFIG.restful_plugin_grace_period_sec)

    def restful_plugin_get_url(self):
        """Read the REST API url from ceph-mgr and publish it to the
        service monitor.

        :raises CephRestfulPluginFailed: url is missing from the output
            or the output could not be decoded
        """
        command = ['/usr/bin/ceph', 'mgr', 'services',
                   '--format', 'json']
        with open(os.devnull, 'wb') as null:
            out = self.run_with_timeout(
                command, CONFIG.ceph_cli_timeout_sec, stderr=null)
        try:
            self.restful_plugin_url = json.loads(out)['restful']
        except ValueError as err:
            raise CephRestfulPluginFailed(
                reason='unable to decode json: {} output={}'.format(err, out))
        except KeyError as err:
            raise CephRestfulPluginFailed(
                reason='missing expected key: {} in ouput={}'.format(err, out))
        self.request_update_plugin_url(self.restful_plugin_url)

    def restful_plugin_get_certificate(self):
        """Save the stored certificate to CONFIG.restful_plugin_cert_path
        and publish its path to the service monitor."""
        command = ['/usr/bin/ceph', 'config-key', 'get',
                   'config/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)]
        with open(os.devnull, 'wb') as null:
            certificate = self.run_with_timeout(
                command, CONFIG.ceph_cli_timeout_sec, stderr=null)
        with open(CONFIG.restful_plugin_cert_path, 'wb') as cert_file:
            cert_file.write(certificate)
        self.certificate = CONFIG.restful_plugin_cert_path
        self.request_update_certificate(
            self.certificate)

    def restful_plugin_ping(self):
        """Send a GET request to the REST API.

        :raises RestApiPingFailed: url or certificate are not known yet,
            the request failed or the response status is not ok
        """
        if not self.restful_plugin_url:
            raise RestApiPingFailed(reason='missing service url')
        if not self.certificate:
            raise RestApiPingFailed(reason='missing certificate')
        # url is a string
        LOG.debug('Ping restful plugin: url=%s', self.restful_plugin_url)
        try:
            response = requests.request(
                'GET', self.restful_plugin_url, verify=False,
                timeout=CONFIG.rest_api_timeout_sec)
            if not response.ok:
                raise RestApiPingFailed(
                    reason='response not ok ({})'.format(response))
            LOG.debug('Ping restful plugin OK')
        except (requests.ConnectionError,
                requests.Timeout,
                requests.HTTPError) as err:
            raise RestApiPingFailed(reason=str(err))

    @staticmethod
    def _make_client_socket():
        """Return a socket connected to the service monitor control socket.

        :raises socket.error: connection could not be established
        """
        sock = socket.socket(
            socket.AF_UNIX, socket.SOCK_SEQPACKET)
        try:
            sock.settimeout(2 * CONFIG.rest_api_timeout_sec)
            sock.connect(CONFIG.service_socket)
        except Exception:
            # do not leak the descriptor when connect fails
            sock.close()
            raise
        return sock

    @staticmethod
    def _request_update(message):
        """Send an update request to the service monitor and wait for
        its response.

        :returns: True if the exchange completed, False on socket error
        """
        try:
            with contextlib.closing(
                    ServiceMonitor._make_client_socket()) as sock:
                sock.send(message)
                sock.recv(CONFIG.service_socket_bufsize)
                return True
        except socket.error as err:
            LOG.error('Update request failed: request=%s, reason=%s',
                      message, err)
            return False

    @staticmethod
    def request_status():
        """Ask the service monitor for status; True if it reports OK."""
        try:
            with contextlib.closing(
                    ServiceMonitor._make_client_socket()) as sock:
                sock.send('status')
                status = sock.recv(CONFIG.service_socket_bufsize)
                LOG.debug('Status %s', status)
                return status.startswith('OK')
        except socket.error as err:
            LOG.error('Status error: reason=%s', err)
            return False

    @staticmethod
    def request_stop():
        """Ask the service monitor to stop; True if it responded."""
        try:
            with contextlib.closing(
                    ServiceMonitor._make_client_socket()) as sock:
                sock.send('stop')
                response = sock.recv(CONFIG.service_socket_bufsize)
                LOG.debug('Stop response: %s', response)
                return True
        except socket.error as err:
            LOG.error('Stop error: reason=%s', err)
            return False

    @staticmethod
    def request_update_ceph_mgr_failures(count):
        """Report the ceph-mgr failure count to the service monitor."""
        return ServiceMonitor._request_update(
            'ceph-mgr-failures {}'.format(count))

    @staticmethod
    def request_update_ping_failures(count):
        """Report the REST API ping failure count to the service monitor."""
        return ServiceMonitor._request_update(
            'ping-failures {}'.format(count))

    @staticmethod
    def request_update_plugin_url(url):
        """Report the REST API url to the service monitor."""
        return ServiceMonitor._request_update(
            'restful-url {}'.format(url))

    @staticmethod
    def request_update_certificate(path):
        """Report the certificate path to the service monitor."""
        return ServiceMonitor._request_update(
            'certificate {}'.format(path))


class InitWrapper(object):

    """Handle System V init script actions: start, stop, restart, etc. """

    def __init__(self):

        """Dispatch command line action to the corresponding function.

        Candidate action functions are all class methods except ones
        that start with an underscore.
        """

        parser = argparse.ArgumentParser()
        actions = [m[0]
                   for m in inspect.getmembers(self)
                   if (inspect.ismethod(m[1])
                       and not m[0].startswith('_'))]
        parser.add_argument(
            'action',
            choices=actions)
        self.args = parser.parse_args()
        getattr(self, self.args.action)()

    def start(self):

        """Start ServiceMonitor as a daemon unless one is already running.

        Use a pipe to report monitor status back to this process.
        """

        pipe = os.pipe()
        child = os.fork()
        if child == 0:
            os.close(pipe[0])
            with daemon.DaemonContext(files_preserve=[pipe[1]]):
                # prevent duplication of messages in log
                global LOG
                LOG = setup_logging(cleanup_handlers=True)
                try:
                    monitor = ServiceMonitor()
                    status = 'OK'
                except ServiceAlreadyStarted:
                    os.write(pipe[1], 'OK')
                    os.close(pipe[1])
                    return
                except Exception as err:
                    status = str(err)
                os.write(pipe[1], status)
                os.close(pipe[1])
                if status == 'OK':
                    try:
                        monitor.run()
                    except ServiceException as err:
                        LOG.warning(str(err))
                    except Exception as err:
                        LOG.exception('Service monitor error: reason=%s', err)
        else:
            os.close(pipe[1])
            try:
                status = os.read(pipe[0], CONFIG.service_socket_bufsize)
                if status == 'OK':
                    sys.exit(0)
                else:
                    LOG.warning('Service monitor failed to start: '
                                'status=%s', status)
            except (IOError, OSError) as err:
                # os.read() reports failures as OSError
                LOG.warning('Failed to read monitor status: reason=%s', err)
            os.close(pipe[0])
            os.waitpid(child, 0)
            sys.exit(1)

    def stop(self):

        """Tell ServiceMonitor daemon to stop running.

        In case request fails stop ServiceMonitor and ceph_mgr processes
        using SIGTERM followed by SIGKILL.
        """

        result = ServiceMonitor.request_stop()
        if not result:
            ceph_mgr = os.path.basename(CONFIG.ceph_mgr_service)
            procs = []
            for proc in psutil.process_iter():
                try:
                    name = proc.name()
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    # process vanished or can not be inspected
                    continue
                if name in (CONFIG.service_name, ceph_mgr):
                    procs.append(proc)
            psutil_terminate_kill(procs, CONFIG.ceph_mgr_kill_delay_sec)

    def restart(self):
        """Stop then start the service."""
        self.stop()
        self.start()

    def force_reload(self):
        """Same as restart."""
        self.stop()
        self.start()

    def reload(self):
        """Same as restart."""
        self.stop()
        self.start()

    def status(self):

        """Report status from ServiceMonitor.

        We don't just try to access REST API here because ServiceMonitor may
        be in the process of starting/configuring ceph-mgr and restful
        plugin in which case we report OK to avoid being restarted by SM.
        """

        status = ServiceMonitor.request_status()
        sys.exit(0 if status is True else 1)


if __name__ == '__main__':
    InitWrapper()