Further enhancements
[ric-plt/xapp-frame.git] / pkg / xapp / subscription.go
index 1cdce82..a7e7a3c 100755 (executable)
 package xapp
 
 import (
+       "bytes"
+       "encoding/json"
+       "fmt"
        "github.com/go-openapi/loads"
        httptransport "github.com/go-openapi/runtime/client"
        "github.com/go-openapi/runtime/middleware"
        "github.com/go-openapi/strfmt"
+       "github.com/spf13/viper"
+       "io/ioutil"
+       "net/http"
+       "os"
        "time"
 
        apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
-       apicontrol "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/control"
-       apipolicy "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/policy"
-       apireport "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/report"
+       apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
        apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
 
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
-       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/control"
-       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/policy"
-       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/report"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
 )
 
-type SubscriptionReportHandler func(models.SubscriptionType, interface{}) (models.SubscriptionResult, error)
+type SubscriptionHandler func(interface{}) (*models.SubscriptionResponse, error)
+type SubscriptionQueryHandler func() (models.SubscriptionList, error)
+type SubscriptionDeleteHandler func(string) error
+type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
 
 type Subscriber struct {
        localAddr  string
@@ -49,29 +55,51 @@ type Subscriber struct {
        remoteUrl  string
        remoteProt []string
        timeout    time.Duration
+       clientUrl  string
+       clientCB   SubscriptionResponseCallback
 }
 
 func NewSubscriber(host string, timo int) *Subscriber {
        if host == "" {
-               host = "service-ricplt-submgr-http:8088"
+               pltnamespace := os.Getenv("PLT_NAMESPACE")
+               if pltnamespace == "" {
+                       pltnamespace = "ricplt"
+               }
+               host = fmt.Sprintf("service-%s-submgr-http.%s:8088", pltnamespace, pltnamespace)
        }
 
        if timo == 0 {
                timo = 20
        }
 
-       return &Subscriber{
+       r := &Subscriber{
                remoteHost: host,
                remoteUrl:  "/ric/v1",
                remoteProt: []string{"http"},
                timeout:    time.Duration(timo) * time.Second,
                localAddr:  "0.0.0.0",
                localPort:  8088,
+               clientUrl:  "/ric/v1/subscriptions/response",
+       }
+       Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
+
+       return r
+}
+
+func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
+       if req.Body != nil {
+               var resp apimodel.SubscriptionResponse
+               if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
+                       if r.clientCB != nil {
+                               r.clientCB(&resp)
+                       }
+               }
+               req.Body.Close()
        }
 }
 
 // Server interface: listen and receive subscription requests
