2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
26 "github.com/go-openapi/loads"
27 httptransport "github.com/go-openapi/runtime/client"
28 "github.com/go-openapi/runtime/middleware"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
36 apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
37 apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
38 apipolicy "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/policy"
39 apireport "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/report"
40 apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
42 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
43 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
44 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
45 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
46 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/policy"
47 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/query"
48 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/report"
// SubscriptionHandler is the callback type Listen uses to create a
// subscription. It receives the subscription type (report or policy) together
// with the request parameters as an untyped value, and returns the response
// payload or an error.
51 type SubscriptionHandler func(models.SubscriptionType, interface{}) (*models.SubscriptionResponse, error)
// SubscriptionQueryHandler is the callback type Listen uses to answer the
// "get all subscriptions" query. It returns a models.SubscriptionList or an
// error.
52 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
// SubscriptionDeleteHandler is the callback type Listen uses to remove a
// subscription. The string argument is the subscription ID taken from the
// unsubscribe request.
53 type SubscriptionDeleteHandler func(string) error
// SubscriptionResponseCallback is the type of the function held in
// Subscriber.clientCB and accepted by SetResponseCB. It takes a
// *apimodel.SubscriptionResponse and returns nothing.
54 type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
// Subscriber holds the state shared by the subscription client and server
// helpers in this file.
// NOTE(review): only the clientCB field is visible at this point, yet the
// methods of this type also read localAddr, localPort, remoteHost, remoteUrl,
// remoteProt, timeout and clientUrl — confirm the full field list and the
// closing brace of the declaration.
56 type Subscriber struct {
// clientCB is the response callback; ResponseHandler checks it for nil before
// using it.
64 clientCB SubscriptionResponseCallback
// NewSubscriber builds a *Subscriber and registers its ResponseHandler for
// POST requests on the subscription-response path.
//
// host is the address of the remote subscription service and timo is the
// request timeout, interpreted as a number of seconds.
// NOTE(review): the guards around the default assignments and part of the
// struct literal are not visible here; presumably host is defaulted only when
// it is empty and is then stored as the remote host — confirm.
67 func NewSubscriber(host string, timo int) *Subscriber {
// Default address of the subscription manager service.
69 host = "service-ricplt-submgr-http:8088"
// Schemes handed to the HTTP transport in CreateTransport: plain HTTP only.
79 remoteProt: []string{"http"},
// Convert the timeout from a count of seconds into a time.Duration.
80 timeout: time.Duration(timo) * time.Second,
// Path on which subscription responses are received.
83 clientUrl: "/ric/v1/subscriptions/response",
// Route POST requests arriving on clientUrl to ResponseHandler.
85 Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
// ResponseHandler is the HTTP handler that NewSubscriber registers on
// clientUrl. It decodes the request body as a JSON
// apimodel.SubscriptionResponse and, when decoding succeeds, checks whether a
// response callback has been set.
// NOTE(review): the statement inside the nil check and any handling of a
// decode failure are not visible here; presumably the decoded response is
// passed to r.clientCB — confirm. No write to w is visible either.
90 func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
92 var resp apimodel.SubscriptionResponse
// Decode straight from the request body; only the success path continues.
93 if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
// The callback is optional, so guard against it being unset.
94 if r.clientCB != nil {
102 // Server interface: listen and receive subscription requests
//
// Listen wires the three callbacks into the generated REST API and serves it
// on r.localAddr and r.localPort.
//
//   - createSubscription handles both report and policy subscribe requests;
//     the subscription type argument tells it which one it is serving.
//   - getSubscription handles the "get all subscriptions" query.
//   - delSubscription handles unsubscribe requests, keyed by subscription ID.
//
// Each handler replies with the operation's success response when the callback
// returns a nil error and with the operation's InternalServerError response
// otherwise; the callback's error value itself is not placed in the reply.
//
// NOTE(review): the error branches after loads.Embedded and server.Serve are
// not visible here; presumably those errors are what Listen returns — confirm.
103 func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
// Load the swagger specification embedded in the generated restapi package.
104 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
109 api := operations.NewXappFrameworkAPI(swaggerSpec)
111 // Subscription: Query
112 api.QueryGetAllSubscriptionsHandler = query.GetAllSubscriptionsHandlerFunc(
113 func(p query.GetAllSubscriptionsParams) middleware.Responder {
114 if resp, err := getSubscription(); err == nil {
115 return query.NewGetAllSubscriptionsOK().WithPayload(resp)
117 return query.NewGetAllSubscriptionsInternalServerError()
120 // SubscriptionType: Report
121 api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc(
122 func(p report.SubscribeReportParams) middleware.Responder {
// The request's report parameters are forwarded to the callback untouched.
123 if resp, err := createSubscription(models.SubscriptionTypeReport, p.ReportParams); err == nil {
124 return report.NewSubscribeReportCreated().WithPayload(resp)
126 return report.NewSubscribeReportInternalServerError()
129 // SubscriptionType: Policy
130 api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc(
131 func(p policy.SubscribePolicyParams) middleware.Responder {
// Same callback as for reports; only the type tag and the parameters differ.
132 if resp, err := createSubscription(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
133 return policy.NewSubscribePolicyCreated().WithPayload(resp)
135 return policy.NewSubscribePolicyInternalServerError()
138 // SubscriptionType: Delete
139 api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
140 func(p common.UnsubscribeParams) middleware.Responder {
// A successful delete is answered with "no content".
141 if err := delSubscription(p.SubscriptionID); err == nil {
142 return common.NewUnsubscribeNoContent()
144 return common.NewUnsubscribeInternalServerError()
147 server := restapi.NewServer(api)
// Shut the server down when Listen returns, whatever the reason.
148 defer server.Shutdown()
// Bind to the address and port configured on the Subscriber.
149 server.Host = r.localAddr
150 server.Port = r.localPort
152 Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
153 if err := server.Serve(); err != nil {
159 // Server interface: send notification to client
//
// Notify marshals resp to JSON and POSTs it to the client's response URL,
// making up to "subscription.retryCount" attempts and sleeping
// "subscription.retryDelay" seconds between them (both read from viper).
// An attempt counts as successful only when the POST returns no error and the
// status code is exactly http.StatusOK.
//
// clientEndpoint supplies the host part of the URL; the port is taken from
// this process's own "local.host" setting and the path is r.clientUrl.
//
// NOTE(review): the lines between the viper reads, the success branch and the
// final return are not visible here, so any defaulting of retries/delay and
// the value returned after the loop should be confirmed.
160 func (r *Subscriber) Notify(resp *models.SubscriptionResponse, clientEndpoint string) (err error) {
161 respData, err := json.Marshal(resp)
163 Logger.Error("json.Marshal failed: %v", err)
// NOTE(review): indexing [1] panics when "local.host" contains no ":" because
// strings.Split then returns a single element — consider validating the value.
167 port := strings.Split(viper.GetString("local.host"), ":")[1]
168 clientUrl := fmt.Sprintf("http://%s:%s%s", clientEndpoint, port, r.clientUrl)
170 retries := viper.GetInt("subscription.retryCount")
175 delay := viper.GetInt("subscription.retryDelay")
180 for i := 0; i < retries; i++ {
// NOTE(review): inside the loop `r` shadows the method receiver and `err`
// shadows the named result. No Body.Close() on the response is visible here —
// confirm the body is closed so connections are not leaked.
181 r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
182 if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
187 Logger.Error("%v", err)
// Any status other than 200 is logged as a failure.
189 if r != nil && r.StatusCode != http.StatusOK {
190 Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
// Wait before the next attempt; delay is a number of seconds.
192 time.Sleep(time.Duration(delay) * time.Second)
198 // Subscription interface for xApp: Response callback
//
// SetResponseCB takes the callback c for subscription responses.
// NOTE(review): the method body is not visible here; presumably it stores c
// in r.clientCB, which ResponseHandler checks before use — confirm.
199 func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
203 // Subscription interface for xApp: REPORT
//
// SubscribeReport sends a report subscription request carrying p through a
// freshly created transport, using r.timeout as the request timeout.
// It returns the payload of the reply. On the error path it returns a non-nil,
// empty apimodel.SubscriptionResponse together with the error.
// NOTE(review): the guard selecting the error path is not visible here;
// presumably it is `err != nil` — confirm.
204 func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (*apimodel.SubscriptionResponse, error) {
205 params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)
206 result, err := r.CreateTransport().Report.SubscribeReport(params)
// Error path: hand back an empty response rather than nil.
208 return &apimodel.SubscriptionResponse{}, err
211 return result.Payload, err
214 // Subscription interface for xApp: POLICY
//
// SubscribePolicy sends a policy subscription request carrying p through a
// freshly created transport, using r.timeout as the request timeout.
// It returns the payload of the reply. On the error path it returns a non-nil,
// empty apimodel.SubscriptionResponse together with the error.
// NOTE(review): the guard selecting the error path is not visible here;
// presumably it is `err != nil` — confirm.
215 func (r *Subscriber) SubscribePolicy(p *apimodel.PolicyParams) (*apimodel.SubscriptionResponse, error) {
216 params := apipolicy.NewSubscribePolicyParamsWithTimeout(r.timeout).WithPolicyParams(p)
217 result, err := r.CreateTransport().Policy.SubscribePolicy(params)
// Error path: hand back an empty response rather than nil.
219 return &apimodel.SubscriptionResponse{}, err
222 return result.Payload, err
225 // Subscription interface for xApp: DELETE
//
// UnSubscribe sends an unsubscribe request for the subscription identified by
// subId through a freshly created transport, using r.timeout as the request
// timeout. The reply payload is discarded; only the error is kept.
// NOTE(review): the return statement is not visible here; presumably err is
// returned as-is — confirm.
226 func (r *Subscriber) UnSubscribe(subId string) error {
227 params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
228 _, err := r.CreateTransport().Common.Unsubscribe(params)
233 // Subscription interface for xApp: QUERY
//
// QuerySubscriptions fetches the subscription list with a plain HTTP GET on
// "http://<remoteHost>/<remoteUrl>/subscriptions", reads the whole body and
// unmarshals it as JSON into a models.SubscriptionList.
// Each visible failure return yields an empty list together with the error.
//
// NOTE(review): this path uses http.Get, i.e. the default HTTP client, so
// r.timeout is not applied here. No check of resp.StatusCode is visible
// either — confirm whether non-200 replies should be rejected.
234 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
235 resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
237 return models.SubscriptionList{}, err
// Release the connection once the body has been consumed.
240 defer resp.Body.Close()
242 contents, err := ioutil.ReadAll(resp.Body)
244 return models.SubscriptionList{}, err
247 subscriptions := models.SubscriptionList{}
// NOTE(review): the []byte(string(...)) round trip only copies contents;
// passing contents directly would be equivalent.
248 err = json.Unmarshal([]byte(string(contents)), &subscriptions)
250 return models.SubscriptionList{}, err
253 return subscriptions, nil
// CreateTransport returns a generated RICSubscription API client whose HTTP
// transport is built from r.remoteHost, r.remoteUrl and r.remoteProt, with the
// default strfmt registry. A new client and transport are constructed on every
// call; nothing is cached on the Subscriber.
256 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
257 return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)