X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fxapp%2Fsubscription.go;h=4ecc2624aec601eebd23c2d587a76c26be728c82;hb=b8b191fe31602c53414f8086381aef1a48e8be09;hp=6ceabf98b802dc29e2787fb114ad1e229b98c4e7;hpb=b6341a5002c90da3d531a710f1038d716ef8c9bc;p=ric-plt%2Fxapp-frame.git diff --git a/pkg/xapp/subscription.go b/pkg/xapp/subscription.go index 6ceabf9..4ecc262 100755 --- a/pkg/xapp/subscription.go +++ b/pkg/xapp/subscription.go @@ -20,14 +20,18 @@ package xapp import ( + "bytes" "encoding/json" "fmt" "github.com/go-openapi/loads" httptransport "github.com/go-openapi/runtime/client" "github.com/go-openapi/runtime/middleware" "github.com/go-openapi/strfmt" + "github.com/spf13/viper" "io/ioutil" + "net" "net/http" + "os" "time" apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi" @@ -48,6 +52,7 @@ import ( type SubscriptionHandler func(models.SubscriptionType, interface{}) (*models.SubscriptionResponse, error) type SubscriptionQueryHandler func() (models.SubscriptionList, error) type SubscriptionDeleteHandler func(string) error +type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse) type Subscriber struct { localAddr string @@ -56,29 +61,51 @@ type Subscriber struct { remoteUrl string remoteProt []string timeout time.Duration + clientUrl string + clientCB SubscriptionResponseCallback } func NewSubscriber(host string, timo int) *Subscriber { if host == "" { - host = "service-ricplt-submgr-http:8088" + pltnamespace := os.Getenv("PLT_NAMESPACE") + if pltnamespace == "" { + pltnamespace = "ricplt" + } + host = fmt.Sprintf("service-%s-submgr-http.%s:8088", pltnamespace, pltnamespace) } if timo == 0 { timo = 20 } - return &Subscriber{ + r := &Subscriber{ remoteHost: host, remoteUrl: "/ric/v1", remoteProt: []string{"http"}, timeout: time.Duration(timo) * time.Second, localAddr: "0.0.0.0", localPort: 8088, + clientUrl: "/ric/v1/subscriptions/response", + } + Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST") + + return r +} + +func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) { + if req.Body != nil { + var resp apimodel.SubscriptionResponse + if err := json.NewDecoder(req.Body).Decode(&resp); err == nil { + if r.clientCB != nil { + r.clientCB(&resp) + } + } + req.Body.Close() } } // Server interface: listen and receive subscription requests -func (r *Subscriber) Listen(add SubscriptionHandler, get SubscriptionQueryHandler, del SubscriptionDeleteHandler) error { +func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error { swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON) if err != nil { return err @@ -86,10 +113,10 @@ func (r *Subscriber) Listen(add SubscriptionHandler, get SubscriptionQueryHandle api := operations.NewXappFrameworkAPI(swaggerSpec) - // Subscription: query + // Subscription: Query api.QueryGetAllSubscriptionsHandler = query.GetAllSubscriptionsHandlerFunc( func(p query.GetAllSubscriptionsParams) middleware.Responder { - if resp, err := get(); err == nil { + if resp, err := getSubscription(); err == nil { return query.NewGetAllSubscriptionsOK().WithPayload(resp) } return query.NewGetAllSubscriptionsInternalServerError() @@ -98,25 +125,25 @@ func (r *Subscriber) Listen(add SubscriptionHandler, get SubscriptionQueryHandle // SubscriptionType: Report api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc( func(p report.SubscribeReportParams) middleware.Responder { - if resp, err := add(models.SubscriptionTypeReport, p.ReportParams); err == nil { + if resp, err := createSubscription(models.SubscriptionTypeReport, p.ReportParams); err == nil { return report.NewSubscribeReportCreated().WithPayload(resp) } return report.NewSubscribeReportInternalServerError() }) - // SubscriptionType: policy + // SubscriptionType: Policy api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc( func(p policy.SubscribePolicyParams) middleware.Responder { - if resp, err := add(models.SubscriptionTypePolicy, p.PolicyParams); err == nil { + if resp, err := createSubscription(models.SubscriptionTypePolicy, p.PolicyParams); err == nil { return policy.NewSubscribePolicyCreated().WithPayload(resp) } return policy.NewSubscribePolicyInternalServerError() }) - // SubscriptionType: delete + // SubscriptionType: Delete api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc( func(p common.UnsubscribeParams) middleware.Responder { - if err := del(p.SubscriptionID); err == nil { + if err := delSubscription(p.SubscriptionID); err == nil { return common.NewUnsubscribeNoContent() } return common.NewUnsubscribeInternalServerError() @@ -134,6 +161,51 @@ func (r *Subscriber) Listen(add SubscriptionHandler, get SubscriptionQueryHandle return nil } +// Server interface: send notification to client +func (r *Subscriber) Notify(resp *models.SubscriptionResponse, clientEndpoint string) (err error) { + respData, err := json.Marshal(resp) + if err != nil { + Logger.Error("json.Marshal failed: %v", err) + return err + } + + ep, _, _ := net.SplitHostPort(clientEndpoint) + _, port, _ := net.SplitHostPort(fmt.Sprintf(":%d", GetPortData("http").Port)) + clientUrl := fmt.Sprintf("http://%s:%s%s", ep, port, r.clientUrl) + + retries := viper.GetInt("subscription.retryCount") + if retries == 0 { + retries = 10 + } + + delay := viper.GetInt("subscription.retryDelay") + if delay == 0 { + delay = 5 + } + + for i := 0; i < retries; i++ { + r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData)) + if err == nil && (r != nil && r.StatusCode == http.StatusOK) { + break + } + + if err != nil { + Logger.Error("%v", err) + } + if r != nil && r.StatusCode != http.StatusOK { + Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode) + } + time.Sleep(time.Duration(delay) * time.Second) + } + + return err +} + +// Subscription interface for xApp: Response callback +func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) { + r.clientCB = c +} + // Subscription interface for xApp: REPORT func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (*apimodel.SubscriptionResponse, error) { params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)