049d17df2b1eb39a282a509f00de455f3134a0fa
[it/test.git] / ric_robot_suite / ric-python-utils / ricutils / KubernetesEntity.py
1 #   Copyright (c) 2019 AT&T Intellectual Property.
2 #   Copyright (c) 2019 Nokia.
3 #
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #   Unless required by applicable law or agreed to in writing, software
11 #   distributed under the License is distributed on an "AS IS" BASIS,
12 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #   See the License for the specific language governing permissions and
14 #   limitations under the License.
15
16 from kubernetes import client, config
17 import sys
18 import string
19 import random
20 import time
21 import ssl
22 import asyncio
23 import websockets
24 import urllib.parse
25
26 # This library provides a massively-simplified interface to the kubernetes
27 # API library to reduce bloat in robot tests.
28
29 class KubernetesEntity(object):
30  def __init__(self, namespace):
31   self._ns = namespace
32   self._annotationGensym = ''.join(random.choice(string.ascii_letters) for _ in range(16))
33
34   # FIXME: this needs to be configurable.
35   config.load_kube_config()
36
37   self._k8sApp = client.AppsV1Api()
38   self._k8sCore = client.CoreV1Api()
39   self._k8sEV1B1 = client.ExtensionsV1beta1Api()
40
41  def Deployment(self, name, namespace=None):
42   # this will throw kubernetes.client.rest.ApiException if
43   # the deployment doesn't exist.  we'll let robot cope with
44   # that.
45
46   # calling code will most likely want to check that
47   # deploy.status.replicas == deploy.status.available_replicas
48   return self._k8sApp.read_namespaced_deployment(namespace=namespace or self._ns,
49                                                  name=name)
50
51  def StatefulSet(self, name, namespace=None):
52   # as above, but for statefulsets, and with the assumption
53   # that the typical check here sfst.replicas == sfst.ready_replicas
54   return self._k8sApp.read_namespaced_stateful_set(namespace = namespace or self._ns,
55                                                    name=name)
56  
57  def Service(self, name, namespace=None):
58   # as above, we'll rely on this to throw if the svc dne.
59
60   # not much to check directly here.  calling code will want
61   # to hit svc.spec.cluster_ip:r.spec.ports[0..n] with some
62   # sort of health-check request
63   return self._k8sCore.read_namespaced_service(namespace=namespace or self._ns,
64                                                name=name)
65
66  def Pod(self, name, namepsace=None):
67   return self._k8sCore.read_namespaced_pod(namespace=namespace or self._ns,
68                                            name=name)
69
70  def Redeploy(self, name, wait=True, timeout=30, namespace=None):
71   # restart an existing deployment by doing a nonsense update
72   # to its spec.
73   body = {'spec':
74           {'template':
75            {'metadata':
76             {'annotations':
77              { self._annotationGensym: str(time.time()) }}}}}
78
79   r = self._k8sEV1B1.patch_namespaced_deployment(namespace=namespace or self._ns,
80                                                  name=name,
81                                                  body=body)
82   if wait:
83    r = self.WaitForDeployment(name, timeout, namespace=namespace or self._ns)
84   return r
85
86  def WaitForDeployment(self, name, timeout=30, namespace=None):
87   # block until a deployment is available
88   while timeout > 0:
89    dep = self.Deployment(name, namespace=namespace or self._ns)
90    if dep and dep.status.conditions[-1].type == 'Available':
91     return True
92    time.sleep(1)
93    timeout -= 1
94   raise TimeoutError('Kubernetes timeout waiting for ' + name + ' to become available')
95
96  def RetrievePodsForDeployment(self, name, namespace=None):
97   # return the pod names associated with a deployment
98   d = self.Deployment(name, namespace or self._ns)
99   labels = d.spec.selector.match_labels
100   pods = self._k8sCore.list_namespaced_pod(namespace or self._ns,
101                                            label_selector=",".join(map(lambda k: k + "=" + labels[k], 
102                                                                        labels)))
103   return list(map(lambda i: i.metadata.name, pods.items))
104
105  def RetrieveLogForPod(self, pod, container='', tail=sys.maxsize, namespace=None):
106   # not really an "entity" thing per se, but.
107   # kinda want to include timestamps, but i don't have a use case for them.
108   return self._k8sCore.read_namespaced_pod_log(namespace=namespace or self._ns,
109                                                name=pod,
110                                                container=container,
111                                                tail_lines=tail).split('\n')[0:-1]
112
113  def ExecuteCommandInPod(self, pod, cmd, strip_newlines=True, namespace=None):
114    # near as i can tell, the python k8s client doesn't implement
115    # 'kubectl exec'.  this is near enough for our purposes.
116    # 'cmd' is an argv list.
117    channels={1: 'stdout', 2: 'stderr', 3: 'k8s'}
118    output={'stdout': [], 'stderr': [], 'k8s': []}
119    path='/api/v1/namespaces/%s/pods/%s/exec?%s&stdin=false&stderr=true&stdout=true&tty=false' % \
120         (namespace or self._ns, pod, urllib.parse.urlencode({'command': cmd}, doseq=True))
121    # we could probably cache and reuse the sslcontext, but meh, we're not
122    # after performance here.
123    ctx=ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
124    c = client.Configuration()
125    
126    async def ExecCoroutine():
127       # base64.channel.k8s.io is also a valid subprotocol, but i don't see any
128       # reason to support it.
129       async with websockets.connect(uri,\
130                                     ssl=ctx,\
131                                     subprotocols=["channel.k8s.io"],\
132                                     extra_headers=c.api_key) as ws:
133         async for message in ws:
134            if message[0] in channels and (not strip_newlines or len(message) > 1):
135              # we probably should throw up if we get an unrecognized channel, but
136              # i really don't want to be bothered with asyncio exception handling
137              # for that vanishingly improbable case.
138              output[channels[message[0]]].extend(message[1:-1].decode('utf-8').split('\n'))
139             
140    ctx.load_verify_locations(c.ssl_ca_cert)
141    if(c.cert_file and c.key_file):
142      ctx.load_cert_chain(c.cert_file, c.key_file)
143    uri = 'wss://%s%s' % (c.host.lstrip('https://'), path)
144
145    asyncio.get_event_loop().run_until_complete(ExecCoroutine())
146   
147    return(output)