Revert "Revert "oran-shell-release: release image for F""
[pti/rtp.git] / meta-starlingx / meta-stx-virt / recipes-extended / ceph / files / mgr-restful-plugin.py
1 #!/usr/bin/python
2 #
3 # Copyright (c) 2019 Wind River Systems, Inc.
4 #
5 # SPDX-License-Identifier: Apache-2.0
6 #
7
8
9 ### BEGIN INIT INFO
10 # Provides:          ceph/mgr RESTful API plugin
11 # Required-Start:    $ceph
12 # Required-Stop:     $ceph
13 # Default-Start:     2 3 4 5
14 # Default-Stop:      0 1 6
15 # Short-Description: Ceph MGR RESTful API plugin
16 # Description:       Ceph MGR RESTful API plugin
17 ### END INIT INFO
18
19 import argparse
20 import contextlib
21 import errno
22 import fcntl
23 import inspect
24 import json
25 import logging
26 import multiprocessing
27 import os
28 import shutil
29 import signal
30 import socket
31 import subprocess
32 import sys
33 import tempfile
34 import time
35
36 import daemon
37 import psutil
38 import requests
39
# The 'timeout' command exits with status 124 when the command it
# runs times out (see the timeout man page). run_with_timeout()
# compares the exit status of a failed command against this value.
GNU_TIMEOUT_EXPIRED_RETCODE = 124
43
44
def psutil_terminate_kill(target, timeout):

    """Extend psutil functionality to stop a process.

       Targets that are still running are first asked to stop with
       Process.terminate() (SIGTERM on POSIX). The ones that are still
       alive after a grace period are stopped with Process.kill()
       (SIGKILL) and given the same grace period to exit.

       :param target: a psutil process object or a list of them
       :param timeout: grace period, in seconds, waited after each step
    """

    if not isinstance(target, list):
        target = [target]
    # drop the targets that already exited
    _, target = psutil.wait_procs(target, timeout=0)
    for action in (lambda p: p.terminate(), lambda p: p.kill()):
        for proc in target:
            try:
                action(proc)
            except psutil.NoSuchProcess:
                # target exited between the liveness check and the
                # signal; nothing left to stop
                pass
        _, target = psutil.wait_procs(
            target, timeout=timeout)
61
62
class Config(object):

    """Settings for the ceph-mgr service wrapper.

        Every option is hard-coded in the constructor. In the future we
        may want to load them from a configuration file
        (for example /etc/ceph/mgr-restful-plugin.conf )
    """

    def __init__(self):
        name = 'mgr-restful-plugin'
        run_dir = '/var/run/ceph/mgr'

        # --- logging
        self.log_level = logging.INFO
        self.log_dir = '/var/log'

        # --- ceph-mgr daemon: binary, configuration and identity
        self.ceph_mgr_service = '/usr/bin/ceph-mgr'
        self.ceph_mgr_config = '/etc/ceph/ceph.conf'
        self.ceph_mgr_cluster = 'ceph'
        self.ceph_mgr_rundir = run_dir
        self.ceph_mgr_confdir = '/var/lib/ceph/mgr'
        self.ceph_mgr_identity = socket.gethostname()

        # --- files owned by the service monitor
        self.service_name = name
        self.service_socket = os.path.join(run_dir, name + '.socket')
        self.service_lock = os.path.join(run_dir, name + '.lock')
        self.service_pid_file = os.path.join('/var/run/ceph', name + '.pid')

        # --- restful plugin
        self.restful_plugin_port = 5001
        # the self-signed certificate generated by ceph-mgr is saved here
        self.restful_plugin_cert_path = os.path.join(run_dir, 'restful.crt')
        # pause after enabling the restful plugin
        self.restful_plugin_grace_period_sec = 3
        # pause between consecutive REST API requests (pings). Lower
        # values send more requests to the plugin, higher values slow
        # down recovery when services become unavailable
        self.restful_plugin_ping_delay_sec = 3
        # upper bound for ceph-mgr to answer one REST API request
        self.rest_api_timeout_sec = 15

        # --- control socket
        # largest message received/sent via the service monitor
        # control socket
        self.service_socket_bufsize = 1024

        # --- ceph cli
        # upper bound for a ceph cli command to exit
        self.ceph_cli_timeout_sec = 30
        # pause after a ceph cli command fails with timeout, before
        # any other command is run
        self.cluster_grace_period_sec = 30

        # --- ceph-mgr life cycle
        # ceph-mgr goes through an internal initialization phase after
        # it is started; wait this long before querying it
        self.ceph_mgr_grace_period_sec = 15
        # delay between SIGTERM and SIGKILL when stopping ceph-mgr
        # (maximum time allowed for ceph-mgr cleanup)
        self.ceph_mgr_kill_delay_sec = 5

        # --- failure thresholds
        # while a recovery procedure is running the monitor reports
        # status OK even if ceph-mgr is down; this many consecutive
        # ceph-mgr failures turn the report into an error
        self.ceph_mgr_fail_count_report_error = 3
        # consecutive ceph-mgr failures after which the
        # mgr-restful-plugin service stops
        self.ceph_mgr_fail_count_exit = 5
        # REST API ping failures after which ceph-mgr is restarted
        self.ping_fail_count_restart_mgr = 3
        # REST API ping failures after which status error is reported;
        # until then status is OK in case the restful plugin recovers
        self.ping_fail_count_report_error = 5

    @staticmethod
    def load():
        """Return a Config instance holding the built-in settings."""
        return Config()
