Making Route Distribution Synchronous
[ric-plt/rtmgr.git] / pkg / nbi / httprestful.go
index 98209e7..f49f3ae 100644 (file)
@@ -56,39 +56,20 @@ import (
        "time"
 )
 
-//var myClient = &http.Client{Timeout: 1 * time.Second}
-
 type HttpRestful struct {
        Engine
        LaunchRest                   LaunchRestHandler
-       RecvXappCallbackData         RecvXappCallbackDataHandler
-       RecvNewE2Tdata               RecvNewE2TdataHandler
-       ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
        RetrieveStartupData          RetrieveStartupDataHandler
 }
 
 func NewHttpRestful() *HttpRestful {
        instance := new(HttpRestful)
        instance.LaunchRest = launchRest
-       instance.RecvXappCallbackData = recvXappCallbackData
-       instance.RecvNewE2Tdata = recvNewE2Tdata
-       instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
        instance.RetrieveStartupData = retrieveStartupData
        return instance
 }
 
-// ToDo: Use Range over channel. Read and return only the latest one.
-func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
-       var xappData *models.XappCallbackData
-       // Drain the channel as we are only looking for the latest value until
-       // xapp manager sends all xapp data with every request.
-       length := len(dataChannel)
-       //xapp.Logger.Info(length)
-       for i := 0; i <= length; i++ {
-               xapp.Logger.Info("data received")
-               // If no data received from the REST, it blocks.
-               xappData = <-dataChannel
-       }
+func recvXappCallbackData(xappData *models.XappCallbackData) (*[]rtmgr.XApp, error) {
        if nil != xappData {
                var xapps []rtmgr.XApp
                err := json.Unmarshal([]byte(xappData.XApps), &xapps)
@@ -101,13 +82,10 @@ func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr
        return nil, nil
 }
 
-func recvNewE2Tdata(dataChannel <-chan *models.E2tData) (*rtmgr.E2TInstance, string, error) {
-       var e2tData *models.E2tData
+func recvNewE2Tdata(e2tData *models.E2tData) (*rtmgr.E2TInstance, string, error) {
        var str string
        xapp.Logger.Info("data received")
 
-       e2tData = <-dataChannel
-
        if nil != e2tData {
 
                e2tinst := rtmgr.E2TInstance{
@@ -146,7 +124,7 @@ func validateXappCallbackData(callbackData *models.XappCallbackData) error {
        return nil
 }
 
-func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
+func provideXappHandleHandlerImpl(data *models.XappCallbackData) error {
        if data != nil {
                xapp.Logger.Debug("Received callback data")
        }
@@ -155,7 +133,20 @@ func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *
                xapp.Logger.Warn("XApp callback data validation failed: " + err.Error())
                return err
        } else {
-               datach <- data
+               appdata, err := recvXappCallbackData(data)
+                if err != nil {
+                       xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error())
+                } else if appdata != nil {
+                        xapp.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
+                        alldata, err1 := httpGetXApps(xapp.Config.GetString("xmurl"))
+                        if alldata != nil && err1 == nil {
+                               m.Lock()
+                                sdlEngine.WriteXApps(xapp.Config.GetString("rtfile"), alldata)
+                                m.Unlock()
+                               return sendRoutesToAll()
+                        }
+                }
+
                return nil
        }
 }
@@ -223,18 +214,17 @@ func checkValidaE2TAddress(e2taddress string) bool {
 
 }
 
-func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
-       data *models.XappSubscriptionData) error {
+func provideXappSubscriptionHandleImpl(data *models.XappSubscriptionData) error {
        xapp.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
        err := validateXappSubscriptionData(data)
        if err != nil {
                xapp.Logger.Error(err.Error())
                return err
        }
-       subchan <- data
-       //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
+        xapp.Logger.Debug("received XApp subscription data")
+        addSubscription(&rtmgr.Subs, data)
        xapp.Logger.Debug("Endpoints: %v", rtmgr.Eps)
-       return nil
+       return sendRoutesToAll()
 }
 
 func subscriptionExists(data *models.XappSubscriptionData) bool {
@@ -249,8 +239,7 @@ func subscriptionExists(data *models.XappSubscriptionData) bool {
        return present
 }
 
-func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
-       data *models.XappSubscriptionData) error {
+func deleteXappSubscriptionHandleImpl(data *models.XappSubscriptionData) error {
        xapp.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
        err := validateXappSubscriptionData(data)
        if err != nil {
@@ -264,11 +253,13 @@ func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscription
                return err
        }
 
-       subdelchan <- data
-       return nil
+        xapp.Logger.Debug("received XApp subscription delete data")
+        delSubscription(&rtmgr.Subs, data)
+       return sendRoutesToAll()
+
 }
 
-func updateXappSubscriptionHandleImpl(subupdatechan chan<- *rtmgr.XappList, data *models.XappList, subid uint16) error {
+func updateXappSubscriptionHandleImpl(data *models.XappList, subid uint16) error {
        xapp.Logger.Debug("Invoked updateXappSubscriptionHandleImpl")
 
        var fqdnlist []rtmgr.FqDn
@@ -289,20 +280,26 @@ func updateXappSubscriptionHandleImpl(subupdatechan chan<- *rtmgr.XappList, data
                        return err
                }
        }
-       subupdatechan <- &xapplist
-       return nil
+        xapp.Logger.Debug("received XApp subscription Merge data")
+        updateSubscription(&xapplist)
+       return sendRoutesToAll()
 }
 
-func createNewE2tHandleHandlerImpl(e2taddchan chan<- *models.E2tData,
-       data *models.E2tData) error {
+func createNewE2tHandleHandlerImpl(data *models.E2tData) error {
        xapp.Logger.Debug("Invoked createNewE2tHandleHandlerImpl")
        err := validateE2tData(data)
        if err != nil {
                xapp.Logger.Error(err.Error())
                return err
        }
-       e2taddchan <- data
-       return nil
+       //e2taddchan <- data
+       e2data, meiddata, _ := recvNewE2Tdata(data)
+        xapp.Logger.Debug("received create New E2T data")
+        m.Lock()
+        sdlEngine.WriteNewE2TInstance(xapp.Config.GetString("rtfile"), e2data, meiddata)
+        m.Unlock()
+        return sendRoutesToAll()
+
 }
 
 func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error {
@@ -323,32 +320,37 @@ func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error {
        return nil
 }
 
-func associateRanToE2THandlerImpl(assranchan chan<- models.RanE2tMap,
-       data models.RanE2tMap) error {
+func associateRanToE2THandlerImpl(data models.RanE2tMap) error {
        xapp.Logger.Debug("Invoked associateRanToE2THandlerImpl")
        err := validateE2TAddressRANListData(data)
        if err != nil {
                xapp.Logger.Warn(" Association of RAN to E2T Instance data validation failed: " + err.Error())
                return err
        }
-       assranchan <- data
-       return nil
+        xapp.Logger.Debug("received associate RAN list to E2T instance mapping from E2 Manager")
+        m.Lock()
+        sdlEngine.WriteAssRANToE2TInstance(xapp.Config.GetString("rtfile"), data)
+        m.Unlock()
+       return sendRoutesToAll()
+
 }
 
-func disassociateRanToE2THandlerImpl(disassranchan chan<- models.RanE2tMap,
-       data models.RanE2tMap) error {
+func disassociateRanToE2THandlerImpl(data models.RanE2tMap) error {
        xapp.Logger.Debug("Invoked disassociateRanToE2THandlerImpl")
        err := validateE2TAddressRANListData(data)
        if err != nil {
                xapp.Logger.Warn(" Disassociation of RAN List from E2T Instance data validation failed: " + err.Error())
                return err
        }
-       disassranchan <- data
-       return nil
+        xapp.Logger.Debug("received disassociate RANs from E2T instance")
+        m.Lock()
+        sdlEngine.WriteDisAssRANFromE2TInstance(xapp.Config.GetString("rtfile"), data)
+        m.Unlock()
+       return sendRoutesToAll()
+
 }
 
-func deleteE2tHandleHandlerImpl(e2tdelchan chan<- *models.E2tDeleteData,
-       data *models.E2tDeleteData) error {
+func deleteE2tHandleHandlerImpl(data *models.E2tDeleteData) error {
        xapp.Logger.Debug("Invoked deleteE2tHandleHandlerImpl")
 
        err := validateDeleteE2tData(data)
@@ -356,9 +358,11 @@ func deleteE2tHandleHandlerImpl(e2tdelchan chan<- *models.E2tDeleteData,
                xapp.Logger.Error(err.Error())
                return err
        }
+        m.Lock()
+        sdlEngine.WriteDeleteE2TInstance(xapp.Config.GetString("rtfile"), data)
+        m.Unlock()
+       return sendRoutesToAll()
 
-       e2tdelchan <- data
-       return nil
 }
 
 func dumpDebugData() (models.Debuginfo, error) {
@@ -378,8 +382,7 @@ func dumpDebugData() (models.Debuginfo, error) {
        return response, err
 }
 
-func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData, subupdatechan chan<- *rtmgr.XappList,
-       subdelchan chan<- *models.XappSubscriptionData, e2taddchan chan<- *models.E2tData, assranchan chan<- models.RanE2tMap, disassranchan chan<- models.RanE2tMap, e2tdelchan chan<- *models.E2tDeleteData) {
+func launchRest(nbiif *string){
        swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
        if err != nil {
                //log.Fatalln(err)
@@ -405,7 +408,7 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c
        api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
                func(params handle.ProvideXappHandleParams) middleware.Responder {
                        xapp.Logger.Info("Data received on Http interface")
-                       err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
+                       err := provideXappHandleHandlerImpl(params.XappCallbackData)
                        if err != nil {
                                xapp.Logger.Error("Invalid XApp callback data: " + err.Error())
                                return handle.NewProvideXappHandleBadRequest()
@@ -415,77 +418,67 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c
                })
        api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
                func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
-                       err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
+                       err := provideXappSubscriptionHandleImpl(params.XappSubscriptionData)
                        if err != nil {
                                return handle.NewProvideXappSubscriptionHandleBadRequest()
                        } else {
-                               //Delay the reponse as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
-                               time.Sleep(1 * time.Second)
                                return handle.NewGetHandlesOK()
                        }
                })
        api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
                func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
-                       err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
+                       err := deleteXappSubscriptionHandleImpl(params.XappSubscriptionData)
                        if err != nil {
                                return handle.NewDeleteXappSubscriptionHandleNoContent()
                        } else {
-                               //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
-                               time.Sleep(1 * time.Second)
                                return handle.NewGetHandlesOK()
                        }
                })
        api.HandleUpdateXappSubscriptionHandleHandler = handle.UpdateXappSubscriptionHandleHandlerFunc(
                func(params handle.UpdateXappSubscriptionHandleParams) middleware.Responder {
-                       err := updateXappSubscriptionHandleImpl(subupdatechan, &params.XappList, params.SubscriptionID)
+                       err := updateXappSubscriptionHandleImpl(&params.XappList, params.SubscriptionID)
                        if err != nil {
                                return handle.NewUpdateXappSubscriptionHandleBadRequest()
                        } else {
-                               //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
-                               time.Sleep(1 * time.Second)
                                return handle.NewUpdateXappSubscriptionHandleCreated()
                        }
                })
        api.HandleCreateNewE2tHandleHandler = handle.CreateNewE2tHandleHandlerFunc(
                func(params handle.CreateNewE2tHandleParams) middleware.Responder {
-                       err := createNewE2tHandleHandlerImpl(e2taddchan, params.E2tData)
+                       err := createNewE2tHandleHandlerImpl(params.E2tData)
                        if err != nil {
                                return handle.NewCreateNewE2tHandleBadRequest()
                        } else {
-                               time.Sleep(1 * time.Second)
                                return handle.NewCreateNewE2tHandleCreated()
                        }
                })
 
        api.HandleAssociateRanToE2tHandleHandler = handle.AssociateRanToE2tHandleHandlerFunc(
                func(params handle.AssociateRanToE2tHandleParams) middleware.Responder {
-                       err := associateRanToE2THandlerImpl(assranchan, params.RanE2tList)
+                       err := associateRanToE2THandlerImpl(params.RanE2tList)
                        if err != nil {
                                return handle.NewAssociateRanToE2tHandleBadRequest()
                        } else {
-                               time.Sleep(1 * time.Second)
                                return handle.NewAssociateRanToE2tHandleCreated()
                        }
                })
 
        api.HandleDissociateRanHandler = handle.DissociateRanHandlerFunc(
                func(params handle.DissociateRanParams) middleware.Responder {
-                       err := disassociateRanToE2THandlerImpl(disassranchan, params.DissociateList)
+                       err := disassociateRanToE2THandlerImpl(params.DissociateList)
                        if err != nil {
                                return handle.NewDissociateRanBadRequest()
                        } else {
-                               time.Sleep(1 * time.Second)
                                return handle.NewDissociateRanCreated()
                        }
                })
 
        api.HandleDeleteE2tHandleHandler = handle.DeleteE2tHandleHandlerFunc(
                func(params handle.DeleteE2tHandleParams) middleware.Responder {
-                       err := deleteE2tHandleHandlerImpl(e2tdelchan, params.E2tData)
+                       err := deleteE2tHandleHandlerImpl(params.E2tData)
                        if err != nil {
                                return handle.NewDeleteE2tHandleBadRequest()
                        } else {
-                               time.Sleep(1 * time.Second)
                                return handle.NewDeleteE2tHandleCreated()
                        }
                })
@@ -670,120 +663,14 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile
 }
 
 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string, e2murl string,
-       sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool, m *sync.Mutex) error {
+       sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) error {
        err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, e2murl, sdlEngine)
        if err != nil {
                xapp.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error())
                return err
        }
-
-       datach := make(chan *models.XappCallbackData, 10)
-       subschan := make(chan *models.XappSubscriptionData, 10)
-       subdelchan := make(chan *models.XappSubscriptionData, 10)
-       subupdatechan := make(chan *rtmgr.XappList, 10)
-       e2taddchan := make(chan *models.E2tData, 10)
-       associateranchan := make(chan models.RanE2tMap, 10)
-       disassociateranchan := make(chan models.RanE2tMap, 10)
-       e2tdelchan := make(chan *models.E2tDeleteData, 10)
-       xapp.Logger.Info("Launching Rest Http service")
-       go func() {
-               r.LaunchRest(&nbiif, datach, subschan, subupdatechan, subdelchan, e2taddchan, associateranchan, disassociateranchan, e2tdelchan)
-       }()
-
-       go func() {
-               for {
-                       data, err := r.RecvXappCallbackData(datach)
-                       if err != nil {
-                               xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error())
-                       } else if data != nil {
-                               xapp.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
-                               alldata, err1 := httpGetXApps(xmurl)
-                               if alldata != nil && err1 == nil {
-                                       m.Lock()
-                                       sdlEngine.WriteXApps(fileName, alldata)
-                                       m.Unlock()
-                                       triggerSBI <- true
-                               }
-                       }
-               }
-       }()
-
-       go func() {
-               for {
-                       data := <-subschan
-                       xapp.Logger.Debug("received XApp subscription data")
-                       addSubscription(&rtmgr.Subs, data)
-                       triggerSBI <- true
-               }
-       }()
-
-       go func() {
-               for {
-                       data := <-subdelchan
-                       xapp.Logger.Debug("received XApp subscription delete data")
-                       delSubscription(&rtmgr.Subs, data)
-                       triggerSBI <- true
-               }
-       }()
-
-       go func() {
-               for {
-                       data := <-subupdatechan
-                       xapp.Logger.Debug("received XApp subscription Merge data")
-                       updateSubscription(data)
-                       triggerSBI <- true
-               }
-       }()
-
-       go func() {
-               for {
-
-                       data, meiddata, _ := r.RecvNewE2Tdata(e2taddchan)
-                       if data != nil {
-                               xapp.Logger.Debug("received create New E2T data")
-                               m.Lock()
-                               sdlEngine.WriteNewE2TInstance(fileName, data, meiddata)
-                               m.Unlock()
-                               triggerSBI <- true
-                       }
-               }
-       }()
-
-       go func() {
-               for {
-                       data := <-associateranchan
-                       xapp.Logger.Debug("received associate RAN list to E2T instance mapping from E2 Manager")
-                       m.Lock()
-                       sdlEngine.WriteAssRANToE2TInstance(fileName, data)
-                       m.Unlock()
-                       triggerSBI <- true
-               }
-       }()
-
        go func() {
-               for {
-
-                       data := <-disassociateranchan
-                       xapp.Logger.Debug("received disassociate RANs from E2T instance")
-                       m.Lock()
-                       sdlEngine.WriteDisAssRANFromE2TInstance(fileName, data)
-                       m.Unlock()
-                       triggerSBI <- true
-               }
-       }()
-
-       go func() {
-               for {
-
-                       data := <-e2tdelchan
-                       xapp.Logger.Debug("received Delete E2T data")
-                       if data != nil {
-                               m.Lock()
-                               sdlEngine.WriteDeleteE2TInstance(fileName, data)
-                               m.Unlock()
-                               triggerSBI <- true
-                       }
-               }
+               r.LaunchRest(&nbiif)
        }()
 
        return nil