Handling of subscription merge and inclusion of RMR lib from xapp-framework 20/2420/1
authorwahidw <abdulwahid.w@nokia.com>
Wed, 5 Feb 2020 10:01:12 +0000 (10:01 +0000)
committerwahidw <abdulwahid.w@nokia.com>
Wed, 5 Feb 2020 10:01:22 +0000 (10:01 +0000)
Change-Id: I51f28bf758e542629263fcc83c1e9c13b2b26f3e
Signed-off-by: wahidw <abdulwahid.w@nokia.com>
RELNOTES
cmd/rtmgr.go
container-tag.yaml
pkg/nbi/httprestful.go
pkg/nbi/types.go
pkg/rpe/rpe.go
pkg/rtmgr/types.go
pkg/sbi/control.go [new file with mode: 0644]
pkg/sbi/nngpush.go

index c9f5423..e78a029 100644 (file)
--- a/RELNOTES
+++ b/RELNOTES
@@ -1,3 +1,6 @@
+### v0.4.11
+* Added code for subscription merge and added RMR from xapp-framework 
+
 ### v0.4.10
 * Temporary Fix for R3 (E2M->E2T issue) - retrying when is_Ready flag in socket handle is false 
 
index ceb28e8..bd17d4f 100644 (file)
@@ -145,6 +145,9 @@ func main() {
 
        var m sync.Mutex
 
+       c := sbi.NewControl()
+       go c.Run()
+
        serve(nbiEngine, sbiEngine, sdlEngine, rpeEngine, &m)
        os.Exit(0)
 }
index 42ff158..f8f7249 100644 (file)
@@ -2,4 +2,4 @@
 # By default this file is in the docker build directory,
 # but the location can configured in the JJB template.
 ---
