1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2019 Nokia.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 from kubernetes import client, config
22 # This library provides a massively-simplified interface to the kubernetes
23 # API library to reduce bloat in robot tests.
25 class KubernetesEntity(object):
26 def __init__(self, namespace):
28 self._annotationGensym = ''.join(random.choice(string.ascii_letters) for _ in range(16))
30 # FIXME: this needs to be configurable.
31 config.load_kube_config()
33 self._k8sApp = client.AppsV1Api()
34 self._k8sCore = client.CoreV1Api()
35 self._k8sEV1B1 = client.ExtensionsV1beta1Api()
37 def Deployment(self, name):
38 # this will throw kubernetes.client.rest.ApiException if
39 # the deployment doesn't exist. we'll let robot cope with
42 # calling code will most likely want to check that
43 # deploy.status.replicas == deploy.status.available_replicas
44 return self._k8sApp.read_namespaced_deployment(namespace=self._ns,
47 def Service(self, name):
48 # as above, we'll rely on this to throw if the svc dne.
50 # not much to check directly here. calling code will want
51 # to hit svc.spec.cluster_ip:r.spec.ports[0..n] with some
52 # sort of health-check request
53 return self._k8sCore.read_namespaced_service(namespace=self._ns,
57 return self._k8sCore.read_namespaced_pod(namespace=self._ns,
60 def Redeploy(self, name, wait=True, timeout=30):
61 # restart an existing deployment by doing a nonsense update
67 { self._annotationGensym: str(time.time()) }}}}}
69 r = self._k8sEV1B1.patch_namespaced_deployment(namespace=self._ns,
73 r = self.WaitForDeployment(name, timeout)
76 def WaitForDeployment(self, name, timeout=30):
77 # block until a deployment is available
79 dep = self.Deployment(name)
80 if dep and dep.status.conditions[-1].type == 'Available':
84 raise TimeoutError('Kubernetes timeout waiting for ' + name + ' to become available')
86 def RetrievePodsForDeployment(self, name):
87 # return the pod names associated with a deployment
88 d = self.Deployment(name)
89 labels = d.spec.selector.match_labels
90 pods = self._k8sCore.list_namespaced_pod(self._ns,
91 label_selector=",".join(map(lambda k: k + "=" + labels[k],
93 return map(lambda i: i.metadata.name, pods.items)
95 def RetrieveLogForPod(self, pod, container='', tail=sys.maxsize):
96 # not really an "entity" thing per se, but.
97 # kinda want to include timestamps, but i don't have a use case for them.
98 return self._k8sCore.read_namespaced_pod_log(namespace=self._ns,
101 tail_lines=tail).split('\n')[0:-1]