2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
29 "github.com/spf13/viper"
30 "github.com/valyala/fastjson"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33 "gerrit.oran-osc.org/r/ric-plt/o1mediator/pkg/sbi"
37 #cgo LDFLAGS: -lsysrepo -lyang
42 #include <sysrepo/values.h>
// sbiClient is the southbound interface client that ManageXapps and
// ManageConfigmaps use to build descriptors and apply xApp changes.
var sbiClient sbi.SBIClientInterface
51 func NewNbi(s sbi.SBIClientInterface) *Nbi {
55 schemas: viper.GetStringSlice("nbi.schemas"),
56 cleanupChan: make(chan bool),
61 func (n *Nbi) Start() bool {
62 if ok := n.Setup(n.schemas); !ok {
63 log.Error("NBI: SYSREPO initialization failed, bailing out!")
66 log.Info("NBI: SYSREPO initialization done ... processing O1 requests!")
71 func (n *Nbi) Stop() {
72 C.sr_unsubscribe(n.subscription)
73 C.sr_session_stop(n.session)
74 C.sr_disconnect(n.connection)
76 log.Info("NBI: SYSREPO cleanup done gracefully!")
79 func (n *Nbi) Setup(schemas []string) bool {
80 rc := C.sr_connect(0, &n.connection)
81 if C.SR_ERR_OK != rc {
82 log.Error("NBI: sr_connect failed: %s", C.GoString(C.sr_strerror(rc)))
86 rc = C.sr_session_start(n.connection, C.SR_DS_RUNNING, &n.session)
87 if C.SR_ERR_OK != rc {
88 log.Error("NBI: sr_session_start failed: %s", C.GoString(C.sr_strerror(rc)))
93 if ok := n.DoSubscription(schemas); ok == true {
96 time.Sleep(time.Duration(5 * time.Second))
101 func (n *Nbi) DoSubscription(schemas []string) bool {
102 log.Info("Subscribing YANG modules ... %v", schemas)
103 for _, module := range schemas {
104 modName := C.CString(module)
105 defer C.free(unsafe.Pointer(modName))
107 if done := n.SubscribeModule(modName); !done {
114 func (n *Nbi) SubscribeModule(module *C.char) bool {
115 rc := C.sr_module_change_subscribe(n.session, module, nil, C.sr_module_change_cb(C.module_change_cb), nil, 0, 0, &n.subscription)
116 if C.SR_ERR_OK != rc {
117 log.Info("NBI: sr_module_change_subscribe failed: %s", C.GoString(C.sr_strerror(rc)))
123 //export nbiModuleChangeCB
124 func nbiModuleChangeCB(session *C.sr_session_ctx_t, module *C.char, xpath *C.char, event C.sr_event_t, reqId C.int) C.int {
125 changedModule := C.GoString(module)
126 changedXpath := C.GoString(xpath)
128 log.Info("NBI: Module change callback - event='%d' module=%s xpath=%s reqId=%d", event, changedModule, changedXpath, reqId)
130 if C.SR_EV_CHANGE == event {
131 configJson := C.yang_data_sr2json(session, module, event, &nbiClient.oper)
132 err := nbiClient.ManageXapps(changedModule, C.GoString(configJson), int(nbiClient.oper))
134 return C.SR_ERR_OPERATION_FAILED
138 if C.SR_EV_DONE == event {
139 configJson := C.get_data_json(session, module)
140 err := nbiClient.ManageConfigmaps(changedModule, C.GoString(configJson), int(nbiClient.oper))
142 return C.SR_ERR_OPERATION_FAILED
149 func (n *Nbi) ManageXapps(module, configJson string, oper int) error {
150 log.Info("ManageXapps: module=%s configJson=%s", module, configJson)
152 if configJson == "" || module != "o-ran-sc-ric-xapp-desc-v1" {
156 root := fmt.Sprintf("%s:ric", module)
157 jsonList, err := n.ParseJsonArray(configJson, root, "xapps", "xapp")
162 for _, m := range jsonList {
163 xappName := string(m.GetStringBytes("name"))
164 namespace := string(m.GetStringBytes("namespace"))
165 relName := string(m.GetStringBytes("release-name"))
166 version := string(m.GetStringBytes("version"))
168 desc := sbiClient.BuildXappDescriptor(xappName, namespace, relName, version)
170 case C.SR_OP_CREATED:
171 return sbiClient.DeployXapp(desc)
172 case C.SR_OP_DELETED:
173 return sbiClient.UndeployXapp(desc)
175 return errors.New(fmt.Sprintf("Operation '%d' not supported!", oper))
181 func (n *Nbi) ManageConfigmaps(module, configJson string, oper int) error {
182 log.Info("ManageConfig: module=%s configJson=%s", module, configJson)
184 if configJson == "" || module != "o-ran-sc-ric-ueec-config-v1" {
188 if oper != C.SR_OP_MODIFIED {
189 return errors.New(fmt.Sprintf("Operation '%d' not supported!", oper))
192 value, err := n.ParseJson(configJson)
197 root := fmt.Sprintf("%s:ric", module)
198 appName := string(value.GetStringBytes(root, "config", "name"))
199 namespace := string(value.GetStringBytes(root, "config", "namespace"))
200 control := value.Get(root, "config", "control").String()
203 err = json.Unmarshal([]byte(strings.ReplaceAll(control, "\\", "")), &f)
205 log.Info("json.Unmarshal failed: %v", err)
209 xappConfig := sbiClient.BuildXappConfig(appName, namespace, f)
210 return sbiClient.ModifyXappConfig(xappConfig)
213 func (n *Nbi) ParseJson(dsContent string) (*fastjson.Value, error) {
214 var p fastjson.Parser
215 v, err := p.Parse(dsContent)
217 log.Info("fastjson.Parser failed: %v", err)
222 func (n *Nbi) ParseJsonArray(dsContent, model, top, elem string) ([]*fastjson.Value, error) {
223 v, err := n.ParseJson(dsContent)
227 return v.GetArray(model, top, elem), nil