-tag: 0.4.10
+tag: 0.4.11
index 1d5e1fa..ac56aa0 100644 (file)
@@ -38,8 +38,8 @@ import (
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "github.com/go-openapi/loads"
        "github.com/go-openapi/runtime/middleware"
-       "net/url"
        "net"
+       "net/url"
        "os"
        "routing-manager/pkg/models"
        "routing-manager/pkg/restapi"
@@ -49,9 +49,9 @@ import (
        "routing-manager/pkg/rtmgr"
        "routing-manager/pkg/sdl"
        "strconv"
-       "time"
-       "sync"
        "strings"
+       "sync"
+       "time"
 )
 
 //var myClient = &http.Client{Timeout: 1 * time.Second}
@@ -60,7 +60,7 @@ type HttpRestful struct {
        Engine
        LaunchRest                   LaunchRestHandler
        RecvXappCallbackData         RecvXappCallbackDataHandler
-        RecvNewE2Tdata               RecvNewE2TdataHandler 
+       RecvNewE2Tdata               RecvNewE2TdataHandler
        ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
        RetrieveStartupData          RetrieveStartupDataHandler
 }
@@ -69,7 +69,7 @@ func NewHttpRestful() *HttpRestful {
        instance := new(HttpRestful)
        instance.LaunchRest = launchRest
        instance.RecvXappCallbackData = recvXappCallbackData
-        instance.RecvNewE2Tdata = recvNewE2Tdata
+       instance.RecvNewE2Tdata = recvNewE2Tdata
        instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
        instance.RetrieveStartupData = retrieveStartupData
        return instance
@@ -99,37 +99,37 @@ func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr
        return nil, nil
 }
 
-func recvNewE2Tdata(dataChannel <-chan *models.E2tData) (*rtmgr.E2TInstance,string,error) {
-        var e2tData *models.E2tData
+func recvNewE2Tdata(dataChannel <-chan *models.E2tData) (*rtmgr.E2TInstance, string, error) {
+       var e2tData *models.E2tData
        var str string
-        xapp.Logger.Info("data received")
+       xapp.Logger.Info("data received")
 
-        e2tData = <-dataChannel
+       e2tData = <-dataChannel
 
-        if nil != e2tData {
+       if nil != e2tData {
 
-                       e2tinst :=  rtmgr.E2TInstance {
-                                Ranlist : make([]string, len(e2tData.RanNamelist)),
+               e2tinst := rtmgr.E2TInstance{
+                       Ranlist: make([]string, len(e2tData.RanNamelist)),
+               }
+
+               e2tinst.Fqdn = *e2tData.E2TAddress
+               e2tinst.Name = "E2TERMINST"
+               copy(e2tinst.Ranlist, e2tData.RanNamelist)
+               if len(e2tData.RanNamelist) > 0 {
+                       var meidar string
+                       for _, meid := range e2tData.RanNamelist {
+                               meidar += meid + " "
                        }
+                       str = "mme_ar|" + *e2tData.E2TAddress + "|" + strings.TrimSuffix(meidar, " ")
+               }
+               return &e2tinst, str, nil
 
-        e2tinst.Fqdn = *e2tData.E2TAddress
-        e2tinst.Name = "E2TERMINST"
-       copy(e2tinst.Ranlist, e2tData.RanNamelist)
-       if (len(e2tData.RanNamelist) > 0) {
-           var meidar string
-           for _, meid := range e2tData.RanNamelist {
-               meidar += meid + " "
-           }
-           str = "mme_ar|" + *e2tData.E2TAddress + "|" + strings.TrimSuffix(meidar," ")
+       } else {
+               xapp.Logger.Info("No data")
        }
-        return &e2tinst,str,nil
-
-        } else {
-                xapp.Logger.Info("No data")
-        }
 
-        xapp.Logger.Debug("Nothing received on the Http interface")
-        return nil,str,nil
+       xapp.Logger.Debug("Nothing received on the Http interface")
+       return nil, str, nil
 }
 
 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
@@ -172,21 +172,21 @@ func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
 func validateE2tData(data *models.E2tData) error {
 
        e2taddress_key := *data.E2TAddress
-        if (e2taddress_key == "") {
-                return fmt.Errorf("E2TAddress is empty!!!")
-        }
+       if e2taddress_key == "" {
+               return fmt.Errorf("E2TAddress is empty!!!")
+       }
        stringSlice := strings.Split(e2taddress_key, ":")
-       if (len(stringSlice) == 1) {
-               return fmt.Errorf("E2T E2TAddress is not a proper format like ip:port, %v", e2taddress_key )
+       if len(stringSlice) == 1 {
+               return fmt.Errorf("E2T E2TAddress is not a proper format like ip:port, %v", e2taddress_key)
        }
 
        _, err := net.LookupIP(stringSlice[0])
        if err != nil {
                return fmt.Errorf("E2T E2TAddress DNS look up failed, E2TAddress: %v", stringSlice[0])
-        }
+       }
 
        if checkValidaE2TAddress(e2taddress_key) {
-               return fmt.Errorf("E2TAddress already exist!!!, E2TAddress: %v",e2taddress_key)
+               return fmt.Errorf("E2TAddress already exist!!!, E2TAddress: %v", e2taddress_key)
        }
 
        return nil
@@ -194,21 +194,20 @@ func validateE2tData(data *models.E2tData) error {
 
 func validateDeleteE2tData(data *models.E2tDeleteData) error {
 
-        if (*data.E2TAddress == "") {
-                return fmt.Errorf("E2TAddress is empty!!!")
-        }
+       if *data.E2TAddress == "" {
+               return fmt.Errorf("E2TAddress is empty!!!")
+       }
 
        for _, element := range data.RanAssocList {
                e2taddress_key := *element.E2TAddress
                stringSlice := strings.Split(e2taddress_key, ":")
 
-               if (len(stringSlice) == 1) {
+               if len(stringSlice) == 1 {
                        return fmt.Errorf("E2T Delete - RanAssocList E2TAddress is not a proper format like ip:port, %v", e2taddress_key)
                }
 
-
                if !checkValidaE2TAddress(e2taddress_key) {
-                               return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v",e2taddress_key)
+                       return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v", e2taddress_key)
                }
 
        }
@@ -267,16 +266,41 @@ func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscription
        return nil
 }
 
+func updateXappSubscriptionHandleImpl(subupdatechan chan<- *rtmgr.XappList, data *models.XappList, subid uint16) error {
+       xapp.Logger.Debug("Invoked updateXappSubscriptionHandleImpl")
+
+       var fqdnlist []rtmgr.FqDn
+       for _, item := range *data {
+               fqdnlist = append(fqdnlist, rtmgr.FqDn(*item))
+       }
+       xapplist := rtmgr.XappList{SubscriptionID: subid, FqdnList: fqdnlist}
+       var subdata models.XappSubscriptionData
+       var id int32
+       id = int32(subid)
+       subdata.SubscriptionID = &id
+       for _, items := range fqdnlist {
+               subdata.Address = items.Address
+               subdata.Port = items.Port
+               err := validateXappSubscriptionData(&subdata)
+               if err != nil {
+                       xapp.Logger.Error(err.Error())
+                       return err
+               }
+       }
+       subupdatechan <- &xapplist
+       return nil
+}
+
 func createNewE2tHandleHandlerImpl(e2taddchan chan<- *models.E2tData,
-        data *models.E2tData) error {
-        xapp.Logger.Debug("Invoked createNewE2tHandleHandlerImpl")
-        err := validateE2tData(data)
-        if err != nil {
-                xapp.Logger.Error(err.Error())
-                return err
-        }
-        e2taddchan <- data
-        return nil
+       data *models.E2tData) error {
+       xapp.Logger.Debug("Invoked createNewE2tHandleHandlerImpl")
+       err := validateE2tData(data)
+       if err != nil {
+               xapp.Logger.Error(err.Error())
+               return err
+       }
+       e2taddchan <- data
+       return nil
 }
 
 func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error {
@@ -290,7 +314,7 @@ func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error {
 
                e2taddress_key := *element.E2TAddress
                if !checkValidaE2TAddress(e2taddress_key) {
-                       return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v",e2taddress_key)
+                       return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v", e2taddress_key)
                }
 
        }
@@ -298,44 +322,44 @@ func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error {
 }
 
 func associateRanToE2THandlerImpl(assranchan chan<- models.RanE2tMap,
-        data models.RanE2tMap) error {
-        xapp.Logger.Debug("Invoked associateRanToE2THandlerImpl")
+       data models.RanE2tMap) error {
+       xapp.Logger.Debug("Invoked associateRanToE2THandlerImpl")
        err := validateE2TAddressRANListData(data)
        if err != nil {
                xapp.Logger.Warn(" Association of RAN to E2T Instance data validation failed: " + err.Error())
                return err
        }
        assranchan <- data
-        return nil
+       return nil
 }
 
 func disassociateRanToE2THandlerImpl(disassranchan chan<- models.RanE2tMap,
-        data models.RanE2tMap) error {
-        xapp.Logger.Debug("Invoked disassociateRanToE2THandlerImpl")
+       data models.RanE2tMap) error {
+       xapp.Logger.Debug("Invoked disassociateRanToE2THandlerImpl")
        err := validateE2TAddressRANListData(data)
        if err != nil {
                xapp.Logger.Warn(" Disassociation of RAN List from E2T Instance data validation failed: " + err.Error())
                return err
        }
        disassranchan <- data
-        return nil
+       return nil
 }
 
 func deleteE2tHandleHandlerImpl(e2tdelchan chan<- *models.E2tDeleteData,
-        data *models.E2tDeleteData) error {
-        xapp.Logger.Debug("Invoked deleteE2tHandleHandlerImpl")
+       data *models.E2tDeleteData) error {
+       xapp.Logger.Debug("Invoked deleteE2tHandleHandlerImpl")
 
-        err := validateDeleteE2tData(data)
-        if err != nil {
-                xapp.Logger.Error(err.Error())
-                return err
-        }
+       err := validateDeleteE2tData(data)
+       if err != nil {
+               xapp.Logger.Error(err.Error())
+               return err
+       }
 
-        e2tdelchan <- data
-        return nil
+       e2tdelchan <- data
+       return nil
 }
 
-func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData,
+func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData, subupdatechan chan<- *rtmgr.XappList,
        subdelchan chan<- *models.XappSubscriptionData, e2taddchan chan<- *models.E2tData, assranchan chan<- models.RanE2tMap, disassranchan chan<- models.RanE2tMap, e2tdelchan chan<- *models.E2tDeleteData) {
        swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
        if err != nil {
@@ -392,49 +416,60 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c
                                return handle.NewGetHandlesOK()
                        }
                })
-       api.HandleCreateNewE2tHandleHandler = handle.CreateNewE2tHandleHandlerFunc(
-                func(params handle.CreateNewE2tHandleParams) middleware.Responder {
-                        err := createNewE2tHandleHandlerImpl(e2taddchan, params.E2tData)
-                        if err != nil {
-                                return handle.NewCreateNewE2tHandleBadRequest()
-                        } else {
+       api.HandleUpdateXappSubscriptionHandleHandler = handle.UpdateXappSubscriptionHandleHandlerFunc(
+               func(params handle.UpdateXappSubscriptionHandleParams) middleware.Responder {
+                       err := updateXappSubscriptionHandleImpl(subupdatechan, &params.XappList, params.SubscriptionID)
+                       if err != nil {
+                               return handle.NewUpdateXappSubscriptionHandleBadRequest()
+                       } else {
+                               //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
+                               time.Sleep(1 * time.Second)
+                               return handle.NewUpdateXappSubscriptionHandleCreated()
+                       }
+               })
+       api.HandleCreateNewE2tHandleHandler = handle.CreateNewE2tHandleHandlerFunc(
+               func(params handle.CreateNewE2tHandleParams) middleware.Responder {
+                       err := createNewE2tHandleHandlerImpl(e2taddchan, params.E2tData)
+                       if err != nil {
+                               return handle.NewCreateNewE2tHandleBadRequest()
+                       } else {
                                time.Sleep(1 * time.Second)
-                                return handle.NewCreateNewE2tHandleCreated()
-                        }
-                })
+                               return handle.NewCreateNewE2tHandleCreated()
+                       }
+               })
 
-       api.HandleAssociateRanToE2tHandleHandler = handle.AssociateRanToE2tHandleHandlerFunc(
+       api.HandleAssociateRanToE2tHandleHandler = handle.AssociateRanToE2tHandleHandlerFunc(
                func(params handle.AssociateRanToE2tHandleParams) middleware.Responder {
-                        err := associateRanToE2THandlerImpl(assranchan, params.RanE2tList)
+                       err := associateRanToE2THandlerImpl(assranchan, params.RanE2tList)
                        if err != nil {
-                                return handle.NewAssociateRanToE2tHandleBadRequest()
-                        } else {
+                               return handle.NewAssociateRanToE2tHandleBadRequest()
+                       } else {
                                time.Sleep(1 * time.Second)
-                                return handle.NewAssociateRanToE2tHandleCreated()
-                        }
-                })
+                               return handle.NewAssociateRanToE2tHandleCreated()
+                       }
+               })
 
-       api.HandleDissociateRanHandler = handle.DissociateRanHandlerFunc(
-               func(params handle.DissociateRanParams) middleware.Responder {
+       api.HandleDissociateRanHandler = handle.DissociateRanHandlerFunc(
+               func(params handle.DissociateRanParams) middleware.Responder {
                        err := disassociateRanToE2THandlerImpl(disassranchan, params.DissociateList)
                        if err != nil {
-                                return handle.NewDissociateRanBadRequest()
-                        } else {
+                               return handle.NewDissociateRanBadRequest()
+                       } else {
                                time.Sleep(1 * time.Second)
-                                return handle.NewDissociateRanCreated()
-                        }
-                })
-
-       api.HandleDeleteE2tHandleHandler = handle.DeleteE2tHandleHandlerFunc(
-                func(params handle.DeleteE2tHandleParams) middleware.Responder {
-                        err := deleteE2tHandleHandlerImpl(e2tdelchan, params.E2tData)
-                        if err != nil {
-                                return handle.NewDeleteE2tHandleBadRequest()
-                        } else {
+                               return handle.NewDissociateRanCreated()
+                       }
+               })
+
+       api.HandleDeleteE2tHandleHandler = handle.DeleteE2tHandleHandlerFunc(
+               func(params handle.DeleteE2tHandleParams) middleware.Responder {
+                       err := deleteE2tHandleHandlerImpl(e2tdelchan, params.E2tData)
+                       if err != nil {
+                               return handle.NewDeleteE2tHandleBadRequest()
+                       } else {
                                time.Sleep(1 * time.Second)
-                                return handle.NewDeleteE2tHandleCreated()
-                        }
-                })
+                               return handle.NewDeleteE2tHandleCreated()
+                       }
+               })
        // start to serve API
        xapp.Logger.Info("Starting the HTTP Rest service")
        if err := server.Serve(); err != nil {
@@ -479,7 +514,7 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile
                        }
                        xapp.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
                        // Combine the xapps data and platform data before writing to the SDL
-                       ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData, E2Ts:  make(map[string]rtmgr.E2TInstance)}
+                       ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData, E2Ts: make(map[string]rtmgr.E2TInstance)}
                        writeErr := sdlEngine.WriteAll(fileName, ricData)
                        if writeErr != nil {
                                xapp.Logger.Error(writeErr.Error())
@@ -510,13 +545,14 @@ func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, co
        datach := make(chan *models.XappCallbackData, 10)
        subschan := make(chan *models.XappSubscriptionData, 10)
        subdelchan := make(chan *models.XappSubscriptionData, 10)
+       subupdatechan := make(chan *rtmgr.XappList, 10)
        e2taddchan := make(chan *models.E2tData, 10)
        associateranchan := make(chan models.RanE2tMap, 10)
        disassociateranchan := make(chan models.RanE2tMap, 10)
        e2tdelchan := make(chan *models.E2tDeleteData, 10)
        xapp.Logger.Info("Launching Rest Http service")
        go func() {
-               r.LaunchRest(&nbiif, datach, subschan, subdelchan, e2taddchan, associateranchan, disassociateranchan, e2tdelchan)
+               r.LaunchRest(&nbiif, datach, subschan, subupdatechan, subdelchan, e2taddchan, associateranchan, disassociateranchan, e2tdelchan)
        }()
 
        go func() {
@@ -555,56 +591,65 @@ func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, co
                }
        }()
 
-        go func() {
-                for {
-                        xapp.Logger.Debug("received create New E2T data")
+       go func() {
+               for {
+                       data := <-subupdatechan
+                       xapp.Logger.Debug("received XApp subscription Merge data")
+                       updateSubscription(data)
+                       triggerSBI <- true
+               }
+       }()
+
+       go func() {
+               for {
 
-                        data, meiddata,_ := r.RecvNewE2Tdata(e2taddchan)
-                        if data != nil {
+                       data, meiddata, _ := r.RecvNewE2Tdata(e2taddchan)
+                       if data != nil {
+                               xapp.Logger.Debug("received create New E2T data")
                                m.Lock()
-                                sdlEngine.WriteNewE2TInstance(fileName, data,meiddata)
+                               sdlEngine.WriteNewE2TInstance(fileName, data, meiddata)
                                m.Unlock()
-                                triggerSBI <- true
-                        }
-                }
-        }()
+                               triggerSBI <- true
+                       }
+               }
+       }()
 
-        go func() {
-                for {
+       go func() {
+               for {
                        data := <-associateranchan
-                        xapp.Logger.Debug("received associate RAN list to E2T instance mapping from E2 Manager")
+                       xapp.Logger.Debug("received associate RAN list to E2T instance mapping from E2 Manager")
                        m.Lock()
-                        sdlEngine.WriteAssRANToE2TInstance(fileName, data)
+                       sdlEngine.WriteAssRANToE2TInstance(fileName, data)
                        m.Unlock()
-                        triggerSBI <- true
-                }
-        }()
+                       triggerSBI <- true
+               }
+       }()
 
-        go func() {
-                for {
+       go func() {
+               for {
 
                        data := <-disassociateranchan
-                        xapp.Logger.Debug("received disassociate RANs from E2T instance")
+                       xapp.Logger.Debug("received disassociate RANs from E2T instance")
                        m.Lock()
-                        sdlEngine.WriteDisAssRANFromE2TInstance(fileName, data)
+                       sdlEngine.WriteDisAssRANFromE2TInstance(fileName, data)
                        m.Unlock()
-                        triggerSBI <- true
-                }
-        }()
+                       triggerSBI <- true
+               }
+       }()
 
-        go func() {
-                for {
-                        xapp.Logger.Debug("received Delete E2T data")
+       go func() {
+               for {
 
                        data := <-e2tdelchan
-                        if data != nil {
+                       xapp.Logger.Debug("received Delete E2T data")
+                       if data != nil {
                                m.Lock()
-                                sdlEngine.WriteDeleteE2TInstance(fileName, data)
+                               sdlEngine.WriteDeleteE2TInstance(fileName, data)
                                m.Unlock()
-                                triggerSBI <- true
-                        }
-                }
-        }()
+                               triggerSBI <- true
+                       }
+               }
+       }()
 
        return nil
 }
