X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ric_robot_suite%2Fric-python-utils%2Fricutils%2FKubernetesEntity.py;fp=ric_robot_suite%2Fric-python-utils%2Fricutils%2FKubernetesEntity.py;h=7b2535a1e185653e03ddc8137eee1fd9eda1841b;hb=c5fa07bcd8cbd614bcd813cac698385b789bcfcb;hp=0000000000000000000000000000000000000000;hpb=59f84608ec15c016958a6e0e0ddd813f376c0925;p=it%2Ftest.git diff --git a/ric_robot_suite/ric-python-utils/ricutils/KubernetesEntity.py b/ric_robot_suite/ric-python-utils/ricutils/KubernetesEntity.py new file mode 100644 index 0000000..7b2535a --- /dev/null +++ b/ric_robot_suite/ric-python-utils/ricutils/KubernetesEntity.py @@ -0,0 +1,84 @@ +# Copyright (c) 2019 AT&T Intellectual Property. +# Copyright (c) 2019 Nokia. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kubernetes import client, config +import string +import random +import time + +# This library provides a massively-simplified interface to the kubernetes +# API library to reduce bloat in robot tests. + +class KubernetesEntity(object): + def __init__(self, namespace): + self._ns = namespace + self._annotationGensym = ''.join(random.choice(string.ascii_letters) for _ in range(16)) + + # FIXME: this needs to be configurable. + config.load_kube_config() + + self._k8sApp = client.AppsV1Api() + self._k8sCore = client.CoreV1Api() + self._k8sEV1B1 = client.ExtensionsV1beta1Api() + + def Deployment(self, name): + # this will throw kubernetes.client.rest.ApiException if + # the deployment doesn't exist. we'll let robot cope with + # that. + + # calling code will most likely want to check that + # deploy.status.replicas == deploy.status.available_replicas + return self._k8sApp.read_namespaced_deployment(namespace=self._ns, + name=name) + + def Service(self, name): + # as above, we'll rely on this to throw if the svc dne. + + # not much to check directly here. calling code will want + # to hit svc.spec.cluster_ip:r.spec.ports[0..n] with some + # sort of health-check request + return self._k8sCore.read_namespaced_service(namespace=self._ns, + name=name) + + def Pod(self, name): + return self._k8sCore.read_namespaced_pod(namespace=self._ns, + name=name) + + def Redeploy(self, name, wait=True, timeout=30): + # restart an existing deployment by doing a nonsense update + # to its spec. + body = {'spec': + {'template': + {'metadata': + {'annotations': + { self._annotationGensym: str(time.time()) }}}}} + + r = self._k8sEV1B1.patch_namespaced_deployment(namespace=self._ns, + name=name, + body=body) + if wait: + r = self.WaitForDeployment(name, timeout) + return r + + def WaitForDeployment(self, name, timeout=30): + # block until a deployment is available + while timeout > 0: + dep = self.Deployment(name) + if dep and dep.status.conditions[-1].type == 'Available': + return True + time.sleep(1) + timeout -= 1 + raise TimeoutError('Kubernetes timeout waiting for ' + name + ' to become available') +