7b2535a1e185653e03ddc8137eee1fd9eda1841b
[it/test.git] / ric_robot_suite / ric-python-utils / ricutils / KubernetesEntity.py
1 #   Copyright (c) 2019 AT&T Intellectual Property.
2 #   Copyright (c) 2019 Nokia.
3 #
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #   Unless required by applicable law or agreed to in writing, software
11 #   distributed under the License is distributed on an "AS IS" BASIS,
12 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #   See the License for the specific language governing permissions and
14 #   limitations under the License.
15
16 from kubernetes import client, config
17 import string
18 import random
19 import time
20
21 # This library provides a massively-simplified interface to the kubernetes
22 # API library to reduce bloat in robot tests.
23
24 class KubernetesEntity(object):
25  def __init__(self, namespace):
26   self._ns = namespace
27   self._annotationGensym = ''.join(random.choice(string.ascii_letters) for _ in range(16))
28
29   # FIXME: this needs to be configurable.
30   config.load_kube_config()
31
32   self._k8sApp = client.AppsV1Api()
33   self._k8sCore = client.CoreV1Api()
34   self._k8sEV1B1 = client.ExtensionsV1beta1Api()
35
36  def Deployment(self, name):
37   # this will throw kubernetes.client.rest.ApiException if
38   # the deployment doesn't exist.  we'll let robot cope with
39   # that.
40
41   # calling code will most likely want to check that
42   # deploy.status.replicas == deploy.status.available_replicas
43   return self._k8sApp.read_namespaced_deployment(namespace=self._ns,
44                                                  name=name)
45
46  def Service(self, name):
47   # as above, we'll rely on this to throw if the svc dne.
48
49   # not much to check directly here.  calling code will want
50   # to hit svc.spec.cluster_ip:r.spec.ports[0..n] with some
51   # sort of health-check request
52   return self._k8sCore.read_namespaced_service(namespace=self._ns,
53                                                name=name)
54
55  def Pod(self, name):
56   return self._k8sCore.read_namespaced_pod(namespace=self._ns,
57                                            name=name)
58
59  def Redeploy(self, name, wait=True, timeout=30):
60   # restart an existing deployment by doing a nonsense update
61   # to its spec.
62   body = {'spec':
63           {'template':
64            {'metadata':
65             {'annotations':
66              { self._annotationGensym: str(time.time()) }}}}}
67
68   r = self._k8sEV1B1.patch_namespaced_deployment(namespace=self._ns,
69                                                  name=name,
70                                                  body=body)
71   if wait:
72    r = self.WaitForDeployment(name, timeout)
73   return r
74
75  def WaitForDeployment(self, name, timeout=30):
76   # block until a deployment is available
77   while timeout > 0:
78    dep = self.Deployment(name)
79    if dep and dep.status.conditions[-1].type == 'Available':
80     return True
81    time.sleep(1)
82    timeout -= 1
83   raise TimeoutError('Kubernetes timeout waiting for ' + name + ' to become available')
84