X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=AcumosXappAdapter%2Frmracumosadapter.py;fp=AcumosXappAdapter%2Frmracumosadapter.py;h=16ce7263856e8869ba5f5befdddb00c0b27cca07;hb=d53cfa8dd719817cbfea48ad032c8181515df2a3;hp=0000000000000000000000000000000000000000;hpb=e8b074b74616674d29327d306f88cca56ffd0ae2;p=ric-app%2Fml.git diff --git a/AcumosXappAdapter/rmracumosadapter.py b/AcumosXappAdapter/rmracumosadapter.py new file mode 100644 index 0000000..16ce726 --- /dev/null +++ b/AcumosXappAdapter/rmracumosadapter.py @@ -0,0 +1,127 @@ +# ========================LICENSE_START================================= +# O-RAN-SC +# %% +# Copyright (c) 2019 AT&T Intellectual Property. +# %% +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========================LICENSE_END=================================== + +# Adapter from RMR to standard Acumos model microservices. Must be deployed in the same pod as the Acumos model. +# Translates RMR protocol messages into calls into Acumos RPC calls. + + +from rmr import rmr +import time +import sys +import signal +import json +import requests + +verbose = True +requireartifacts = True + +confdir = '/conf/' +conffilename = 'config.json' +protobuffilename = 'model.proto' +metadatafilename = 'metadata.json' + +configfilename = confdir + conffilename + +if verbose: + print("Reading config file") + +# Fetch and parse config file which must be mounted as a volume during deployment +try: + with open(configfilename) as f: + conf = json.load(f) +except: + print('Cannot read/parse config file at', configfilename, '; aborting') + exit(1) + +methodurl = conf['microserviceRootURL'] + conf['methodRoot'] +artifacturl = conf["microserviceRootURL"] + conf['artifactRoot'] + +if verbose: + print ('\nRetrieving artifacts from Acumos model microservice\n') + +# See if we can retrieve protobuf and metadata artifacts from running model. Not all models may provide these, but we +# should have a retry mechanism added for robustness +try: + r = requests.get(artifacturl + 'protobuf') + protobuf = r.content + with open(confdir + protobuffilename, 'wb') as f: + f.write(protobuf) + if verbose: + print('Protbuf:') + print(protobuf.decode('ascii')) + r = requests.get(artifacturl + 'metadata') + metadata = r.content + with open(confdir + metadatafilename, 'wb') as f: + f.write(metadata) + if verbose: + print('\nMetadata:') + print(metadata.decode('ascii')) +except: + if requireartifacts: + print('Problem with retrieving/saving model protobuf and/or metadata; aborting.') + +method1 = conf['methods']['1'] +method1url = methodurl + method1['service'] +method1headers = {'content-type': method1['content-type'], 'accept': method1['return-type']} + +if verbose: + print('\nInitializing RMR\n') + +if verbose: + print('\Awaiting connections') + + +# NNG cleanup on signal +def signal_handler(sig, frame): + if verbose: + print('SIGINT received! Cleaning up rmr') + rmr.rmr_close(mrc) + print("Exiting") + sys.exit(0) + + +# Initialize RMR +mrc = rmr.rmr_init("4560".encode('utf-8'), rmr.RMR_MAX_RCV_BYTES, 0x00) +while rmr.rmr_ready(mrc) == 0: + time.sleep(1) + if verbose: + print("Not yet ready") +rmr.rmr_set_stimeout(mrc, 2) + + +# Capture ctrl-c +signal.signal(signal.SIGINT, signal_handler) + + +sbuf = None +while True: + if verbose: + print("Waiting for a message; will time out after 2000ms") + sbuf = rmr.rmr_torcv_msg(mrc, sbuf, 2000) + summary = rmr.message_summary(sbuf) + if verbose and summary['message state'] == 12: + print("Nothing received.") + else: + if verbose: + print("Message received: {}".format(summary)) + payload = sbuf['payload'] + # Call Acumos microservice + r = requests.post(method1url, headers=method1headers, body=payload) + val = r.content + rmr.set_payload_and_length(val, sbuf) + sbuf = rmr.rmr_rts_msg(mrc, sbuf)