@@ -614,6 +659,7 @@ func (r *HttpRestful) Terminate() error {
 }
 
 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
+       xapp.Logger.Debug("Adding the subscription into the subscriptions list")
        var b = false
        sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
        for _, elem := range *subs {
@@ -647,3 +693,39 @@ func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubsc
        }
        return present
 }
+
+func updateSubscription(data *rtmgr.XappList) {
+
+       var subdata models.XappSubscriptionData
+       var id int32
+       var matchingsubid, deletecount uint8
+       id = int32(data.SubscriptionID)
+       subdata.SubscriptionID = &id
+       for _, subs := range rtmgr.Subs {
+               if int32(data.SubscriptionID) == subs.SubID {
+                       matchingsubid++
+               }
+       }
+
+       for deletecount < matchingsubid {
+               for _, subs := range rtmgr.Subs {
+                       if int32(data.SubscriptionID) == subs.SubID {
+                               subdata.SubscriptionID = &subs.SubID
+                               subdata.Address = &subs.Fqdn
+                               subdata.Port = &subs.Port
+                               xapp.Logger.Debug("Deletion Subscription List has %v", subdata)
+                               delSubscription(&rtmgr.Subs, &subdata)
+                               break
+                       }
+               }
+               deletecount++
+       }
+
+       for _, items := range data.FqdnList {
+               subdata.Address = items.Address
+               subdata.Port = items.Port
+               xapp.Logger.Debug("Adding Subscription List has %v", subdata)
+               addSubscription(&rtmgr.Subs, &subdata)
+       }
+
+}
index 1acf09c..80e7fd2 100644 (file)
@@ -40,7 +40,7 @@ import (
 type FetchAllXAppsHandler func(string) (*[]rtmgr.XApp, error)
 type RecvXappCallbackDataHandler func(<-chan *models.XappCallbackData) (*[]rtmgr.XApp, error)
 type RecvNewE2TdataHandler func(<-chan *models.E2tData) (*rtmgr.E2TInstance, string, error)
-type LaunchRestHandler func(*string, chan<- *models.XappCallbackData, chan<- *models.XappSubscriptionData, chan<- *models.XappSubscriptionData, chan<- *models.E2tData, chan<- models.RanE2tMap, chan<- models.RanE2tMap, chan<- *models.E2tDeleteData)
+type LaunchRestHandler func(*string, chan<- *models.XappCallbackData, chan<- *models.XappSubscriptionData, chan<- *rtmgr.XappList, chan<- *models.XappSubscriptionData, chan<- *models.E2tData, chan<- models.RanE2tMap, chan<- models.RanE2tMap, chan<- *models.E2tDeleteData)
 type ProvideXappHandleHandlerImpl func(chan<- *models.XappCallbackData, *models.XappCallbackData) error
 type RetrieveStartupDataHandler func(string, string, string, string, sdl.Engine) error
 
index 8ee9b52..1c006b4 100644 (file)
@@ -152,7 +152,7 @@ func (r *Rpe) addRoute_rx_list(messageType string, tx *rtmgr.Endpoint, rx []rtmg
        //      xapp.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId)
 }
 
-func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
        xapp.Logger.Debug("rpe.generateXappRoutes invoked")
        xapp.Logger.Debug("Endpoint: %v, xAppType: %v", xAppEp.Name, xAppEp.XAppType)
        if xAppEp.XAppType != sbi.PlatformType && (len(xAppEp.TxMessages) > 0 || len(xAppEp.RxMessages) > 0) {
@@ -179,7 +179,7 @@ func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, e2TermEp *rtmgr.Endpoin
 
 }
 
-func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
        xapp.Logger.Debug("rpe.addSubscriptionRoutes invoked")
        subscriptionList := &rtmgr.Subs
        for _, subscription := range *subscriptionList {
@@ -196,14 +196,14 @@ func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, e2TermE
                        r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, subscription.SubID, "")
                        r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID, "")
                        //E2 Termination -> xApp