147
148
def setup_logging(name=None, cleanup_handlers=False):
    """Return a logger that appends to the service log file.

    :param name: logger name; CONFIG.service_name is used when empty
    :param cleanup_handlers: when true, flush/close and drop the handlers
        already attached to the logger before adding a fresh file handler
    :returns: the configured logger. A logger that already has handlers
        is returned unchanged unless cleanup_handlers is set.
    """
    log = logging.getLogger(name or CONFIG.service_name)
    log.setLevel(CONFIG.log_level)

    if not cleanup_handlers and log.handlers:
        return log

    if cleanup_handlers:
        try:
            for stale in log.handlers:
                if isinstance(stale, logging.StreamHandler):
                    stale.flush()
                if isinstance(stale, logging.FileHandler):
                    stale.close()
            log.handlers = []
        except Exception:
            # best effort: a handler that fails to flush/close
            # must not prevent logging from being set up
            pass

    log_path = os.path.join(
        CONFIG.log_dir, '{}.log'.format(CONFIG.service_name))
    file_handler = logging.FileHandler(log_path)
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(process)s '
        '%(levelname)s %(name)s %(message)s'))
    log.addHandler(file_handler)
    return log
173
174
# Module-wide settings, created once when this file is loaded.
CONFIG = Config.load()
# Logger used by the code in this file. Records carry the logger name
# 'init-wrapper' and are written to the service log file.
LOG = setup_logging(name='init-wrapper')
177
178
class ServiceException(Exception):

    """Base class for mgr-restful-plugin service errors.

       The exception text is the class-level `message` template filled
       in with the positional and keyword arguments given to the
       constructor. A `message` keyword argument replaces the text
       entirely. If the template cannot be formatted, the raw template
       and the arguments are reported instead.
    """

    message = ""

    def __init__(self, *args, **kwargs):
        if "message" in kwargs:
            text = kwargs["message"]
        else:
            try:
                text = self.message.format(*args, **kwargs)
            except Exception:   # noqa
                text = '{}, args:{}, kwargs: {}'.format(
                    self.message, args, kwargs)
        super(ServiceException, self).__init__(text)
199
200
class ServiceAlreadyStarted(ServiceException):
    """Another service monitor instance already holds the service lock."""

    message = 'Service monitor already started'
203
204
class ServiceLockFailed(ServiceException):
    """The service lock could not be taken; `reason` holds the error."""

    message = 'Unable to lock service monitor: reason={reason}'
208
209
class ServiceNoSocket(ServiceException):
    """The control socket could not be created; `reason` holds the error."""

    message = 'Unable to create service monitor socket: reason={reason}'
213
214
class ServiceSocketBindFailed(ServiceException):
    """The control socket could not be bound to `path`."""

    message = (
        'Failed to bind service monitor socket: path={path}, '
        'reason={reason}')
218
219
class ServiceNoPidFile(ServiceException):
    """The pid file at `path` could not be written."""

    message = 'Failed to update pid file: path={path}, reason={reason}'
223
224
class CommandFailed(ServiceException):
    """An external command exited with an error status.

    `command` is the command line, `reason` the error text and `out`
    the output captured from the command.
    """

    message = (
        'Command failed: command={command}, reason={reason}, '
        'out={out}')
228
229
class CommandTimeout(ServiceException):
    """An external command did not finish within `timeout`."""

    message = 'Command timeout: command={command}, timeout={timeout}'
233
234
class CephMgrStartFailed(ServiceException):
    """The ceph-mgr daemon could not be launched; see `reason`."""

    message = 'Failed to start ceph_mgr: reason={reason}'
238
239
class CephRestfulPluginFailed(ServiceException):
    """Error reported when the restful plugin fails to start."""

    message = 'Failed to start restful plugin: reason={reason}'
243
244
class RestApiPingFailed(ServiceException):
    """Error reported when a REST API ping request fails."""

    message = 'REST API ping failed: reason={reason}'
