# limitations under the License.
from kubernetes import client, config
+import sys
import string
import random
import time
-import sys
+import ssl
+import asyncio
+import websockets
+import urllib.parse
# This library provides a massively-simplified interface to the kubernetes
# API library to reduce bloat in robot tests.
self._k8sCore = client.CoreV1Api()
self._k8sEV1B1 = client.ExtensionsV1beta1Api()
- def Deployment(self, name):
+ def Deployment(self, name, namespace=None):
# this will throw kubernetes.client.rest.ApiException if
# the deployment doesn't exist. we'll let robot cope with
# that.
# calling code will most likely want to check that
# deploy.status.replicas == deploy.status.available_replicas
- return self._k8sApp.read_namespaced_deployment(namespace=self._ns,
+ return self._k8sApp.read_namespaced_deployment(namespace=namespace or self._ns,
name=name)
- def Service(self, name):
+ def StatefulSet(self, name, namespace=None):
+ # as above, but for statefulsets, and with the assumption
+ # that the typical check here sfst.replicas == sfst.ready_replicas
+ return self._k8sApp.read_namespaced_stateful_set(namespace = namespace or self._ns,
+ name=name)
+
+ def Service(self, name, namespace=None):
# as above, we'll rely on this to throw if the svc dne.
# not much to check directly here. calling code will want
# to hit svc.spec.cluster_ip:r.spec.ports[0..n] with some
# sort of health-check request
- return self._k8sCore.read_namespaced_service(namespace=self._ns,
+ return self._k8sCore.read_namespaced_service(namespace=namespace or self._ns,
name=name)
- def Pod(self, name):
- return self._k8sCore.read_namespaced_pod(namespace=self._ns,
+ def Pod(self, name, namespace=None):
+ return self._k8sCore.read_namespaced_pod(namespace=namespace or self._ns,
name=name)
- def Redeploy(self, name, wait=True, timeout=30):
+ def Redeploy(self, name, wait=True, timeout=30, namespace=None):
# restart an existing deployment by doing a nonsense update
# to its spec.
body = {'spec':
{'annotations':
{ self._annotationGensym: str(time.time()) }}}}}
- r = self._k8sEV1B1.patch_namespaced_deployment(namespace=self._ns,
+ r = self._k8sEV1B1.patch_namespaced_deployment(namespace=namespace or self._ns,
name=name,
body=body)
if wait:
- r = self.WaitForDeployment(name, timeout)
+ r = self.WaitForDeployment(name, timeout, namespace=namespace or self._ns)
return r
- def WaitForDeployment(self, name, timeout=30):
+ def WaitForDeployment(self, name, timeout=30, namespace=None):
# block until a deployment is available
while timeout > 0:
- dep = self.Deployment(name)
+ dep = self.Deployment(name, namespace=namespace or self._ns)
if dep and dep.status.conditions[-1].type == 'Available':
return True
time.sleep(1)
timeout -= 1
raise TimeoutError('Kubernetes timeout waiting for ' + name + ' to become available')
- def RetrievePodsForDeployment(self, name):
+ def RetrievePodsForDeployment(self, name, namespace=None):
# return the pod names associated with a deployment
- d = self.Deployment(name)
+ d = self.Deployment(name, namespace or self._ns)
labels = d.spec.selector.match_labels
- pods = self._k8sCore.list_namespaced_pod(self._ns,
+ pods = self._k8sCore.list_namespaced_pod(namespace or self._ns,
label_selector=",".join(map(lambda k: k + "=" + labels[k],
labels)))
- return map(lambda i: i.metadata.name, pods.items)
-
- def RetrieveLogForPod(self, pod, container='', tail=sys.maxsize):
+ return list(map(lambda i: i.metadata.name, pods.items))
+
+ def RetrieveLogForPod(self, pod, container='', tail=sys.maxsize, namespace=None):
# not really an "entity" thing per se, but.
# kinda want to include timestamps, but i don't have a use case for them.
- return self._k8sCore.read_namespaced_pod_log(namespace=self._ns,
+ return self._k8sCore.read_namespaced_pod_log(namespace=namespace or self._ns,
name=pod,
container=container,
tail_lines=tail).split('\n')[0:-1]
+
+ def ExecuteCommandInPod(self, pod, cmd, strip_newlines=True, namespace=None):
+ # near as i can tell, the python k8s client doesn't implement
+ # 'kubectl exec'. this is near enough for our purposes.
+ # 'cmd' is an argv list.
+ channels={1: 'stdout', 2: 'stderr', 3: 'k8s'}
+ output={'stdout': [], 'stderr': [], 'k8s': []}
+ path='/api/v1/namespaces/%s/pods/%s/exec?%s&stdin=false&stderr=true&stdout=true&tty=false' % \
+ (namespace or self._ns, pod, urllib.parse.urlencode({'command': cmd}, doseq=True))
+ # we could probably cache and reuse the sslcontext, but meh, we're not
+ # after performance here.
+ ctx=ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ c = client.Configuration()
+
+ async def ExecCoroutine():
+ # base64.channel.k8s.io is also a valid subprotocol, but i don't see any
+ # reason to support it.
+ async with websockets.connect(uri,\
+ ssl=ctx,\
+ subprotocols=["channel.k8s.io"],\
+ extra_headers=c.api_key) as ws:
+ async for message in ws:
+ if message[0] in channels and (not strip_newlines or len(message) > 1):
+ # we probably should throw up if we get an unrecognized channel, but
+ # i really don't want to be bothered with asyncio exception handling
+ # for that vanishingly improbable case.
+ output[channels[message[0]]].extend(message[1:-1].decode('utf-8').split('\n'))
+
+ ctx.load_verify_locations(c.ssl_ca_cert)
+ if(c.cert_file and c.key_file):
+ ctx.load_cert_chain(c.cert_file, c.key_file)
+ uri = 'wss://%s%s' % (c.host.lstrip('https://'), path)
+
+ asyncio.get_event_loop().run_until_complete(ExecCoroutine())
+
+ return(output)