-                       r.addRoute("RIC_INDICATION", e2TermEp, xAppEp, routeTable, subscription.SubID, "")
-                       r.addRoute("RIC_CONTROL_ACK", e2TermEp, xAppEp, routeTable, subscription.SubID, "")
-                       r.addRoute("RIC_CONTROL_FAILURE", e2TermEp, xAppEp, routeTable, subscription.SubID, "")
+                       r.addRoute("RIC_INDICATION", nil, xAppEp, routeTable, subscription.SubID, "")
+                       r.addRoute("RIC_CONTROL_ACK", nil, xAppEp, routeTable, subscription.SubID, "")
+                       r.addRoute("RIC_CONTROL_FAILURE", nil, xAppEp, routeTable, subscription.SubID, "")
                }
        }
 }
 
-func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, ueManEp *rtmgr.Endpoint, rsmEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, rsmEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
        xapp.Logger.Debug("rpe.generatePlatformRoutes invoked")
        //Platform Routes --- Subscription Routes
        //Subscription Manager -> E2 Termination
@@ -215,8 +215,8 @@ func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.
                        sendEp = subManEp
                case "E2MAN":
                        sendEp = e2ManEp
-               case "UEMAN":
-                       sendEp = ueManEp
+               //case "UEMAN":
+               //      sendEp = ueManEp
                case "RSM":
                        sendEp = rsmEp
                case "A1MEDIATOR":