248
249
250 class ServiceMonitor(object):
251
252     """Configure and monitor ceph-mgr and restful plugin (Ceph REST API)
253
254        1. process init script service requests: status, stop. Requests are
255           received via a control socket. Stop has priority over whatever
256           the monitor is doing currently. Any ceph command that may be running
257           is terminated/killed. Note that while ceph-mgr and restful plugin
258           configuration is in progress ServiceMonitor reports status OK to
259           avoid being restarted by SM.
260
261        2. configure ceph-mgr and mgr restful plugin: authentication, REST API
262           service port, self signed certificate. This runs as a separate
263           process so it can be stopped when init script requests it.
264
265        3. periodically check (ping) REST API responds to HTTPS requests.
266           Recovery actions are taken if REST API fails to respond: restart
267           ceph-mgr, wait for cluster to become available again.
268     """
269
270     def __init__(self):
271         # process running configuration & REST API ping loop
272         self.monitor = None
273
274         # command socket used by init script
275         self.command = None
276
277         # ceph-mgr process
278         self.ceph_mgr = None
279
280         # consecutive ceph-mgr/restful-plugin start failures. Service monitor
281         # reports failure after CONFIG.ceph_mgr_max_failure_count
282         self.ceph_mgr_failure_count = 0
283
284         # consecutive REST API ping failures. ceph-mgr service is restarted
285         # after CONFIG.ping_fail_count_restart_mgr threshold is exceeded
286         self.ping_failure_count = 0
287
288         # REST API url reported by ceph-mgr after enabling restful plugin
289         self.restful_plugin_url = ''
290
291         # REST API self signed certificate generated by restful plugin
292         self.certificate = ''
293
294     def run(self):
295         self.disable_certificate_check()
296         with self.service_lock(), self.service_socket(), \
297                 self.service_pid_file():
298             self.start_monitor()
299             self.server_loop()
300
301     def disable_certificate_check(self):
302         # ceph-mgr restful plugin is configured with a self-signed
303         # certificate. Certificate host is hard-coded to "ceph-restful"
304         # which causes HTTPS requests to fail because they don't
305         # match current host name ("controller-..."). Disable HTTPS
306         # certificates check in urllib3
307         LOG.warning('Disable urllib3 certifcates check')
308         requests.packages.urllib3.disable_warnings()
309
310     def server_loop(self):
311         self.command.listen(2)
312         while True:
313             try:
314                 client, _ = self.command.accept()
315                 request = client.recv(CONFIG.service_socket_bufsize)
316                 LOG.debug('Monitor command socket: request=%s', str(request))
317                 cmd = request.split(' ')
318                 cmd, args = cmd[0], cmd[1:]
319                 if cmd == 'status':
320                     self.send_response(client, request, self.status())
321                 elif cmd == 'stop':
322                     self.stop()
323                     self.send_response(client, request, 'OK')
324                     break
325                 elif cmd == 'restful-url':
326                     try:
327                         self.restful_plugin_url = args[0]
328                         self.send_response(client, request, 'OK')
329                     except IndexError:
330                         LOG.warning('Failed to update restful plugin url: '
331                                     'args=%s', str(args))
332                         self.send_response(client, request, 'ERR')
333                 elif cmd == 'certificate':
334                     try:
335                         self.certificate = args[0] if args else ''
336                         self.send_response(client, request, 'OK')
337                     except IndexError:
338                         LOG.warning('Failed to update certificate path: '
339                                     'args=%s', str(args))
340                         self.send_response(client, request, 'ERR')
341                 elif cmd == 'ceph-mgr-failures':
342                     try:
343                         self.ceph_mgr_failure_count = int(args[0])
344                         self.send_response(client, request, 'OK')
345                         if self.ceph_mgr_failure_count >= CONFIG.ceph_mgr_fail_count_exit:
346                             self.stop()
347                             break
348                     except (IndexError, ValueError):
349                         LOG.warning('Failed to update ceph-mgr failures: '
350                                     'args=%s', str(args))
351                         self.send_response(client, request, 'ERR')
352                 elif cmd == 'ping-failures':
353                     try:
354                         self.ping_failure_count = int(args[0])
355                         self.send_response(client, request, 'OK')
356                     except (IndexError, ValueError):
357                         LOG.warning('Failed to update ping failures: '
358                                     'args=%s', str(args))
359                         self.send_response(client, request, 'ERR')
360             except Exception as err:
361                 LOG.exception(err)
362
363     @staticmethod
364     def send_response(client, request, response):
365         try:
366             client.send(response)
367         except socket.error as err:
368             LOG.warning('Failed to send response back. '
369                         'request=%s, response=%s, reason=%s',
370                         request, response, err)
371
372     def status(self):
373         if not self.restful_plugin_url:
374             if self.ceph_mgr_failure_count < CONFIG.ceph_mgr_fail_count_report_error \
375                and self.ping_failure_count < CONFIG.ping_fail_count_report_error:
376                 LOG.debug('Monitor is starting services. Report status OK')
377                 return 'OK'
378             LOG.debug('Too many failures: '
379                       'ceph_mgr=%d < %d, ping=%d < %d. '
380                       'Report status ERR',
381                       self.ceph_mgr_failure_count,
382                       CONFIG.ceph_mgr_fail_count_report_error,
383                       self.ping_failure_count,
384                       CONFIG.ping_fail_count_report_error)
385             return 'ERR.down'
386         try:
387             self.restful_plugin_ping()
388             LOG.debug('Restful plugin ping successful. Report status OK')
389             return 'OK'
390         except (CommandFailed, RestApiPingFailed):
391             if self.ceph_mgr_failure_count < CONFIG.ceph_mgr_fail_count_report_error \
392                and self.ping_failure_count < CONFIG.ping_fail_count_report_error:
393                 LOG.info('Restful plugin does not respond but failure '
394                          'count is within acceptable limits: '
395                          ' ceph_mgr=%d < %d, ping=%d < %d. '
396                          'Report status OK',
397                          self.ceph_mgr_failure_count,
398                          CONFIG.ceph_mgr_fail_count_report_error,
399                          self.ping_failure_count,
400                          CONFIG.ping_fail_count_report_error)
401                 return 'OK'
402             LOG.debug('Restful does not respond (ping failure count %d). '
403                       'Report status ERR', self.ping_failure_count)
404             return 'ERR.ping_failed'
405
406     def stop(self):
407         if not self.monitor:
408             return
409         LOG.info('Stop monitor with SIGTERM to process group %d',
410                  self.monitor.pid)
411         try:
412             os.killpg(self.monitor.pid, signal.SIGTERM)
413         except OSError as err:
414             LOG.info('Stop monitor failed: reason=%s', str(err))
415             return
416         time.sleep(CONFIG.ceph_mgr_kill_delay_sec)
417         LOG.info('Stop monitor with SIGKILL to process group %d',
418                  self.monitor.pid)
419         try:
420             os.killpg(self.monitor.pid, signal.SIGKILL)
421             os.waitpid(self.monitor.pid, 0)
422         except OSError as err:
423             LOG.info('Stop monitor failed: reason=%s', str(err))
424             return
425         LOG.info('Monitor stopped: pid=%d', self.monitor.pid)
426
427     @contextlib.contextmanager
428     def service_lock(self):
429         LOG.info('Take service lock: path=%s', CONFIG.service_lock)
430         try:
431             os.makedirs(os.path.dirname(CONFIG.service_lock))
432         except OSError:
433             pass
434         lock_file = open(CONFIG.service_lock, 'w')
435         try:
436             fcntl.flock(lock_file.fileno(),
437                         fcntl.LOCK_EX | fcntl.LOCK_NB)
438         except (IOError, OSError) as err:
439             if err.errno == errno.EAGAIN:
440                 raise ServiceAlreadyStarted()
441             else:
442                 raise ServiceLockFailed(reason=str(err))
443         # even if we have the lock here there might be another service manager
444         # running whose CONFIG.ceph_mgr_rundir was removed before starting
445         # this instance. Make sure there is only one service manager running
446         self.stop_other_service_managers()
447         try:
448             yield
449         finally:
450             os.unlink(CONFIG.service_lock)
451             lock_file.close()
452             LOG.info('Release service lock: path=%s', CONFIG.service_lock)
453
454     def stop_other_service_managers(self):
455         service = os.path.join('/etc/init.d', CONFIG.service_name)
456         for p in psutil.process_iter():
457             if p.cmdline()[:2] not in [[service], ['/usr/bin/python', service]]:
458                 continue
459             if p.pid == os.getpid():
460                 continue
461             p.kill()
462
463     @contextlib.contextmanager
464     def service_socket(self):
465         LOG.info('Create service socket')
466         try:
467             self.command = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
468         except socket.error as err:
469             raise ServiceNoSocket(reason=str(err))
470         LOG.info('Remove existing socket files')
471         try:
472             os.unlink(CONFIG.service_socket)
473         except OSError:
474             pass
475         LOG.info('Bind service socket: path=%s', CONFIG.service_socket)
476         try:
477             self.command.bind(CONFIG.service_socket)
478         except socket.error as err:
479             raise ServiceSocketBindFailed(
480                 path=CONFIG.service_socket, reason=str(err))
481         try:
482             yield
483         finally:
484             LOG.info('Close service socket and remove file: path=%s',
485                      CONFIG.service_socket)
486             self.command.close()
487             os.unlink(CONFIG.service_socket)
488
489     @contextlib.contextmanager
490     def service_pid_file(self):
491         LOG.info('Update service pid file: path=%s', CONFIG.service_pid_file)
492         try:
493             pid_file = open(CONFIG.service_pid_file, 'w')
494             pid_file.write(str(os.getpid()))
495             pid_file.flush()
496         except OSError as err:
497             raise ServiceNoPidFile(
498                 path=CONFIG.service_pid_file, reason=str(err))
499         try:
500             yield
501         finally:
502             LOG.info('Remove service pid file: path=%s',
503                      CONFIG.service_pid_file)
504             try:
505                 os.unlink(CONFIG.service_pid_file)
506             except OSError:
507                 pass
508
509     def start_monitor(self):
510         LOG.info('Start monitor loop')
511         self.monitor = multiprocessing.Process(target=self.monitor_loop)
512         self.monitor.start()
513
514     def stop_unmanaged_ceph_mgr(self):
515         LOG.info('Stop unmanaged running ceph-mgr processes')
516         service_name = os.path.basename(CONFIG.ceph_mgr_service)
517         if self.ceph_mgr:
518             psutil_terminate_kill(
519                 [proc for proc in psutil.process_iter()
520                  if (proc.name() == service_name
521                      and proc.pid != self.ceph_mgr.pid)],
522                 CONFIG.ceph_mgr_kill_delay_sec)
523         else:
524             psutil_terminate_kill(
525                 [proc for proc in psutil.process_iter()
526                  if proc.name() == service_name],
527                 CONFIG.ceph_mgr_kill_delay_sec)
528
    def monitor_loop(self):

        """Bring up and monitor ceph-mgr restful plugin.

           Steps:
           - wait for Ceph cluster to become available
           - configure and start ceph-mgr
           - configure and enable restful plugin
           - send periodic requests to REST API
           - recover from failures

           The outer loop runs the configuration steps; the inner loop
           pings the REST API. Leaving the inner loop with ``break``
           runs the configuration steps again. This method never returns.

           Note: because this runs as a separate process it
               must send status updates to service monitor
               via control socket for: ping_failure_count,
               ceph_mgr_failure_count, restful_plugin_url
               and certificate.
        """

        # Promote to process group leader so parent (service monitor)
        # can kill the monitor plus processes spawned by it. Otherwise
        # children of monitor_loop() will keep running in background and
        # will be reaped by init when they finish but by then they might
        # interfere with any new service instance.
        os.setpgrp()

        # Ignoring SIGTERM here ensures process group is not reused by
        # the time parent (service monitor) issues the final SIGKILL.
        signal.signal(signal.SIGTERM, signal.SIG_IGN)

        while True:
            try:
                # steps to configure/start ceph-mgr and restful plugin
                self.ceph_fsid_get()
                self.ceph_mgr_auth_create()
                self.restful_plugin_set_server_port()
                self.restful_plugin_create_certificate()
                self.ceph_mgr_start()
                self.restful_plugin_enable()
                self.restful_plugin_create_admin_key()
                self.restful_plugin_get_url()
                self.restful_plugin_get_certificate()

                # REST API should be available now
                # start making periodic requests (ping)
                while True:
                    try:
                        self.restful_plugin_ping()
                        # ping succeeded: reset both failure counters,
                        # report them to the service monitor, then wait
                        # for the next ping
                        self.ping_failure_count = 0
                        self.request_update_ping_failures(
                            self.ping_failure_count)
                        self.ceph_mgr_failure_count = 0
                        self.request_update_ceph_mgr_failures(
                            self.ceph_mgr_failure_count)
                        time.sleep(CONFIG.restful_plugin_ping_delay_sec)
                        continue
                    except RestApiPingFailed as err:
                        LOG.warning(str(err))

                    # only reached after a failed ping. The value logged
                    # here is the count before this failure is added
                    LOG.info('REST API ping failure count=%d',
                             self.ping_failure_count)
                    self.ping_failure_count += 1
                    self.request_update_ping_failures(
                        self.ping_failure_count)

                    # maybe request failed because ceph-mgr is not running
                    if not self.ceph_mgr_is_running():
                        self.ceph_mgr_failure_count += 1
                        self.request_update_ceph_mgr_failures(
                            self.ceph_mgr_failure_count)
                        # ceph_mgr_start() already waits for
                        # ceph_mgr_grace_period_sec after launching
                        # ceph-mgr; the sleep below adds a second wait
                        self.ceph_mgr_start()
                        time.sleep(CONFIG.ceph_mgr_grace_period_sec)
                        continue

                    # maybe request failed because cluster health is not ok
                    # NOTE(review): ceph_fsid_get() raises CommandFailed or
                    # CommandTimeout when the command fails (handled by the
                    # outer try), so this branch is only taken when the
                    # command succeeds with empty output - confirm intent
                    if not self.ceph_fsid_get():
                        LOG.info('Unable to get cluster fsid. '
                                 'Sleep for a while')
                        time.sleep(CONFIG.cluster_grace_period_sec)
                        break

                    # too many failures? Restart ceph-mgr and go again
                    # through configuration steps
                    if (self.ping_failure_count
                            % CONFIG.ping_fail_count_restart_mgr == 0):
                        LOG.info('Too many consecutive REST API failures. '
                                 'Restart ceph-mgr. Update service '
                                 'url and certificate')
                        self.ceph_mgr_stop()
                        # clear url and certificate, here and in the
                        # service monitor, until they are fetched again
                        self.restful_plugin_url = ''
                        self.request_update_plugin_url(self.restful_plugin_url)
                        self.certificate = ''
                        self.request_update_certificate(self.certificate)
                        break

                    time.sleep(CONFIG.restful_plugin_ping_delay_sec)

            except CommandFailed as err:
                # a ceph command failed: wait before starting over
                LOG.warning(str(err))
                time.sleep(CONFIG.cluster_grace_period_sec)
            except CommandTimeout as err:
                # a ceph command timed out: start over without
                # any additional delay
                LOG.warning(str(err))
            except (CephMgrStartFailed, CephRestfulPluginFailed) as err:
                # count the failure and report it to the service monitor
                LOG.warning(str(err))
                self.ceph_mgr_failure_count += 1
                self.request_update_ceph_mgr_failures(
                    self.ceph_mgr_failure_count)
                time.sleep(CONFIG.ceph_mgr_grace_period_sec)
            except Exception as err:
                # anything unexpected: log the traceback, wait, start over
                LOG.exception(err)
                time.sleep(CONFIG.cluster_grace_period_sec)
