1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2019 Nokia.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 from kubernetes import client, config
26 # This library provides a massively-simplified interface to the kubernetes
27 # API library to reduce bloat in robot tests.
29 class KubernetesEntity(object):
30 def __init__(self, namespace):
32 self._annotationGensym = ''.join(random.choice(string.ascii_letters) for _ in range(16))
34 # FIXME: this needs to be configurable.
35 config.load_kube_config()
37 self._k8sApp = client.AppsV1Api()
38 self._k8sCore = client.CoreV1Api()
39 self._k8sEV1B1 = client.ExtensionsV1beta1Api()
41 def Deployment(self, name, namespace=None):
42 # this will throw kubernetes.client.rest.ApiException if
43 # the deployment doesn't exist. we'll let robot cope with
46 # calling code will most likely want to check that
47 # deploy.status.replicas == deploy.status.available_replicas
48 return self._k8sApp.read_namespaced_deployment(namespace=namespace or self._ns,
51 def Service(self, name, namespace=None):
52 # as above, we'll rely on this to throw if the svc dne.
54 # not much to check directly here. calling code will want
55 # to hit svc.spec.cluster_ip:r.spec.ports[0..n] with some
56 # sort of health-check request
57 return self._k8sCore.read_namespaced_service(namespace=namespace or self._ns,
60 def Pod(self, name, namepsace=None):
61 return self._k8sCore.read_namespaced_pod(namespace=namespace or self._ns,
64 def Redeploy(self, name, wait=True, timeout=30, namespace=None):
65 # restart an existing deployment by doing a nonsense update
71 { self._annotationGensym: str(time.time()) }}}}}
73 r = self._k8sEV1B1.patch_namespaced_deployment(namespace=namespace or self._ns,
77 r = self.WaitForDeployment(name, timeout, namespace=namespace or self._ns)
80 def WaitForDeployment(self, name, timeout=30, namespace=None):
81 # block until a deployment is available
83 dep = self.Deployment(name, namespace=namespace or self._ns)
84 if dep and dep.status.conditions[-1].type == 'Available':
88 raise TimeoutError('Kubernetes timeout waiting for ' + name + ' to become available')
90 def RetrievePodsForDeployment(self, name, namespace=None):
91 # return the pod names associated with a deployment
92 d = self.Deployment(name, namespace or self._ns)
93 labels = d.spec.selector.match_labels
94 pods = self._k8sCore.list_namespaced_pod(namespace or self._ns,
95 label_selector=",".join(map(lambda k: k + "=" + labels[k],
97 return list(map(lambda i: i.metadata.name, pods.items))
99 def RetrieveLogForPod(self, pod, container='', tail=sys.maxsize, namespace=None):
100 # not really an "entity" thing per se, but.
101 # kinda want to include timestamps, but i don't have a use case for them.
102 return self._k8sCore.read_namespaced_pod_log(namespace=namespace or self._ns,
105 tail_lines=tail).split('\n')[0:-1]
107 def ExecuteCommandInPod(self, pod, cmd, strip_newlines=True, namespace=None):
108 # near as i can tell, the python k8s client doesn't implement
109 # 'kubectl exec'. this is near enough for our purposes.
110 # 'cmd' is an argv list.
111 channels={1: 'stdout', 2: 'stderr', 3: 'k8s'}
112 output={'stdout': [], 'stderr': [], 'k8s': []}
113 path='/api/v1/namespaces/%s/pods/%s/exec?%s&stdin=false&stderr=true&stdout=true&tty=false' % \
114 (namespace or self._ns, pod, urllib.parse.urlencode({'command': cmd}, doseq=True))
115 # we could probably cache and reuse the sslcontext, but meh, we're not
116 # after performance here.
117 ctx=ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
118 c = client.Configuration()
120 async def ExecCoroutine():
121 # base64.channel.k8s.io is also a valid subprotocol, but i don't see any
122 # reason to support it.
123 async with websockets.connect(uri,\
125 subprotocols=["channel.k8s.io"],\
126 extra_headers=c.api_key) as ws:
127 async for message in ws:
128 if message[0] in channels and (not strip_newlines or len(message) > 1):
129 # we probably should throw up if we get an unrecognized channel, but
130 # i really don't want to be bothered with asyncio exception handling
131 # for that vanishingly improbable case.
132 output[channels[message[0]]].extend(message[1:-1].decode('utf-8').split('\n'))
134 ctx.load_verify_locations(c.ssl_ca_cert)
135 if(c.cert_file and c.key_file):
136 ctx.load_cert_chain(c.cert_file, c.key_file)
137 uri = 'wss://%s%s' % (c.host.lstrip('https://'), path)
139 asyncio.get_event_loop().run_until_complete(ExecCoroutine())