f4583bcafc0b9fc1b60713ffd2cd09f228df95a5
[it/test.git] / ric_robot_suite / ric-python-utils / ricutils / KubernetesEntity.py
1 #   Copyright (c) 2019 AT&T Intellectual Property.
2 #   Copyright (c) 2019 Nokia.
3 #
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #   Unless required by applicable law or agreed to in writing, software
11 #   distributed under the License is distributed on an "AS IS" BASIS,
12 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #   See the License for the specific language governing permissions and
14 #   limitations under the License.
15
16 from kubernetes import client, config
17 import sys
18 import string
19 import random
20 import time
21 import ssl
22 import asyncio
23 import websockets
24 import urllib.parse
25
26 # This library provides a massively-simplified interface to the kubernetes
27 # API library to reduce bloat in robot tests.
28
29 class KubernetesEntity(object):
30  def __init__(self, namespace):
31   self._ns = namespace
32   self._annotationGensym = ''.join(random.choice(string.ascii_letters) for _ in range(16))
33
34   # FIXME: this needs to be configurable.
35   config.load_kube_config()
36
37   self._k8sApp = client.AppsV1Api()
38   self._k8sCore = client.CoreV1Api()
39   self._k8sEV1B1 = client.ExtensionsV1beta1Api()
40
41  def Deployment(self, name, namespace=None):
42   # this will throw kubernetes.client.rest.ApiException if
43   # the deployment doesn't exist.  we'll let robot cope with
44   # that.
45
46   # calling code will most likely want to check that
47   # deploy.status.replicas == deploy.status.available_replicas
48   return self._k8sApp.read_namespaced_deployment(namespace=namespace or self._ns,
49                                                  name=name)
50
51  def Service(self, name, namespace=None):
52   # as above, we'll rely on this to throw if the svc dne.
53
54   # not much to check directly here.  calling code will want
55   # to hit svc.spec.cluster_ip:r.spec.ports[0..n] with some
56   # sort of health-check request
57   return self._k8sCore.read_namespaced_service(namespace=namespace or self._ns,
58                                                name=name)
59
60  def Pod(self, name, namepsace=None):
61   return self._k8sCore.read_namespaced_pod(namespace=namespace or self._ns,
62                                            name=name)
63
64  def Redeploy(self, name, wait=True, timeout=30, namespace=None):
65   # restart an existing deployment by doing a nonsense update
66   # to its spec.
67   body = {'spec':
68           {'template':
69            {'metadata':
70             {'annotations':
71              { self._annotationGensym: str(time.time()) }}}}}
72
73   r = self._k8sEV1B1.patch_namespaced_deployment(namespace=namespace or self._ns,
74                                                  name=name,
75                                                  body=body)
76   if wait:
77    r = self.WaitForDeployment(name, timeout, namespace=namespace or self._ns)
78   return r
79
80  def WaitForDeployment(self, name, timeout=30, namespace=None):
81   # block until a deployment is available
82   while timeout > 0:
83    dep = self.Deployment(name, namespace=namespace or self._ns)
84    if dep and dep.status.conditions[-1].type == 'Available':
85     return True
86    time.sleep(1)
87    timeout -= 1
88   raise TimeoutError('Kubernetes timeout waiting for ' + name + ' to become available')
89
90  def RetrievePodsForDeployment(self, name, namespace=None):
91   # return the pod names associated with a deployment
92   d = self.Deployment(name, namespace or self._ns)
93   labels = d.spec.selector.match_labels
94   pods = self._k8sCore.list_namespaced_pod(namespace or self._ns,
95                                            label_selector=",".join(map(lambda k: k + "=" + labels[k], 
96                                                                        labels)))
97   return list(map(lambda i: i.metadata.name, pods.items))
98
99  def RetrieveLogForPod(self, pod, container='', tail=sys.maxsize, namespace=None):
100   # not really an "entity" thing per se, but.
101   # kinda want to include timestamps, but i don't have a use case for them.
102   return self._k8sCore.read_namespaced_pod_log(namespace=namespace or self._ns,
103                                                name=pod,
104                                                container=container,
105                                                tail_lines=tail).split('\n')[0:-1]
106
107  def ExecuteCommandInPod(self, pod, cmd, strip_newlines=True, namespace=None):
108    # near as i can tell, the python k8s client doesn't implement
109    # 'kubectl exec'.  this is near enough for our purposes.
110    # 'cmd' is an argv list.
111    channels={1: 'stdout', 2: 'stderr', 3: 'k8s'}
112    output={'stdout': [], 'stderr': [], 'k8s': []}
113    path='/api/v1/namespaces/%s/pods/%s/exec?%s&stdin=false&stderr=true&stdout=true&tty=false' % \
114         (namespace or self._ns, pod, urllib.parse.urlencode({'command': cmd}, doseq=True))
115    # we could probably cache and reuse the sslcontext, but meh, we're not
116    # after performance here.
117    ctx=ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
118    c = client.Configuration()
119    
120    async def ExecCoroutine():
121       # base64.channel.k8s.io is also a valid subprotocol, but i don't see any
122       # reason to support it.
123       async with websockets.connect(uri,\
124                                     ssl=ctx,\
125                                     subprotocols=["channel.k8s.io"],\
126                                     extra_headers=c.api_key) as ws:
127         async for message in ws:
128            if message[0] in channels and (not strip_newlines or len(message) > 1):
129              # we probably should throw up if we get an unrecognized channel, but
130              # i really don't want to be bothered with asyncio exception handling
131              # for that vanishingly improbable case.
132              output[channels[message[0]]].extend(message[1:-1].decode('utf-8').split('\n'))
133             
134    ctx.load_verify_locations(c.ssl_ca_cert)
135    if(c.cert_file and c.key_file):
136      ctx.load_cert_chain(c.cert_file, c.key_file)
137    uri = 'wss://%s%s' % (c.host.lstrip('https://'), path)
138
139    asyncio.get_event_loop().run_until_complete(ExecCoroutine())
140   
141    return(output)