-func (r *Subscriber) Listen(handler SubscriptionReportHandler) error {
+func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
        swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
        if err != nil {
                return err
@@ -79,31 +107,31 @@ func (r *Subscriber) Listen(handler SubscriptionReportHandler) error {
 
        api := operations.NewXappFrameworkAPI(swaggerSpec)
 
-       // SubscriptionType: Report
-       api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc(
-               func(p report.SubscribeReportParams) middleware.Responder {
-                       if resp, err := handler(models.SubscriptionTypeReport, p.ReportParams); err == nil {
-                               return report.NewSubscribeReportCreated().WithPayload(resp)
+       // Subscription: Query
+       api.CommonGetAllSubscriptionsHandler = common.GetAllSubscriptionsHandlerFunc(
+               func(p common.GetAllSubscriptionsParams) middleware.Responder {
+                       if resp, err := getSubscription(); err == nil {
+                               return common.NewGetAllSubscriptionsOK().WithPayload(resp)
                        }
-                       return report.NewSubscribeReportInternalServerError()
+                       return common.NewGetAllSubscriptionsInternalServerError()
                })
 
-       // SubscriptionType: Control
-       api.ControlSubscribeControlHandler = control.SubscribeControlHandlerFunc(
-               func(p control.SubscribeControlParams) middleware.Responder {
-                       if resp, err := handler(models.SubscriptionTypeControl, p.ControlParams); err == nil {
-                               return control.NewSubscribeControlCreated().WithPayload(resp)
+       // Subscription: Subscribe
+       api.CommonSubscribeHandler = common.SubscribeHandlerFunc(
+               func(params common.SubscribeParams) middleware.Responder {
+                       if resp, err := createSubscription(params.SubscriptionParams); err == nil {
+                               return common.NewSubscribeCreated().WithPayload(resp)
                        }
-                       return control.NewSubscribeControlInternalServerError()
+                       return common.NewSubscribeInternalServerError()
                })
 
-       // SubscriptionType: policy
-       api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc(
-               func(p policy.SubscribePolicyParams) middleware.Responder {
-                       if resp, err := handler(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
-                               return policy.NewSubscribePolicyCreated().WithPayload(resp)
+       // Subscription: Unsubscribe
+       api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
+               func(p common.UnsubscribeParams) middleware.Responder {
+                       if err := delSubscription(p.SubscriptionID); err == nil {
+                               return common.NewUnsubscribeNoContent()
                        }
-                       return policy.NewSubscribePolicyInternalServerError()
+                       return common.NewUnsubscribeInternalServerError()
                })
 
        server := restapi.NewServer(api)
@@ -118,39 +146,120 @@ func (r *Subscriber) Listen(handler SubscriptionReportHandler) error {
        return nil
 }
 
-// Subscription interface for xApp: REPORT
-func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (apimodel.SubscriptionResult, error) {
-       params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)
-       result, err := r.CreateTransport().Report.SubscribeReport(params)
+// Server interface: send notification to client
+func (r *Subscriber) Notify(resp *models.SubscriptionResponse, ep models.SubscriptionParamsClientEndpoint) (err error) {
+       respData, err := json.Marshal(resp)
        if err != nil {
-               return apimodel.SubscriptionResult{}, err
+               Logger.Error("json.Marshal failed: %v", err)
+               return err
        }
 
-       return result.Payload, err
+       clientUrl := fmt.Sprintf("http://%s:%d%s", ep.Host, *ep.HTTPPort, r.clientUrl)
+
+       retries := viper.GetInt("subscription.retryCount")
+       if retries == 0 {
+               retries = 10
+       }
+
+       delay := viper.GetInt("subscription.retryDelay")
+       if delay == 0 {
+               delay = 5
+       }
+
+       for i := 0; i < retries; i++ {
+               r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
+               if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
+                       break
+               }
+
+               if err != nil {
+                       Logger.Error("%v", err)
+               }
+               if r != nil && r.StatusCode != http.StatusOK {
+                       Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
+               }
+               time.Sleep(time.Duration(delay) * time.Second)
+       }
+
+       return err
 }
 
-// Subscription interface for xApp: CONTROL
-func (r *Subscriber) SubscribeControl(p *apimodel.ControlParams) (apimodel.SubscriptionResult, error) {
-       params := apicontrol.NewSubscribeControlParamsWithTimeout(r.timeout).WithControlParams(p)
-       result, err := r.CreateTransport().Control.SubscribeControl(params)
+// Subscription interface for xApp: Response callback
+func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
+       r.clientCB = c
+}
+
+// Subscription interface for xApp
+func (r *Subscriber) Subscribe(p *apimodel.SubscriptionParams) (*apimodel.SubscriptionResponse, error) {
+       params := apicommon.NewSubscribeParamsWithTimeout(r.timeout).WithSubscriptionParams(p)
+       result, err := r.CreateTransport().Common.Subscribe(params)
        if err != nil {
-               return apimodel.SubscriptionResult{}, err
+               return &apimodel.SubscriptionResponse{}, err
        }
 
        return result.Payload, err
 }
 
-// Subscription interface for xApp: POLICY
-func (r *Subscriber) SubscribePolicy(p *apimodel.PolicyParams) (apimodel.SubscriptionResult, error) {
-       params := apipolicy.NewSubscribePolicyParamsWithTimeout(r.timeout).WithPolicyParams(p)
-       result, err := r.CreateTransport().Policy.SubscribePolicy(params)
+// Subscription interface for xApp: DELETE
+func (r *Subscriber) Unsubscribe(subId string) error {
+       params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
+       _, err := r.CreateTransport().Common.Unsubscribe(params)
+
+       return err
+}
+
+// Subscription interface for xApp: QUERY
+func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
+       resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
        if err != nil {
-               return apimodel.SubscriptionResult{}, err
+               return models.SubscriptionList{}, err
        }
 
-       return result.Payload, err
+       defer resp.Body.Close()
+
+       contents, err := ioutil.ReadAll(resp.Body)
+       if err != nil {
+               return models.SubscriptionList{}, err
+       }
+
+       subscriptions := models.SubscriptionList{}
+       err = json.Unmarshal([]byte(string(contents)), &subscriptions)
+       if err != nil {
+               return models.SubscriptionList{}, err
+       }
+
+       return subscriptions, nil
 }
 
-func (s *Subscriber) CreateTransport() *apiclient.RICSubscription {
-       return apiclient.New(httptransport.New(s.remoteHost, s.remoteUrl, s.remoteProt), strfmt.Default)
+func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
+       return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)
 }
+
+/*func (r *Subscriber) getXappConfig() (appconfig models.XappConfigList, err error) {
+
+    Logger.Error("Inside getXappConfig")
+
+               var metadata models.ConfigMetadata
+        var xappconfig models.XAppConfig
+        name := viper.GetString("name")
+        configtype := "json"
+               metadata.XappName = &name
+               metadata.ConfigType = &configtype
+
+        configFile, err := os.Open("/opt/ric/config/config-file.json")
+        if err != nil {
+                Logger.Error("Cannot open config file: %v", err)
+                return nil,errors.New("Could Not parse the config file")
+        }
+
+        body, err := ioutil.ReadAll(configFile)
+
+        defer configFile.Close()
+
+               xappconfig.Metadata = &metadata
+               xappconfig.Config = body
+
+        appconfig = append(appconfig,&xappconfig)
+
+               return appconfig,nil
+}*/