638
639     @staticmethod
640     def run_with_timeout(command, timeout, stderr=subprocess.STDOUT):
641         try:
642             LOG.info('Run command: %s', ' '.join(command))
643             return subprocess.check_output(
644                 ['/usr/bin/timeout', str(timeout)] + command,
645                 stderr=stderr, shell=False).strip()
646         except subprocess.CalledProcessError as err:
647             if err.returncode == GNU_TIMEOUT_EXPIRED_RETCODE:
648                 raise CommandTimeout(command=err.cmd, timeout=timeout)
649             raise CommandFailed(command=err.cmd, reason=str(err),
650                                 out=err.output)
651
652     def ceph_fsid_get(self):
653         return self.run_with_timeout(['/usr/bin/ceph', 'fsid'],
654                                      CONFIG.ceph_cli_timeout_sec)
655
656     def ceph_mgr_has_auth(self):
657         path = '{}/ceph-{}'.format(
658             CONFIG.ceph_mgr_confdir, CONFIG.ceph_mgr_identity)
659         try:
660             os.makedirs(path)
661         except OSError as err:
662             pass
663         try:
664             self.run_with_timeout(
665                 ['/usr/bin/ceph', 'auth', 'get',
666                  'mgr.{}'.format(CONFIG.ceph_mgr_identity),
667                  '-o', '{}/keyring'.format(path)],
668                 CONFIG.ceph_cli_timeout_sec)
669             return True
670         except CommandFailed as err:
671             if 'ENOENT' in str(err):
672                 return False
673             raise
674
675     def ceph_mgr_auth_create(self):
676         if self.ceph_mgr_has_auth():
677             return
678         LOG.info('Create ceph-mgr authentication')
679         self.run_with_timeout(
680             ['/usr/bin/ceph', 'auth', 'get-or-create',
681              'mgr.{}'.format(CONFIG.ceph_mgr_identity),
682              'mon', 'allow *', 'osd', 'allow *'],
683             CONFIG.ceph_cli_timeout_sec)
684
685     def ceph_mgr_is_running(self):
686         if not self.ceph_mgr:
687             return None
688         try:
689             self.ceph_mgr.wait(timeout=0)
690         except psutil.TimeoutExpired:
691             return True
692         return False
693
694     def ceph_mgr_start(self):
695         if self.ceph_mgr_is_running():
696             return
697         self.stop_unmanaged_ceph_mgr()
698         LOG.info('Start ceph-mgr daemon')
699         try:
700             with open(os.devnull, 'wb') as null:
701                 self.ceph_mgr = psutil.Popen(
702                     [CONFIG.ceph_mgr_service,
703                      '--cluster', CONFIG.ceph_mgr_cluster,
704                      '--conf', CONFIG.ceph_mgr_config,
705                      '--id', CONFIG.ceph_mgr_identity,
706                      '-f'],
707                     close_fds=True,
708                     stdout=null,
709                     stderr=null,
710                     shell=False)
711         except (OSError, ValueError) as err:
712             raise CephMgrStartFailed(reason=str(err))
713         time.sleep(CONFIG.ceph_mgr_grace_period_sec)
714
715     def ceph_mgr_stop(self):
716         if not self.ceph_mgr:
717             return
718         LOG.info('Stop ceph-mgr')
719         psutil_terminate_kill(self.ceph_mgr, CONFIG.ceph_mgr_kill_delay_sec)
720
721     def restful_plugin_has_server_port(self):
722         try:
723             with open(os.devnull, 'wb') as null:
724                 out = self.run_with_timeout(
725                     ['/usr/bin/ceph', 'config-key', 'get',
726                      'mgr/restful/server_port'],
727                     CONFIG.ceph_cli_timeout_sec, stderr=null)
728             if out == str(CONFIG.restful_plugin_port):
729                 return True
730             LOG.warning('Restful plugin port mismatch: '
731                         'current=%d, expected=%d', out,
732                         CONFIG.restful_plugin_port)
733         except CommandFailed as err:
734             LOG.warning('Failed to get restful plugin port: '
735                         'reason=%s', str(err))
736         return False
737
738     def restful_plugin_set_server_port(self):
739         if self.restful_plugin_has_server_port():
740             return
741         LOG.info('Set restful plugin port=%d', CONFIG.restful_plugin_port)
742         self.run_with_timeout(
743             ['/usr/bin/ceph', 'config-key', 'set',
744              'mgr/restful/server_port', str(CONFIG.restful_plugin_port)],
745             CONFIG.ceph_cli_timeout_sec)
746
747     def restful_plugin_has_admin_key(self):
748         try:
749             self.run_with_timeout(
750                 ['/usr/bin/ceph', 'config-key', 'get',
751                  'mgr/restful/keys/admin'],
752                 CONFIG.ceph_cli_timeout_sec)
753             return True
754         except CommandFailed:
755             pass
756         return False
757
758     def restful_plugin_create_admin_key(self):
759         if self.restful_plugin_has_admin_key():
760             return
761         LOG.info('Create restful plugin admin key')
762         self.run_with_timeout(
763             ['/usr/bin/ceph', 'restful',
764              'create-key', 'admin'],
765             CONFIG.ceph_cli_timeout_sec)
766
767     def restful_plugin_has_certificate(self):
768         try:
769             self.run_with_timeout(
770                 ['/usr/bin/ceph', 'config-key', 'get',
771                  'config/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)],
772                 CONFIG.ceph_cli_timeout_sec)
773             self.run_with_timeout(
774                 ['/usr/bin/ceph', 'config-key', 'get',
775                  'mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)],
776                 CONFIG.ceph_cli_timeout_sec)
777             self.run_with_timeout(
778                 ['/usr/bin/ceph', 'config-key', 'get',
779                  'config/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity)],
780                 CONFIG.ceph_cli_timeout_sec)
781             self.run_with_timeout(
782                 ['/usr/bin/ceph', 'config-key', 'get',
783                  '/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity)],
784                 CONFIG.ceph_cli_timeout_sec)
785             return True
786         except CommandFailed:
787             pass
788         return False
789
790     def restful_plugin_create_certificate(self):
791         if self.restful_plugin_has_certificate():
792             return
793         LOG.info('Create restful plugin self signed certificate')
794         path = tempfile.mkdtemp()
795         try:
796             try:
797                 with tempfile.NamedTemporaryFile() as restful_cnf:
798                     restful_cnf.write((
799                         '[req]\n'
800                         'req_extensions = v3_ca\n'
801                         'distinguished_name = req_distinguished_name\n'
802                         '[v3_ca]\n'
803                         'subjectAltName=DNS:{}\n'
804                         'basicConstraints = CA:true\n'
805                         '[ req_distinguished_name ]\n'
806                         '0.organizationName = IT\n'
807                         'commonName = ceph-restful\n').format(
808                             CONFIG.ceph_mgr_identity))
809                     restful_cnf.flush()
810                     subprocess.check_call([
811                         '/usr/bin/openssl', 'req', '-new', '-nodes', '-x509',
812                         '-subj', '/O=IT/CN=' + CONFIG.ceph_mgr_identity,
813                         '-days', '3650',
814                         '-config', restful_cnf.name,
815                         '-out', os.path.join(path, 'crt'),
816                         '-keyout', os.path.join(path, 'key'),
817                         '-extensions', 'v3_ca'])
818             except subprocess.CalledProcessError as err:
819                 raise CommandFailed(
820                     command=' '.join(err.cmd),
821                     reason='failed to generate self-signed certificate: {}'.format(str(err)),
822                     out=err.output)
823             self.run_with_timeout(
824                 ['/usr/bin/ceph', 'config-key', 'set',
825                  'config/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity),
826                  '-i', os.path.join(path, 'crt')],
827                 CONFIG.ceph_cli_timeout_sec)
828             self.run_with_timeout(
829                 ['/usr/bin/ceph', 'config-key', 'set',
830                  'mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity),
831                  '-i', os.path.join(path, 'crt')],
832                 CONFIG.ceph_cli_timeout_sec)
833             self.run_with_timeout(
834                 ['/usr/bin/ceph', 'config-key', 'set',
835                  'config/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity),
836                  '-i', os.path.join(path, 'key')],
837                 CONFIG.ceph_cli_timeout_sec)
838             self.run_with_timeout(
839                 ['/usr/bin/ceph', 'config-key', 'set',
840                  'mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity),
841                  '-i', os.path.join(path, 'key')],
842                 CONFIG.ceph_cli_timeout_sec)
843         finally:
844             shutil.rmtree(path)
845
846     def restful_plugin_is_enabled(self):
847         command = ['/usr/bin/ceph', 'mgr', 'module', 'ls',
848                    '--format', 'json']
849         with open(os.devnull, 'wb') as null:
850             out = self.run_with_timeout(
851                 command, CONFIG.ceph_cli_timeout_sec, stderr=null)
852         try:
853             if 'restful' in json.loads(out)['enabled_modules']:
854                 return True
855         except ValueError as err:
856             raise CommandFailed(
857                 command=' '.join(command),
858                 reason='unable to decode json: {}'.format(err), out=out)
859         except KeyError as err:
860             raise CommandFailed(
861                 command=' '.join(command),
862                 reason='missing expected key: {}'.format(err), out=out)
863         return False
864
865     def restful_plugin_enable(self):
866         if not self.restful_plugin_is_enabled():
867             LOG.info('Enable restful plugin')
868             self.run_with_timeout(
869                 ['/usr/bin/ceph', 'mgr',
870                  'module', 'enable', 'restful'],
871                 CONFIG.ceph_cli_timeout_sec)
872         time.sleep(CONFIG.restful_plugin_grace_period_sec)
873
874     def restful_plugin_get_url(self):
875         command = ['/usr/bin/ceph', 'mgr', 'services',
876                    '--format', 'json']
877         with open(os.devnull, 'wb') as null:
878             out = self.run_with_timeout(
879                 command, CONFIG.ceph_cli_timeout_sec, stderr=null)
880         try:
881             self.restful_plugin_url = json.loads(out)['restful']
882         except ValueError as err:
883             raise CephRestfulPluginFailed(
884                 reason='unable to decode json: {} output={}'.format(err, out))
885         except KeyError as err:
886             raise CephRestfulPluginFailed(
887                 reason='missing expected key: {} in ouput={}'.format(err, out))
888         self.request_update_plugin_url(self.restful_plugin_url)
889
890     def restful_plugin_get_certificate(self):
891         command = ['/usr/bin/ceph', 'config-key', 'get',
892                    'config/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)]
893         with open(os.devnull, 'wb') as null:
894             certificate = self.run_with_timeout(
895                 command, CONFIG.ceph_cli_timeout_sec, stderr=null)
896             with open(CONFIG.restful_plugin_cert_path, 'wb') as cert_file:
897                 cert_file.write(certificate)
898             self.certificate = CONFIG.restful_plugin_cert_path
899             self.request_update_certificate(
900                 self.certificate)
901
902     def restful_plugin_ping(self):
903         if not self.restful_plugin_url:
904             raise RestApiPingFailed(reason='missing service url')
905         if not self.certificate:
906             raise RestApiPingFailed(reason='missing certificate')
907         LOG.debug('Ping restful plugin: url=%d', self.restful_plugin_url)
908         try:
909             response = requests.request(
910                 'GET', self.restful_plugin_url, verify=False,
911                 timeout=CONFIG.rest_api_timeout_sec)
912             if not response.ok:
913                 raise RestApiPingFailed(
914                     reason='response not ok ({})'.format(response))
915             LOG.debug('Ping restful plugin OK')
916         except (requests.ConnectionError,
917                 requests.Timeout,
918                 requests.HTTPError) as err:
919             raise RestApiPingFailed(reason=str(err))
920
921     @staticmethod
922     def _make_client_socket():
923         sock = socket.socket(
924             socket.AF_UNIX, socket.SOCK_SEQPACKET)
925         sock.settimeout(2 * CONFIG.rest_api_timeout_sec)
926         sock.connect(CONFIG.service_socket)
927         return sock
928
929     @staticmethod
930     def request_status():
931         try:
932             with contextlib.closing(
933                     ServiceMonitor._make_client_socket()) as sock:
934                 sock.send('status')
935                 status = sock.recv(CONFIG.service_socket_bufsize)
936                 LOG.debug('Status %s', status)
937                 return status.startswith('OK')
938         except socket.error as err:
939             LOG.error('Status error: reason=%s', err)
940             return False
941
942     @staticmethod
943     def request_stop():
944         try:
945             with contextlib.closing(
946                     ServiceMonitor._make_client_socket()) as sock:
947                 sock.send('stop')
948                 response = sock.recv(CONFIG.service_socket_bufsize)
949                 LOG.debug('Stop response: %s', response)
950                 return True
951         except socket.error as err:
952             LOG.error('Stop error: reason=%s', err)
953             return False
954
955     @staticmethod
956     def request_update_ceph_mgr_failures(count):
957         try:
958             with contextlib.closing(
959                     ServiceMonitor._make_client_socket()) as sock:
960                 sock.send('ceph-mgr-failures {}'.format(count))
961                 sock.recv(CONFIG.service_socket_bufsize)
962                 return True
963         except socket.error as err:
964             LOG.error('Stop error: reason=%s', err)
965             return False
966
967     @staticmethod
968     def request_update_ping_failures(count):
969         try:
970             with contextlib.closing(
971                     ServiceMonitor._make_client_socket()) as sock:
972                 sock.send('ping-failures {}'.format(count))
973                 sock.recv(CONFIG.service_socket_bufsize)
974                 return True
975         except socket.error as err:
976             LOG.error('Stop error: reason=%s', err)
977             return False
978
979     @staticmethod
980     def request_update_plugin_url(url):
981         try:
982             with contextlib.closing(
983                     ServiceMonitor._make_client_socket()) as sock:
984                 sock.send('restful-url {}'.format(url))
985                 sock.recv(CONFIG.service_socket_bufsize)
986                 return True
987         except socket.error as err:
988             LOG.error('Stop error: reason=%s', err)
989             return False
990
991     @staticmethod
992     def request_update_certificate(path):
993         try:
994             with contextlib.closing(
995                     ServiceMonitor._make_client_socket()) as sock:
996                 sock.send('certificate {}'.format(path))
997                 sock.recv(CONFIG.service_socket_bufsize)
998                 return True
999         except socket.error as err:
1000             LOG.error('Stop error: reason=%s', err)
1001             return False
1002
1003
class InitWrapper(object):

    """Handle System V init script actions: start, stop, restart, etc. """

    def __init__(self):

        """Dispatch command line action to the corresponding function.

           Candidate action functions are all class methods except ones
           that start with an underscore. Hyphens in the requested action
           are mapped to underscores, so the init script spelling
           'force-reload' reaches force_reload().
        """

        parser = argparse.ArgumentParser()
        actions = [name
                   for name, member in inspect.getmembers(self)
                   if (inspect.ismethod(member)
                       and not name.startswith('_'))]
        parser.add_argument(
            'action',
            # argparse converts the value with 'type' before checking it
            # against 'choices', so both spellings are accepted
            type=lambda action: action.replace('-', '_'),
            choices=actions)
        self.args = parser.parse_args()
        getattr(self, self.args.action)()

    def start(self):

        """Start ServiceMonitor as a daemon unless one is already running.

           Use a pipe to report monitor status back to this process.
           The calling process exits with status 0 when the daemon reports
           'OK' and with status 1 otherwise.
        """

        pipe = os.pipe()
        child = os.fork()
        if child == 0:
            os.close(pipe[0])
            with daemon.DaemonContext(files_preserve=[pipe[1]]):
                # prevent duplication of messages in log
                global LOG
                LOG = setup_logging(cleanup_handlers=True)
                try:
                    monitor = ServiceMonitor()
                    status = 'OK'
                except ServiceAlreadyStarted:
                    # a monitor is already running: report success
                    # without starting a second one
                    os.write(pipe[1], 'OK')
                    os.close(pipe[1])
                    return
                except Exception as err:
                    status = str(err)
                os.write(pipe[1], status)
                os.close(pipe[1])
                if status == 'OK':
                    try:
                        monitor.run()
                    except ServiceException as err:
                        LOG.warning(str(err))
                    except Exception as err:
                        LOG.exception('Service monitor error: reason=%s', err)
        else:
            os.close(pipe[1])
            try:
                status = os.read(pipe[0], CONFIG.service_socket_bufsize)
                if status == 'OK':
                    sys.exit(0)
                else:
                    LOG.warning('Service monitor failed to start: '
                                'status=%s', status)
            except (IOError, OSError) as err:
                # os.read() reports failures as OSError, which on Python 2
                # is not an IOError; catch both so a failed read still
                # goes through the cleanup below and exits with status 1.
                LOG.warning('Failed to read monitor status: reason=%s', err)
            os.close(pipe[0])
            os.waitpid(child, 0)
            sys.exit(1)

    def stop(self):

        """Tell ServiceMonitor daemon to stop running.

           In case request fails stop ServiceMonitor and ceph_mgr processes
           using SIGTERM followed by SIGKILL.
        """

        if ServiceMonitor.request_stop():
            return
        targets = (CONFIG.service_name,
                   os.path.basename(CONFIG.ceph_mgr_service))
        own_pid = os.getpid()
        procs = []
        for proc in psutil.process_iter():
            # never select the process that is performing the stop, even
            # if it happens to run under the service name
            if proc.pid == own_pid:
                continue
            try:
                name = proc.name()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # the process exited, or cannot be inspected, while the
                # process table was being scanned; it is not a target
                continue
            if name in targets:
                procs.append(proc)
        psutil_terminate_kill(procs, CONFIG.ceph_mgr_kill_delay_sec)

    def restart(self):
        """Stop the service monitor, then start it again."""
        self.stop()
        self.start()

    def force_reload(self):
        """Same as restart()."""
        self.restart()

    def reload(self):
        """Same as restart()."""
        self.restart()

    def status(self):

        """Report status from ServiceMonitor.

           We don't just try to access REST API here because ServiceMonitor may
           be in the process of starting/configuring ceph-mgr and restful
           plugin in which case we report OK to avoid being restarted by SM.

           Exits with status 0 when the monitor reports OK, 1 otherwise.
        """

        status = ServiceMonitor.request_status()
        sys.exit(0 if status is True else 1)
1118
1119
# Script entry point: constructing InitWrapper parses the command line
# and immediately runs the requested init action.
if __name__ == '__main__':
    InitWrapper()