RMR handler for A1 policy response 76/9876/9
authornaman.gupta <naman.gupta@samsung.com>
Wed, 30 Nov 2022 09:51:16 +0000 (15:21 +0530)
committernaman.gupta <naman.gupta@samsung.com>
Mon, 5 Dec 2022 14:20:51 +0000 (19:50 +0530)
RMR handler for A1 policy response(messagetype 20011) to xapp

Signed-off-by: naman.gupta <naman.gupta@samsung.com>
Change-Id: I8bd0d6065e450185fd88e243bc06225deb5a1656

a1-go/Dockerfile
a1-go/pkg/policy/policyManager.go [new file with mode: 0644]
a1-go/pkg/policy/types.go [new file with mode: 0644]
a1-go/pkg/resthooks/resthooks.go
a1-go/pkg/rmr/rmr.go

index a89ed15..9db6bd6 100644 (file)
 
 FROM nexus3.o-ran-sc.org:10002/o-ran-sc/bldr-ubuntu20-c-go:1.1.0 AS a1-build
 
+
+#TODO check why defualt golang is not working
+ARG GOVERSION="1.18.5"
+RUN wget -nv https://dl.google.com/go/go${GOVERSION}.linux-amd64.tar.gz \
+     && tar -xf go${GOVERSION}.linux-amd64.tar.gz \
+     && mv go /opt/go/${GOVERSION} \
+     && rm -f go*.gz
+
+
+ENV DEFAULTPATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
+ENV PATH=$DEFAULTPATH:/usr/local/go/bin:/opt/go/${GOVERSION}/bin:/root/go/bin
+
 RUN apt-get update -y && apt-get install -y jq
 
 # Update CA certificates
diff --git a/a1-go/pkg/policy/policyManager.go b/a1-go/pkg/policy/policyManager.go
new file mode 100644 (file)
index 0000000..bcf7a1a
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+==================================================================================
+  Copyright (c) 2022 Samsung
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+   platform project (RICP).
+==================================================================================
+*/
+
+package policy
+
+import (
+       "strconv"
+
+       "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
+       "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
+)
+
+const (
+       a1HandlerPrefix = "a1.policy_handler."
+       a1MediatorNs    = "A1m_ns"
+)
+
+func NewPolicyManager(sdl *sdlgo.SyncStorage) *PolicyManager {
+       return createPolicyManager(sdl)
+}
+
+func createPolicyManager(sdlInst iSdl) *PolicyManager {
+       pm := &PolicyManager{
+               db: sdlInst,
+       }
+       return pm
+}
+func (pm *PolicyManager) SetPolicyInstanceStatus(policyTypeId int, policyInstanceID int, status string) error {
+       a1.Logger.Debug("message recieved for %d and %d", policyTypeId, policyInstanceID)
+       instancehandlerKey := a1HandlerPrefix + strconv.FormatInt((int64(policyTypeId)), 10) + "." + strconv.FormatInt((int64(policyInstanceID)), 10)
+       err := pm.db.Set(a1MediatorNs, instancehandlerKey, status)
+       if err != nil {
+               a1.Logger.Error("error1 :%+v", err)
+               return err
+       }
+       return nil
+}
diff --git a/a1-go/pkg/policy/types.go b/a1-go/pkg/policy/types.go
new file mode 100644 (file)
index 0000000..9386ead
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+==================================================================================
+  Copyright (c) 2022 Samsung
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+   platform project (RICP).
+==================================================================================
+*/
+
+package policy
+
+type PolicyManager struct {
+       db iSdl
+}
+type iSdl interface {
+       Set(ns string, pairs ...interface{}) error
+}
index 8fb07df..508bc49 100644 (file)
@@ -30,6 +30,8 @@ import (
 
        "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
        "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
+       "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
+       "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/restapi/operations/a1_mediator"
        "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/rmr"
        "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
        "github.com/santhosh-tekuri/jsonschema/v5"
@@ -85,14 +87,18 @@ func (rh *Resthook) IsValidJson(err error) bool {
        return err == invalidJsonSchema
 }
 func NewResthook() *Resthook {
-       return createResthook(sdlgo.NewSyncStorage(), rmr.NewRMRSender())
+       sdl := sdlgo.NewSyncStorage()
+       policyManager := policy.NewPolicyManager(sdl)
+       return createResthook(sdl, rmr.NewRMRSender(policyManager))
 }
 
 func createResthook(sdlInst iSdl, rmrSenderInst rmr.IRmrSender) *Resthook {
-       return &Resthook{
+       rh := &Resthook{
                db:             sdlInst,
                iRmrSenderInst: rmrSenderInst,
        }
+
+       return rh
 }
 
 func (rh *Resthook) GetA1Health() bool {
index 660089b..2e158f8 100644 (file)
 package rmr
 
 import (
+       "encoding/json"
+
        "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
+       "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
 )
 
@@ -31,28 +34,36 @@ const (
 )
 
 type RmrSender struct {
-       rmrclient *xapp.RMRClient
+       rmrclient     *xapp.RMRClient
+       policyManager *policy.PolicyManager
 }
 
 type IRmrSender interface {
        RmrSendToXapp(httpBodyString string, messagetype int) bool
 }
 
-func NewRMRSender() IRmrSender {
+func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender {
        RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
                StatDesc: "",
                RmrData: xapp.PortData{
+                       //TODO: Read configuration from config file
                        Name:              "",
                        MaxSize:           65534,
                        ThreadType:        0,
                        LowLatency:        false,
                        FastAck:           false,
                        MaxRetryOnFailure: 1,
+                       Port:              4561,
                },
        })
-       return &RmrSender{
-               rmrclient: RMRclient,
+
+       rmrsender := &RmrSender{
+               rmrclient:     RMRclient,
+               policyManager: policyManager,
        }
+
+       rmrsender.RmrRecieveStart()
+       return rmrsender
 }
 
 func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool {
@@ -71,3 +82,39 @@ func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool
        a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
        return s
 }
+
+func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) {
+       a1.Logger.Debug("In the Consume function")
+       id := xapp.Rmr.GetRicMessageName(msg.Mtype)
+       a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen)
+
+       switch id {
+
+       case "A1_POLICY_RESP":
+               a1.Logger.Debug("Recived policy responose")
+               payload := msg.Payload
+               a1.Logger.Debug("message recieved : %s", payload)
+               var result map[string]interface{}
+               err := json.Unmarshal([]byte(payload), &result)
+               if err != nil {
+                       a1.Logger.Error("Unmarshal error : %+v", err)
+                       return err
+               }
+               a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"])
+               rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string))
+       default:
+               xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype)
+       }
+
+       defer func() {
+               rmr.rmrclient.Free(msg.Mbuf)
+               msg.Mbuf = nil
+       }()
+       return
+}
+
+func (rmr *RmrSender) RmrRecieveStart() {
+       a1.Logger.Debug("Inside RmrRecieveStart function ")
+       rmr.rmrclient.Start(rmr)
+       a1.Logger.Debug("Reciever started")
+}