2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
25 "github.com/go-openapi/loads"
26 httptransport "github.com/go-openapi/runtime/client"
27 "github.com/go-openapi/runtime/middleware"
28 "github.com/go-openapi/strfmt"
33 apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
34 apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
35 apipolicy "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/policy"
36 apireport "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/report"
37 apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
39 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
40 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
41 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
42 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
43 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/policy"
44 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/query"
45 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/report"
48 type SubscriptionHandler func(models.SubscriptionType, interface{}) (*models.SubscriptionResponse, error)
49 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
50 type SubscriptionDeleteHandler func(string) error
// Subscriber holds the settings shared by the subscription server (Listen)
// and the client-side subscription calls in this file. Fields referenced
// elsewhere in the file: localAddr and localPort (server bind address),
// remoteHost, remoteUrl and remoteProt (remote endpoint), and timeout
// (applied to outgoing requests).
52 type Subscriber struct {
// NewSubscriber returns a *Subscriber configured from host and timo.
// NOTE(review): only part of the body is visible here; the conditions that
// guard the assignments below are presumed, not confirmed.
61 func NewSubscriber(host string, timo int) *Subscriber {
// Default remote address — presumably applied only when the caller passes an empty host; TODO confirm.
63 host = "service-ricplt-submgr-http:8088"
// Scheme list that CreateTransport hands to the HTTP transport.
73 remoteProt: []string{"http"},
// timo is interpreted as a number of seconds.
74 timeout: time.Duration(timo) * time.Second,
80 // Server interface: listen and receive subscription requests
81 func (r *Subscriber) Listen(add SubscriptionHandler, get SubscriptionQueryHandler, del SubscriptionDeleteHandler) error {
82 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
87 api := operations.NewXappFrameworkAPI(swaggerSpec)
89 // Subscription: query
90 api.QueryGetAllSubscriptionsHandler = query.GetAllSubscriptionsHandlerFunc(
91 func(p query.GetAllSubscriptionsParams) middleware.Responder {
92 if resp, err := get(); err == nil {
93 return query.NewGetAllSubscriptionsOK().WithPayload(resp)
95 return query.NewGetAllSubscriptionsInternalServerError()
98 // SubscriptionType: Report
99 api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc(
100 func(p report.SubscribeReportParams) middleware.Responder {
101 if resp, err := add(models.SubscriptionTypeReport, p.ReportParams); err == nil {
102 return report.NewSubscribeReportCreated().WithPayload(resp)
104 return report.NewSubscribeReportInternalServerError()
107 // SubscriptionType: policy
108 api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc(
109 func(p policy.SubscribePolicyParams) middleware.Responder {
110 if resp, err := add(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
111 return policy.NewSubscribePolicyCreated().WithPayload(resp)
113 return policy.NewSubscribePolicyInternalServerError()
116 // SubscriptionType: delete
117 api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
118 func(p common.UnsubscribeParams) middleware.Responder {
119 if err := del(p.SubscriptionID); err == nil {
120 return common.NewUnsubscribeNoContent()
122 return common.NewUnsubscribeInternalServerError()
125 server := restapi.NewServer(api)
126 defer server.Shutdown()
127 server.Host = r.localAddr
128 server.Port = r.localPort
130 Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
131 if err := server.Serve(); err != nil {
137 // Subscription interface for xApp: REPORT
138 func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (*apimodel.SubscriptionResponse, error) {
139 params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)
140 result, err := r.CreateTransport().Report.SubscribeReport(params)
142 return &apimodel.SubscriptionResponse{}, err
145 return result.Payload, err
148 // Subscription interface for xApp: POLICY
149 func (r *Subscriber) SubscribePolicy(p *apimodel.PolicyParams) (*apimodel.SubscriptionResponse, error) {
150 params := apipolicy.NewSubscribePolicyParamsWithTimeout(r.timeout).WithPolicyParams(p)
151 result, err := r.CreateTransport().Policy.SubscribePolicy(params)
153 return &apimodel.SubscriptionResponse{}, err
156 return result.Payload, err
159 // Subscription interface for xApp: DELETE
160 func (r *Subscriber) UnSubscribe(subId string) error {
161 params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
162 _, err := r.CreateTransport().Common.Unsubscribe(params)
167 // Subscription interface for xApp: QUERY
168 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
169 resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
171 return models.SubscriptionList{}, err
174 defer resp.Body.Close()
176 contents, err := ioutil.ReadAll(resp.Body)
178 return models.SubscriptionList{}, err
181 subscriptions := models.SubscriptionList{}
182 err = json.Unmarshal([]byte(string(contents)), &subscriptions)
184 return models.SubscriptionList{}, err
187 return subscriptions, nil
190 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
191 return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)