@@ -227,8 +227,8 @@ func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.
                        Ep = subManEp
                case "E2MAN":
                        Ep = e2ManEp
-               case "UEMAN":
-                       Ep = ueManEp
+               //case "UEMAN":
+               //      Ep = ueManEp
                case "RSM":
                        Ep = rsmEp
                case "A1MEDIATOR":
@@ -248,11 +248,11 @@ func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable
        xapp.Logger.Debug("rpe.generateRouteTable invoked")
        xapp.Logger.Debug("Endpoint List:  %v", endPointList)
        routeTable := &rtmgr.RouteTable{}
-       e2TermEp := getEndpointByName(&endPointList, "E2TERM")
+       /*e2TermEp := getEndpointByName(&endPointList, "E2TERM")
        if e2TermEp == nil {
                xapp.Logger.Error("Platform component not found: %v", "E2 Termination")
                xapp.Logger.Debug("Endpoints: %v", endPointList)
-       }
+       }*/
        subManEp := getEndpointByName(&endPointList, "SUBMAN")
        if subManEp == nil {
                xapp.Logger.Error("Platform component not found: %v", "Subscription Manager")
@@ -263,11 +263,11 @@ func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable
                xapp.Logger.Error("Platform component not found: %v", "E2 Manager")
                xapp.Logger.Debug("Endpoints: %v", endPointList)
        }
