Fixed integration and subscription routes related issues for R3
[ric-plt/rtmgr.git] / pkg / nbi / httprestful.go
index 035501a..8ceca1d 100644 (file)
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
+
+
+   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+   platform project (RICP).
+
 ==================================================================================
 */
 /*
 ==================================================================================
 */
 /*
 
 package nbi
 
 
 package nbi
 
+//noinspection GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference
 import (
 import (
+       "encoding/json"
+       "errors"
        "fmt"
        "fmt"
-       "os"
-       "time"
+       "github.com/go-openapi/loads"
+       "github.com/go-openapi/runtime/middleware"
        "net/url"
        "net/url"
-       "strconv"
-       "errors"
-       "encoding/json"
-       "routing-manager/pkg/rtmgr"
-       "routing-manager/pkg/rpe"
-       "routing-manager/pkg/sdl"
+       "os"
        "routing-manager/pkg/models"
        "routing-manager/pkg/restapi"
        "routing-manager/pkg/restapi/operations"
        "routing-manager/pkg/models"
        "routing-manager/pkg/restapi"
        "routing-manager/pkg/restapi/operations"
-       "github.com/go-openapi/runtime/middleware"
        "routing-manager/pkg/restapi/operations/handle"
        "routing-manager/pkg/restapi/operations/handle"
-       loads "github.com/go-openapi/loads"
+       "routing-manager/pkg/rpe"
+       "routing-manager/pkg/rtmgr"
+       "routing-manager/pkg/sdl"
+       "strconv"
+       "time"
 )
 
 //var myClient = &http.Client{Timeout: 1 * time.Second}
 
 type HttpRestful struct {
 )
 
 //var myClient = &http.Client{Timeout: 1 * time.Second}
 
 type HttpRestful struct {
-       NbiEngine
+       Engine
        LaunchRest                   LaunchRestHandler
        RecvXappCallbackData         RecvXappCallbackDataHandler
        ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
        LaunchRest                   LaunchRestHandler
        RecvXappCallbackData         RecvXappCallbackDataHandler
        ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
@@ -76,8 +82,8 @@ func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr
                xappData = <-dataChannel
        }
        if nil != xappData {
                xappData = <-dataChannel
        }
        if nil != xappData {
-                var xapps []rtmgr.XApp
-                err := json.Unmarshal([]byte(xappData.XApps), &xapps)
+               var xapps []rtmgr.XApp
+               err := json.Unmarshal([]byte(xappData.XApps), &xapps)
                return &xapps, err
        } else {
                rtmgr.Logger.Info("No data")
                return &xapps, err
        } else {
                rtmgr.Logger.Info("No data")
@@ -85,17 +91,16 @@ func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr
 
        rtmgr.Logger.Debug("Nothing received on the Http interface")
        return nil, nil
 
        rtmgr.Logger.Debug("Nothing received on the Http interface")
        return nil, nil
-
 }
 
 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
        if len(callbackData.XApps) == 0 {
 }
 
 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
        if len(callbackData.XApps) == 0 {
-               return fmt.Errorf("Invalid Data field: \"%s\"", callbackData.XApps)
+               return fmt.Errorf("invalid Data field: \"%s\"", callbackData.XApps)
        }
        var xapps []rtmgr.XApp
        }
        var xapps []rtmgr.XApp
-        err := json.Unmarshal([]byte(callbackData.XApps), &xapps)
-        if err != nil {
-               return fmt.Errorf("Unmarshal failed: \"%s\"", err.Error())
+       err := json.Unmarshal([]byte(callbackData.XApps), &xapps)
+       if err != nil {
+               return fmt.Errorf("unmarshal failed: \"%s\"", err.Error())
        }
        return nil
 }
        }
        return nil
 }
@@ -106,10 +111,10 @@ func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *
        }
        err := validateXappCallbackData(data)
        if err != nil {
        }
        err := validateXappCallbackData(data)
        if err != nil {
-               rtmgr.Logger.Debug("XApp callback data validation failed: "+err.Error())
+               rtmgr.Logger.Warn("XApp callback data validation failed: " + err.Error())
                return err
        } else {
                return err
        } else {
-               datach<-data
+               datach <- data
                return nil
        }
 }
                return nil
        }
 }
@@ -126,7 +131,7 @@ func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
 }
 
 func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
 }
 
 func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
-                                       data *models.XappSubscriptionData) error {
+       data *models.XappSubscriptionData) error {
        rtmgr.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
        err := validateXappSubscriptionData(data)
        if err != nil {
        rtmgr.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
        err := validateXappSubscriptionData(data)
        if err != nil {
@@ -139,131 +144,172 @@ func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionDa
        return nil
 }
 
        return nil
 }
 
-func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData) {
-        swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
-        if err != nil {
-                //log.Fatalln(err)
-                rtmgr.Logger.Error(err.Error())
-                os.Exit(1)
-        }
+func subscriptionExists(data *models.XappSubscriptionData) bool {
+       present := false
+       sub := rtmgr.Subscription{SubID: *data.SubscriptionID, Fqdn: *data.Address, Port: *data.Port}
+       for _, elem := range rtmgr.Subs {
+               if elem == sub {
+                       present = true
+                       break
+               }
+       }
+       return present
+}
+
+func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
+       data *models.XappSubscriptionData) error {
+       rtmgr.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
+       err := validateXappSubscriptionData(data)
+       if err != nil {
+               rtmgr.Logger.Error(err.Error())
+               return err
+       }
+
+       if !subscriptionExists(data) {
+               rtmgr.Logger.Warn("subscription not found: %d", *data.SubscriptionID)
+               err := fmt.Errorf("subscription not found: %d", *data.SubscriptionID)
+               return err
+       }
+
+       subdelchan <- data
+       return nil
+}
+
+func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData,
+       subdelchan chan<- *models.XappSubscriptionData) {
+       swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
+       if err != nil {
+               //log.Fatalln(err)
+               rtmgr.Logger.Error(err.Error())
+               os.Exit(1)
+       }
        nbiUrl, err := url.Parse(*nbiif)
        if err != nil {
                rtmgr.Logger.Error(err.Error())
                os.Exit(1)
        }
        nbiUrl, err := url.Parse(*nbiif)
        if err != nil {
                rtmgr.Logger.Error(err.Error())
                os.Exit(1)
        }
-        api := operations.NewRoutingManagerAPI(swaggerSpec)
-        server := restapi.NewServer(api)
-        defer server.Shutdown()
-
-        server.Port, err = strconv.Atoi(nbiUrl.Port())
-        if err != nil {
-               rtmgr.Logger.Error("Invalid NBI RestAPI port")
-               os.Exit(1)
-        }
-        server.Host = nbiUrl.Hostname()
-        // set handlers
-        api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
-                func(params handle.ProvideXappHandleParams) middleware.Responder {
-                rtmgr.Logger.Info("Data received on Http interface")
-               err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
-               if err != nil {
-                       rtmgr.Logger.Error("Invalid XApp callback data: "+err.Error())
-                       return handle.NewProvideXappHandleBadRequest()
-               } else {
-                       return handle.NewGetHandlesOK()
-               }
-        })
+       api := operations.NewRoutingManagerAPI(swaggerSpec)
+       server := restapi.NewServer(api)
+       defer server.Shutdown()
+
+       server.Port, err = strconv.Atoi(nbiUrl.Port())
+       if err != nil {
+               rtmgr.Logger.Error("Invalid NBI RestAPI port")
+               os.Exit(1)
+       }
+       server.Host = "0.0.0.0"
+       // set handlers
+       api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
+               func(params handle.ProvideXappHandleParams) middleware.Responder {
+                       rtmgr.Logger.Info("Data received on Http interface")
+                       err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
+                       if err != nil {
+                               rtmgr.Logger.Error("Invalid XApp callback data: " + err.Error())
+                               return handle.NewProvideXappHandleBadRequest()
+                       } else {
+                               return handle.NewGetHandlesOK()
+                       }
+               })
        api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
                func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
                        err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
                        if err != nil {
                                return handle.NewProvideXappSubscriptionHandleBadRequest()
                        } else {
        api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
                func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
                        err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
                        if err != nil {
                                return handle.NewProvideXappSubscriptionHandleBadRequest()
                        } else {
+                               //Delay the reponse as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
+                               time.Sleep(1 * time.Second)
                                return handle.NewGetHandlesOK()
                        }
                })
                                return handle.NewGetHandlesOK()
                        }
                })
-        // start to serve API
-        rtmgr.Logger.Info("Starting the HTTP Rest service")
-        if err := server.Serve(); err != nil {
-                rtmgr.Logger.Error(err.Error())
-        }
-}
-
-func httpGetXapps(xmurl string) (*[]rtmgr.XApp, error) {
-        rtmgr.Logger.Info("Invoked httpgetter.fetchXappList: " + xmurl)
-        r, err := myClient.Get(xmurl)
-        if err != nil {
-                return nil, err
-        }
-        defer r.Body.Close()
-
-        if r.StatusCode == 200 {
-                rtmgr.Logger.Debug("http client raw response: %v", r)
-                var xapps []rtmgr.XApp
-                err = json.NewDecoder(r.Body).Decode(&xapps)
-                if err != nil {
-                        rtmgr.Logger.Warn("Json decode failed: " + err.Error())
-                }
-                rtmgr.Logger.Info("HTTP GET: OK")
-                rtmgr.Logger.Debug("httpgetter.fetchXappList returns: %v", xapps)
-                return &xapps, err
-        }
-        rtmgr.Logger.Warn("httpgetter got an unexpected http status code: %v", r.StatusCode)
-        return nil, nil
+       api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
+               func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
+                       err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
+                       if err != nil {
+                               return handle.NewDeleteXappSubscriptionHandleNoContent()
+                       } else {
+                               //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
+                               time.Sleep(1 * time.Second)
+                               return handle.NewGetHandlesOK()
+                       }
+               })
+       // start to serve API
+       rtmgr.Logger.Info("Starting the HTTP Rest service")
+       if err := server.Serve(); err != nil {
+               rtmgr.Logger.Error(err.Error())
+       }
 }
 
 }
 
-func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.SdlEngine) error {
-        var readErr error
-        var maxRetries = 10
-
-                for i := 1; i <= maxRetries; i++ {
-                        time.Sleep(2 * time.Second)
-
-                        xappData, err := httpGetXapps(xmurl)
+func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) {
+       rtmgr.Logger.Info("Invoked httprestful.httpGetXApps: " + xmurl)
+       r, err := myClient.Get(xmurl)
+       if err != nil {
+               return nil, err
+       }
+       defer r.Body.Close()
 
 
-                        if xappData != nil && err == nil {
-                               pcData, confErr := rtmgr.GetPlatformComponents(configfile)
-                               if confErr != nil {
-                                       rtmgr.Logger.Error(confErr.Error())
-                                       return confErr
-                               }
+       if r.StatusCode == 200 {
+               rtmgr.Logger.Debug("http client raw response: %v", r)
+               var xapps []rtmgr.XApp
+               err = json.NewDecoder(r.Body).Decode(&xapps)
+               if err != nil {
+                       rtmgr.Logger.Warn("Json decode failed: " + err.Error())
+               }
+               rtmgr.Logger.Info("HTTP GET: OK")
+               rtmgr.Logger.Debug("httprestful.httpGetXApps returns: %v", xapps)
+               return &xapps, err
+       }
+       rtmgr.Logger.Warn("httprestful got an unexpected http status code: %v", r.StatusCode)
+       return nil, nil
+}
 
 
-                                rtmgr.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
-                               // Combine the xapps data and platform data before writing to the SDL
-                               ricData := &rtmgr.RicComponents{Xapps: *xappData, Pcs: *pcData}
-
-                                writeErr := sdlEngine.WriteAll(fileName, ricData)
-                                if writeErr != nil {
-                                        rtmgr.Logger.Error(writeErr.Error())
-                                }
-                                // post subscription req to appmgr
-                                readErr = PostSubReq(xmurl, nbiif)
-                                if readErr == nil {
-                                        return nil
-                                }
-                        } else if err == nil {
-                                readErr = errors.New("Unexpected HTTP status code")
-                        } else {
-                                rtmgr.Logger.Warn("cannot get xapp data due to: " + err.Error())
-                                readErr = err
-                        }
-                }
-        return readErr
+func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.Engine) error {
+       var readErr error
+       var maxRetries = 10
+       for i := 1; i <= maxRetries; i++ {
+               time.Sleep(2 * time.Second)
+               xappData, err := httpGetXApps(xmurl)
+               if xappData != nil && err == nil {
+                       pcData, confErr := rtmgr.GetPlatformComponents(configfile)
+                       if confErr != nil {
+                               rtmgr.Logger.Error(confErr.Error())
+                               return confErr
+                       }
+                       rtmgr.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
+                       // Combine the xapps data and platform data before writing to the SDL
+                       ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData}
+                       writeErr := sdlEngine.WriteAll(fileName, ricData)
+                       if writeErr != nil {
+                               rtmgr.Logger.Error(writeErr.Error())
+                       }
+                       // post subscription req to appmgr
+                       readErr = PostSubReq(xmurl, nbiif)
+                       if readErr == nil {
+                               return nil
+                       }
+               } else if err == nil {
+                       readErr = errors.New("unexpected HTTP status code")
+               } else {
+                       rtmgr.Logger.Warn("cannot get xapp data due to: " + err.Error())
+                       readErr = err
+               }
+       }
+       return readErr
 }
 
 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
 }
 
 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
-                                sdlEngine sdl.SdlEngine, rpeEngine rpe.RpeEngine, triggerSBI chan<- bool) error {
+       sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool) error {
        err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
        if err != nil {
        err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
        if err != nil {
-               rtmgr.Logger.Error("Exiting as nbi failed to get the intial startup data from the xapp manager: " + err.Error())
+               rtmgr.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error())
                return err
        }
 
        datach := make(chan *models.XappCallbackData, 10)
        subschan := make(chan *models.XappSubscriptionData, 10)
                return err
        }
 
        datach := make(chan *models.XappCallbackData, 10)
        subschan := make(chan *models.XappSubscriptionData, 10)
+       subdelchan := make(chan *models.XappSubscriptionData, 10)
        rtmgr.Logger.Info("Launching Rest Http service")
        go func() {
        rtmgr.Logger.Info("Launching Rest Http service")
        go func() {
-               r.LaunchRest(&nbiif, datach, subschan)
+               r.LaunchRest(&nbiif, datach, subschan, subdelchan)
        }()
 
        go func() {
        }()
 
        go func() {
@@ -272,11 +318,16 @@ func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, co
                        if err != nil {
                                rtmgr.Logger.Error("cannot get data from rest api dute to: " + err.Error())
                        } else if data != nil {
                        if err != nil {
                                rtmgr.Logger.Error("cannot get data from rest api dute to: " + err.Error())
                        } else if data != nil {
-                               sdlEngine.WriteXapps(fileName, data)
-                               triggerSBI <- true
+                               rtmgr.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
+                               alldata, err1 := httpGetXApps(xmurl)
+                               if alldata != nil && err1 == nil {
+                                       sdlEngine.WriteXApps(fileName, alldata)
+                                       triggerSBI <- true
+                               }
                        }
                }
        }()
                        }
                }
        }()
+
        go func() {
                for {
                        data := <-subschan
        go func() {
                for {
                        data := <-subschan
@@ -286,6 +337,15 @@ func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, co
                }
        }()
 
                }
        }()
 
+       go func() {
+               for {
+                       data := <-subdelchan
+                       rtmgr.Logger.Debug("received XApp subscription delete data")
+                       delSubscription(&rtmgr.Subs, data)
+                       triggerSBI <- true
+               }
+       }()
+
        return nil
 }
 
        return nil
 }
 
@@ -294,8 +354,8 @@ func (r *HttpRestful) Terminate() error {
 }
 
 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
 }
 
 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
-       var b bool = false
-       sub := rtmgr.Subscription{SubID:*xappSubData.SubscriptionID, Fqdn:*xappSubData.Address, Port:*xappSubData.Port,}
+       var b = false
+       sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
        for _, elem := range *subs {
                if elem == sub {
                        rtmgr.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
        for _, elem := range *subs {
                if elem == sub {
                        rtmgr.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
@@ -308,3 +368,22 @@ func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubsc
        return b
 }
 
        return b
 }
 
+func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
+       rtmgr.Logger.Debug("Deleteing the subscription from the subscriptions list")
+       var present = false
+       sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
+       for i, elem := range *subs {
+               if elem == sub {
+                       present = true
+                       // Since the order of the list is not important, we are swapping the last element
+                       // with the matching element and replacing the list with list(n-1) elements.
+                       (*subs)[len(*subs)-1], (*subs)[i] = (*subs)[i], (*subs)[len(*subs)-1]
+                       *subs = (*subs)[:len(*subs)-1]
+                       break
+               }
+       }
+       if present == false {
+               rtmgr.Logger.Warn("rtmgr.delSubscription: Subscription = %v, not present in the existing subscriptions", xappSubData)
+       }
+       return present
+}