Updated base image plus Python to 3.11
[pti/o2.git] / helm_sdk / utils.py
1 ########
2 # Copyright (c) 2019 Cloudify Platform Ltd. All rights reserved
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #        http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 #    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #    * See the License for the specific language governing permissions and
14 #    * limitations under the License.
15
16 import os
17 import copy
18 import threading
19 import subprocess
20
21 from helm_sdk.filters import obfuscate_passwords
22
23 from helm_sdk._compat import StringIO, text_type
24 from helm_sdk.exceptions import CloudifyHelmSDKError
25
26 FLAGS_LIST_TO_VALIDATE = ['kube-apiserver', 'kube-token', 'kubeconfig']
27
28
29 def run_subprocess(command,
30                    logger,
31                    cwd=None,
32                    additional_env=None,
33                    additional_args=None,
34                    return_output=False):
35     if additional_args is None:
36         additional_args = {}
37     args_to_pass = copy.deepcopy(additional_args)
38     if additional_env:
39         passed_env = args_to_pass.setdefault('env', {})
40         passed_env.update(os.environ)
41         passed_env.update(additional_env)
42
43     logger.info(
44         "Running: command={cmd}, cwd={cwd}, additional_args={args}".format(
45             cmd=obfuscate_passwords(command),
46             cwd=cwd,
47             args=obfuscate_passwords(args_to_pass)))
48
49     process = subprocess.Popen(
50         args=command,
51         stdout=subprocess.PIPE,
52         stderr=subprocess.PIPE,
53         stdin=None,
54         cwd=cwd,
55         **args_to_pass)
56
57     if return_output:
58         stdout_consumer = CapturingOutputConsumer(
59             process.stdout)
60     else:
61         stdout_consumer = LoggingOutputConsumer(
62             process.stdout, logger, "<out> ")
63     stderr_consumer = LoggingOutputConsumer(
64         process.stderr, logger, "<err> ")
65
66     return_code = process.wait()
67     stdout_consumer.join()
68     stderr_consumer.join()
69
70     if return_code:
71         raise subprocess.CalledProcessError(return_code,
72                                             [obfuscate_passwords(cmd_element)
73                                              for cmd_element in command])
74
75     output = stdout_consumer.buffer.getvalue() if return_output else None
76     logger.info("Returning output:\n{0}".format(
77         obfuscate_passwords(output) if output is not None else '<None>'))
78
79     return output
80
81
82 # Stolen from the script plugin, until this class
83 # moves to a utils module in cloudify-common.
84 class OutputConsumer(object):
85     def __init__(self, out):
86         self.out = out
87         self.consumer = threading.Thread(target=self.consume_output)
88         self.consumer.daemon = True
89
90     def consume_output(self):
91         for line in self.out:
92             self.handle_line(line)
93         self.out.close()
94
95     def handle_line(self, line):
96         raise NotImplementedError("Must be implemented by subclass.")
97
98     def join(self):
99         self.consumer.join()
100
101
102 class LoggingOutputConsumer(OutputConsumer):
103     def __init__(self, out, logger, prefix):
104         OutputConsumer.__init__(self, out)
105         self.logger = logger
106         self.prefix = prefix
107         self.consumer.start()
108
109     def handle_line(self, line):
110         self.logger.info(
111             "{0}{1}".format(text_type(self.prefix),
112                             obfuscate_passwords(
113                                 line.decode('utf-8').rstrip('\n'))))
114
115
116 class CapturingOutputConsumer(OutputConsumer):
117     def __init__(self, out):
118         OutputConsumer.__init__(self, out)
119         self.buffer = StringIO()
120         self.consumer.start()
121
122     def handle_line(self, line):
123         self.buffer.write(line.decode('utf-8'))
124
125     def get_buffer(self):
126         return self.buffer
127
128
129 def prepare_parameter(arg_dict):
130     """
131     Prepare single parameter.
132     :param arg_dict: dictionary with the name of the flag and value(optional)
133     :return: "--name=value" or -"-name"
134     """
135     try:
136         param_string = "--" + arg_dict["name"]
137         return param_string + '=' + arg_dict.get("value") if arg_dict.get(
138             "value") else param_string
139     except KeyError:
140         raise CloudifyHelmSDKError("Parameter name doesen't exist.")
141
142
143 def prepare_set_parameters(set_values):
144     """
145     Prepare set parameters for install command.
146     :param set_values: list of dictionaries with the name of the variable to
147     set command and its value.
148     :return list like: ["--set", "name=value","--set",
149     """
150     set_list = []
151     for set_dict in set_values:
152         set_list.append('--set')
153         try:
154             set_list.append(set_dict["name"] + "=" + set_dict["value"])
155         except KeyError:
156             raise CloudifyHelmSDKError(
157                 "\"set\" parameter name or value is missing.")
158     return set_list
159
160
161 def validate_no_collisions_between_params_and_flags(flags):
162     if [flag for flag in flags if flag['name'] in FLAGS_LIST_TO_VALIDATE]:
163         raise CloudifyHelmSDKError(
164             'Please do not pass {flags_list} under "flags" property,'
165             'each of them has a known property.'.format(
166                 flags_list=FLAGS_LIST_TO_VALIDATE))