-       ueManEp := getEndpointByName(&endPointList, "UEMAN")
+       /*ueManEp := getEndpointByName(&endPointList, "UEMAN")
        if ueManEp == nil {
                xapp.Logger.Error("Platform component not found: %v", "UE Manger")
                xapp.Logger.Debug("Endpoints: %v", endPointList)
-       }
+       }*/
        rsmEp := getEndpointByName(&endPointList, "RSM")
        if rsmEp == nil {
                xapp.Logger.Error("Platform component not found: %v", "Resource Status Manager")
@@ -284,13 +284,13 @@ func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable
                xapp.Logger.Error("Platform component not found: %v", "E2 Termination List")
                xapp.Logger.Debug("Endpoints: %v", endPointList)
        }
-       r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, ueManEp, rsmEp, A1MediatorEp, routeTable)
+       r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, rsmEp, A1MediatorEp, routeTable)
 
        for _, endPoint := range endPointList {
                xapp.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
                if endPoint.XAppType != sbi.PlatformType && (len(endPoint.TxMessages) > 0 || len(endPoint.RxMessages) > 0) {
-                       r.generateXappRoutes(endPoint, e2TermEp, subManEp, routeTable)
-                       r.generateSubscriptionRoutes(endPoint, e2TermEp, subManEp, routeTable)
+                       r.generateXappRoutes(endPoint, subManEp, routeTable)
+                       r.generateSubscriptionRoutes(endPoint, subManEp, routeTable)
                }
        }
        return routeTable
