X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fxapp%2Fsubscription.go;h=a7e7a3ca5a1f863ad00f492447c2c25cf1eeb385;hb=9ea6c7860300c299b9fe68caaf8aff61b3ec71d2;hp=1cdce827e201d7b70d078a6a7e7dd6a6dd274cef;hpb=5120ec103eead26427b977f4b2456c2ce57b36ac;p=ric-plt%2Fxapp-frame.git diff --git a/pkg/xapp/subscription.go b/pkg/xapp/subscription.go index 1cdce82..a7e7a3c 100755 --- a/pkg/xapp/subscription.go +++ b/pkg/xapp/subscription.go @@ -20,27 +20,33 @@ package xapp import ( + "bytes" + "encoding/json" + "fmt" "github.com/go-openapi/loads" httptransport "github.com/go-openapi/runtime/client" "github.com/go-openapi/runtime/middleware" "github.com/go-openapi/strfmt" + "github.com/spf13/viper" + "io/ioutil" + "net/http" + "os" "time" apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi" - apicontrol "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/control" - apipolicy "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/policy" - apireport "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/report" + apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common" apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations" - "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/control" - "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/policy" - "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/report" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common" ) -type SubscriptionReportHandler func(models.SubscriptionType, interface{}) (models.SubscriptionResult, error) +type SubscriptionHandler func(interface{}) (*models.SubscriptionResponse, error) +type SubscriptionQueryHandler func() (models.SubscriptionList, error) +type SubscriptionDeleteHandler func(string) error +type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse) type Subscriber struct { localAddr string @@ -49,29 +55,51 @@ type Subscriber struct { remoteUrl string remoteProt []string timeout time.Duration + clientUrl string + clientCB SubscriptionResponseCallback } func NewSubscriber(host string, timo int) *Subscriber { if host == "" { - host = "service-ricplt-submgr-http:8088" + pltnamespace := os.Getenv("PLT_NAMESPACE") + if pltnamespace == "" { + pltnamespace = "ricplt" + } + host = fmt.Sprintf("service-%s-submgr-http.%s:8088", pltnamespace, pltnamespace) } if timo == 0 { timo = 20 } - return &Subscriber{ + r := &Subscriber{ remoteHost: host, remoteUrl: "/ric/v1", remoteProt: []string{"http"}, timeout: time.Duration(timo) * time.Second, localAddr: "0.0.0.0", localPort: 8088, + clientUrl: "/ric/v1/subscriptions/response", + } + Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST") + + return r +} + +func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) { + if req.Body != nil { + var resp apimodel.SubscriptionResponse + if err := json.NewDecoder(req.Body).Decode(&resp); err == nil { + if r.clientCB != nil { + r.clientCB(&resp) + } + } + req.Body.Close() } } // Server interface: listen and receive subscription requests -func (r *Subscriber) Listen(handler SubscriptionReportHandler) error { +func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error { swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON) if err != nil { return err @@ -79,31 +107,31 @@ func (r *Subscriber) Listen(handler SubscriptionReportHandler) error { api := operations.NewXappFrameworkAPI(swaggerSpec) - // SubscriptionType: Report - api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc( - func(p report.SubscribeReportParams) middleware.Responder { - if resp, err := handler(models.SubscriptionTypeReport, p.ReportParams); err == nil { - return report.NewSubscribeReportCreated().WithPayload(resp) + // Subscription: Query + api.CommonGetAllSubscriptionsHandler = common.GetAllSubscriptionsHandlerFunc( + func(p common.GetAllSubscriptionsParams) middleware.Responder { + if resp, err := getSubscription(); err == nil { + return common.NewGetAllSubscriptionsOK().WithPayload(resp) } - return report.NewSubscribeReportInternalServerError() + return common.NewGetAllSubscriptionsInternalServerError() }) - // SubscriptionType: Control - api.ControlSubscribeControlHandler = control.SubscribeControlHandlerFunc( - func(p control.SubscribeControlParams) middleware.Responder { - if resp, err := handler(models.SubscriptionTypeControl, p.ControlParams); err == nil { - return control.NewSubscribeControlCreated().WithPayload(resp) + // Subscription: Subscribe + api.CommonSubscribeHandler = common.SubscribeHandlerFunc( + func(params common.SubscribeParams) middleware.Responder { + if resp, err := createSubscription(params.SubscriptionParams); err == nil { + return common.NewSubscribeCreated().WithPayload(resp) } - return control.NewSubscribeControlInternalServerError() + return common.NewSubscribeInternalServerError() }) - // SubscriptionType: policy - api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc( - func(p policy.SubscribePolicyParams) middleware.Responder { - if resp, err := handler(models.SubscriptionTypePolicy, p.PolicyParams); err == nil { - return policy.NewSubscribePolicyCreated().WithPayload(resp) + // Subscription: Unsubscribe + api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc( + func(p common.UnsubscribeParams) middleware.Responder { + if err := delSubscription(p.SubscriptionID); err == nil { + return common.NewUnsubscribeNoContent() } - return policy.NewSubscribePolicyInternalServerError() + return common.NewUnsubscribeInternalServerError() }) server := restapi.NewServer(api) @@ -118,39 +146,120 @@ func (r *Subscriber) Listen(handler SubscriptionReportHandler) error { return nil } -// Subscription interface for xApp: REPORT -func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (apimodel.SubscriptionResult, error) { - params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p) - result, err := r.CreateTransport().Report.SubscribeReport(params) +// Server interface: send notification to client +func (r *Subscriber) Notify(resp *models.SubscriptionResponse, ep models.SubscriptionParamsClientEndpoint) (err error) { + respData, err := json.Marshal(resp) if err != nil { - return apimodel.SubscriptionResult{}, err + Logger.Error("json.Marshal failed: %v", err) + return err } - return result.Payload, err + clientUrl := fmt.Sprintf("http://%s:%d%s", ep.Host, *ep.HTTPPort, r.clientUrl) + + retries := viper.GetInt("subscription.retryCount") + if retries == 0 { + retries = 10 + } + + delay := viper.GetInt("subscription.retryDelay") + if delay == 0 { + delay = 5 + } + + for i := 0; i < retries; i++ { + r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData)) + if err == nil && (r != nil && r.StatusCode == http.StatusOK) { + break + } + + if err != nil { + Logger.Error("%v", err) + } + if r != nil && r.StatusCode != http.StatusOK { + Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode) + } + time.Sleep(time.Duration(delay) * time.Second) + } + + return err } -// Subscription interface for xApp: CONTROL -func (r *Subscriber) SubscribeControl(p *apimodel.ControlParams) (apimodel.SubscriptionResult, error) { - params := apicontrol.NewSubscribeControlParamsWithTimeout(r.timeout).WithControlParams(p) - result, err := r.CreateTransport().Control.SubscribeControl(params) +// Subscription interface for xApp: Response callback +func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) { + r.clientCB = c +} + +// Subscription interface for xApp +func (r *Subscriber) Subscribe(p *apimodel.SubscriptionParams) (*apimodel.SubscriptionResponse, error) { + params := apicommon.NewSubscribeParamsWithTimeout(r.timeout).WithSubscriptionParams(p) + result, err := r.CreateTransport().Common.Subscribe(params) if err != nil { - return apimodel.SubscriptionResult{}, err + return &apimodel.SubscriptionResponse{}, err } return result.Payload, err } -// Subscription interface for xApp: POLICY -func (r *Subscriber) SubscribePolicy(p *apimodel.PolicyParams) (apimodel.SubscriptionResult, error) { - params := apipolicy.NewSubscribePolicyParamsWithTimeout(r.timeout).WithPolicyParams(p) - result, err := r.CreateTransport().Policy.SubscribePolicy(params) +// Subscription interface for xApp: DELETE +func (r *Subscriber) Unsubscribe(subId string) error { + params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId) + _, err := r.CreateTransport().Common.Unsubscribe(params) + + return err +} + +// Subscription interface for xApp: QUERY +func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) { + resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl)) if err != nil { - return apimodel.SubscriptionResult{}, err + return models.SubscriptionList{}, err } - return result.Payload, err + defer resp.Body.Close() + + contents, err := ioutil.ReadAll(resp.Body) + if err != nil { + return models.SubscriptionList{}, err + } + + subscriptions := models.SubscriptionList{} + err = json.Unmarshal([]byte(string(contents)), &subscriptions) + if err != nil { + return models.SubscriptionList{}, err + } + + return subscriptions, nil } -func (s *Subscriber) CreateTransport() *apiclient.RICSubscription { - return apiclient.New(httptransport.New(s.remoteHost, s.remoteUrl, s.remoteProt), strfmt.Default) +func (r *Subscriber) CreateTransport() *apiclient.RICSubscription { + return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default) } + +/*func (r *Subscriber) getXappConfig() (appconfig models.XappConfigList, err error) { + + Logger.Error("Inside getXappConfig") + + var metadata models.ConfigMetadata + var xappconfig models.XAppConfig + name := viper.GetString("name") + configtype := "json" + metadata.XappName = &name + metadata.ConfigType = &configtype + + configFile, err := os.Open("/opt/ric/config/config-file.json") + if err != nil { + Logger.Error("Cannot open config file: %v", err) + return nil,errors.New("Could Not parse the config file") + } + + body, err := ioutil.ReadAll(configFile) + + defer configFile.Close() + + xappconfig.Metadata = &metadata + xappconfig.Config = body + + appconfig = append(appconfig,&xappconfig) + + return appconfig,nil +}*/