index 91a8aeb..b9c4cd6 100644 (file)
@@ -105,8 +105,8 @@ type RicComponents struct {
 
 type Subscription struct {
        SubID int32
-       Fqdn  string
-       Port  uint16
+       Fqdn string
+       Port uint16
 }
 
 type PlatformRoutes []struct {
@@ -121,4 +121,12 @@ type RtmgrRoutes struct {
        Prs PlatformRoutes      `json:"PlatformRoutes"`
 }
 
+type FqDn struct {
+       Address *string
+        Port *uint16
+}
 
+type XappList struct {
+        SubscriptionID  uint16
+       FqdnList []FqDn
+}
diff --git a/pkg/sbi/control.go b/pkg/sbi/control.go
new file mode 100644 (file)
index 0000000..67d8eca
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+   platform project (RICP).
+
+==================================================================================
+*/
+package sbi
+
+import "C"
+
+import (
+        "errors"
+        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+        "strconv"
+)
+
+
+func NewControl() Control {
+
+        return Control{make(chan *xapp.RMRParams)}
+}
+
+
+type Control struct {
+        rcChan      chan *xapp.RMRParams
+}
+
+
+func (c *Control) Run() {
+        go c.controlLoop()
+        xapp.Run(c)
+}
+
+func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
+        c.rcChan <- rp
+        return
+}
+
+func (c *Control) controlLoop() {
+        for {
+                msg := <-c.rcChan
+                switch msg.Mtype {
+                case xapp.RICMessageTypes["RIC_SUB_REQ"]:
+                       xapp.Logger.Info("Message handling when RMR instance queries for Routes")
+                default:
+                        err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded")
+                        xapp.Logger.Error("Unknown message type: %v", err)
+                }
+        }
+}
+
index ac61c0b..1404319 100644 (file)
 
 package sbi
 
+/*
+#include <time.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <rmr/rmr.h>
+#include <rmr/RIC_message_types.h>
+
+
+#cgo CFLAGS: -I../
+#cgo LDFLAGS: -lrmr_nng -lnng
+*/
+import "C"
+
 import (
        "errors"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
@@ -36,12 +50,14 @@ import (
        _ "nanomsg.org/go/mangos/v2/transport/all"
        "routing-manager/pkg/rtmgr"
        "strconv"
-       "time"
 )
 
+
+
 type NngPush struct {
        Sbi
        NewSocket CreateNewNngSocketHandler
+        rcChan      chan *xapp.RMRParams
 }
 
 func NewNngPush() *NngPush {
@@ -130,40 +146,14 @@ func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
        return nil
 }
 
-/*
 func (c *NngPush) DistributeAll(policies *[]string) error {
        xapp.Logger.Debug("Invoked: sbi.DistributeAll")
        xapp.Logger.Debug("args: %v", *policies)
        for _, ep := range rtmgr.Eps {
-                       if ep.IsReady {
-                               go c.send(ep, policies)
-                       } else {
-                               xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready")
-                       }
-               }
-       }
-       return nil
-}
-
-*/
-
-/*
-       Temporary solution for R3 - E2M -> E2T issue
-*/
-func (c *NngPush) DistributeAll(policies *[]string) error {
-       xapp.Logger.Debug("Invoked: sbi.DistributeAll")
-       xapp.Logger.Debug("args: %v", *policies)
-       for _, ep := range rtmgr.Eps {
-               i := 1
-               for i< 5 {
-                       if ep.IsReady {
-                               go c.send(ep, policies)
-                               break
-                       } else {
-                               xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready" + " Retry count " + strconv.Itoa(i))
-                               time.Sleep(10 * time.Millisecond)
-                               i++
-                       }
+               if ep.IsReady {
+                       go c.send(ep, policies)
+               } else {
+                       xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready")
                }
        }
        return nil