2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
26 "github.com/go-openapi/loads"
27 httptransport "github.com/go-openapi/runtime/client"
28 "github.com/go-openapi/runtime/middleware"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
36 apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
37 apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
38 apipolicy "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/policy"
39 apireport "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/report"
40 apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
42 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
43 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
44 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
45 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
46 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/policy"
47 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/query"
48 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/report"
// SubscriptionHandler is the callback type Listen accepts for creating a
// subscription. Listen calls it with models.SubscriptionTypeReport and the
// request's ReportParams, or with models.SubscriptionTypePolicy and the
// request's PolicyParams; the second argument is passed as interface{}.
51 type SubscriptionHandler func(models.SubscriptionType, interface{}) (*models.SubscriptionResponse, error)
// SubscriptionQueryHandler is the callback type Listen accepts for listing
// subscriptions; on success its result is used as the query response payload.
52 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
// SubscriptionDeleteHandler is the callback type Listen accepts for removing a
// subscription; Listen passes it the SubscriptionID of the unsubscribe request.
53 type SubscriptionDeleteHandler func(string) error
// SubscriptionResponseCallback is the type of the Subscriber.clientCB field
// and of the argument to SetResponseCB. It receives a pointer to a decoded
// apimodel.SubscriptionResponse.
54 type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
// Subscriber is the receiver for both halves of the subscription REST API in
// this file: the server side (Listen, Notify) and the xApp client side
// (SubscribeReport, SubscribePolicy, UnSubscribe, QuerySubscriptions).
//
// Methods in this file read the fields localAddr and localPort (Listen),
// remoteHost, remoteUrl and remoteProt (CreateTransport, QuerySubscriptions),
// timeout (SubscribeReport, SubscribePolicy, UnSubscribe), clientUrl
// (NewSubscriber, Notify) and clientCB (ResponseHandler).
56 type Subscriber struct {
// clientCB is tested for nil by ResponseHandler after a subscription response
// has been decoded successfully.
64 clientCB SubscriptionResponseCallback
// NewSubscriber returns a *Subscriber and registers its ResponseHandler as the
// POST handler for the client response URL through Resource.InjectRoute.
//
// host is the remote address; timo is a timeout in seconds and is stored as
// time.Duration(timo) * time.Second.
67 func NewSubscriber(host string, timo int) *Subscriber {
// Fallback remote address.
// NOTE(review): the condition guarding this assignment is not visible here —
// presumably it applies only when host is empty; confirm.
69 host = "service-ricplt-submgr-http.ricplt:8088"
// Only plain HTTP is configured as the remote protocol.
79 remoteProt: []string{"http"},
80 timeout: time.Duration(timo) * time.Second,
// Path on which asynchronous subscription responses are received; Notify
// appends the same path to the client address it posts to.
83 clientUrl: "/ric/v1/subscriptions/response",
85 Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
// ResponseHandler is the HTTP handler NewSubscriber installs for POST requests
// on the client response URL. It decodes the JSON request body into an
// apimodel.SubscriptionResponse and, when decoding succeeds, checks whether a
// callback (clientCB) has been registered.
//
// NOTE(review): in the visible lines a decode failure is silently ignored and
// nothing is written to w — confirm this is intended.
90 func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
92 var resp apimodel.SubscriptionResponse
93 if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
// The callback is optional; a nil clientCB must not be called.
94 if r.clientCB != nil {
102 // Server interface: listen and receive subscription requests
//
// Listen loads the embedded Swagger specification, builds the xApp framework
// API from it, wires the three caller-supplied callbacks into the query,
// report, policy and unsubscribe operations, and then serves the API on
// r.localAddr:r.localPort.
//
// Parameters:
//   - createSubscription: called for both report and policy subscriptions.
//   - getSubscription: called for "get all subscriptions" queries.
//   - delSubscription: called with the subscription ID for unsubscribe requests.
//
// In every handler a callback error is mapped to the operation's
// InternalServerError responder; the error value itself is not included in
// the response.
103 func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
104 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
109 api := operations.NewXappFrameworkAPI(swaggerSpec)
111 // Subscription: Query
112 api.QueryGetAllSubscriptionsHandler = query.GetAllSubscriptionsHandlerFunc(
113 func(p query.GetAllSubscriptionsParams) middleware.Responder {
// Success: reply OK with the list returned by the callback.
114 if resp, err := getSubscription(); err == nil {
115 return query.NewGetAllSubscriptionsOK().WithPayload(resp)
117 return query.NewGetAllSubscriptionsInternalServerError()
120 // SubscriptionType: Report
121 api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc(
122 func(p report.SubscribeReportParams) middleware.Responder {
// Success: reply Created with the callback's subscription response.
123 if resp, err := createSubscription(models.SubscriptionTypeReport, p.ReportParams); err == nil {
124 return report.NewSubscribeReportCreated().WithPayload(resp)
126 return report.NewSubscribeReportInternalServerError()
129 // SubscriptionType: Policy
130 api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc(
131 func(p policy.SubscribePolicyParams) middleware.Responder {
// Same callback as for reports; only the subscription type and params differ.
132 if resp, err := createSubscription(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
133 return policy.NewSubscribePolicyCreated().WithPayload(resp)
135 return policy.NewSubscribePolicyInternalServerError()
138 // SubscriptionType: Delete
139 api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
140 func(p common.UnsubscribeParams) middleware.Responder {
// Success: reply NoContent; there is no payload for a delete.
141 if err := delSubscription(p.SubscriptionID); err == nil {
142 return common.NewUnsubscribeNoContent()
144 return common.NewUnsubscribeInternalServerError()
147 server := restapi.NewServer(api)
// Shutdown is deferred so it runs when Listen returns.
148 defer server.Shutdown()
149 server.Host = r.localAddr
150 server.Port = r.localPort
152 Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
153 if err := server.Serve(); err != nil {
159 // Server interface: send notification to client
//
// Notify marshals resp to JSON and POSTs it, with content type
// "application/json", to the response URL of the client identified by
// clientEndpoint. The attempt count and the sleep between attempts are read
// from the "subscription.retryCount" and "subscription.retryDelay"
// configuration keys (delay in seconds). A response with status 200 OK counts
// as success.
//
// The target URL is built from the host part of clientEndpoint, the port part
// of the "local.host" configuration value, and r.clientUrl; the port contained
// in clientEndpoint is discarded.
160 func (r *Subscriber) Notify(resp *models.SubscriptionResponse, clientEndpoint string) (err error) {
161 respData, err := json.Marshal(resp)
163 Logger.Error("json.Marshal failed: %v", err)
// NOTE(review): both SplitHostPort errors are discarded; a malformed
// clientEndpoint or "local.host" value yields an empty host or port in the
// URL below.
167 ep, _, _ := net.SplitHostPort(clientEndpoint)
168 _, port, _ := net.SplitHostPort(viper.GetString("local.host"))
169 clientUrl := fmt.Sprintf("http://%s:%s%s", ep, port, r.clientUrl)
171 retries := viper.GetInt("subscription.retryCount")
176 delay := viper.GetInt("subscription.retryDelay")
181 for i := 0; i < retries; i++ {
// NOTE(review): ":=" here declares new r and err scoped to the loop body. They
// shadow the receiver r and the named result err, so the POST error is not
// stored in the named result. Also, no r.Body.Close() is visible for the
// response — confirm the body is closed.
182 r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
183 if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
188 Logger.Error("%v", err)
// A transport-level success with a non-200 status is logged as a failure too.
190 if r != nil && r.StatusCode != http.StatusOK {
191 Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
193 time.Sleep(time.Duration(delay) * time.Second)
199 // Subscription interface for xApp: Response callback
//
// SetResponseCB takes the callback c for subscription responses.
// NOTE(review): the body is not visible here — presumably it stores c in
// r.clientCB, which ResponseHandler checks; confirm.
200 func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
204 // Subscription interface for xApp: REPORT
//
// SubscribeReport sends a report subscription request carrying p to the remote
// service, using r.timeout as the request timeout, and returns the payload of
// the result together with the error from the call. A fresh client is obtained
// from CreateTransport for each call.
205 func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (*apimodel.SubscriptionResponse, error) {
206 params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)
207 result, err := r.CreateTransport().Report.SubscribeReport(params)
// Returns an empty, non-nil response alongside the error.
// NOTE(review): the guard is not visible here — presumably err != nil; confirm.
209 return &apimodel.SubscriptionResponse{}, err
212 return result.Payload, err
215 // Subscription interface for xApp: POLICY
//
// SubscribePolicy sends a policy subscription request carrying p to the remote
// service, using r.timeout as the request timeout, and returns the payload of
// the result together with the error from the call. A fresh client is obtained
// from CreateTransport for each call.
216 func (r *Subscriber) SubscribePolicy(p *apimodel.PolicyParams) (*apimodel.SubscriptionResponse, error) {
217 params := apipolicy.NewSubscribePolicyParamsWithTimeout(r.timeout).WithPolicyParams(p)
218 result, err := r.CreateTransport().Policy.SubscribePolicy(params)
// Returns an empty, non-nil response alongside the error.
// NOTE(review): the guard is not visible here — presumably err != nil; confirm.
220 return &apimodel.SubscriptionResponse{}, err
223 return result.Payload, err
226 // Subscription interface for xApp: DELETE
//
// UnSubscribe sends an unsubscribe request for the subscription identified by
// subId to the remote service, using r.timeout as the request timeout.
227 func (r *Subscriber) UnSubscribe(subId string) error {
228 params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
// The first result of Unsubscribe is discarded; only the error is kept.
229 _, err := r.CreateTransport().Common.Unsubscribe(params)
234 // Subscription interface for xApp: QUERY
//
// QuerySubscriptions issues an HTTP GET for the subscriptions resource on the
// remote service, reads the whole response body and unmarshals it as JSON into
// a models.SubscriptionList. On the failure paths it returns an empty list
// together with the error.
//
// NOTE(review): unlike the Subscribe*/UnSubscribe methods, this call goes
// through http.Get (the default client), so r.timeout is not applied, and the
// response status code is not checked before the body is decoded.
235 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
// NOTE(review): the format string inserts a "/" before r.remoteUrl; if
// remoteUrl already begins with "/", the path contains "//" — confirm.
236 resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
238 return models.SubscriptionList{}, err
241 defer resp.Body.Close()
243 contents, err := ioutil.ReadAll(resp.Body)
245 return models.SubscriptionList{}, err
248 subscriptions := models.SubscriptionList{}
// NOTE(review): contents is already a []byte; the string round trip only
// copies the data.
249 err = json.Unmarshal([]byte(string(contents)), &subscriptions)
251 return models.SubscriptionList{}, err
254 return subscriptions, nil
// CreateTransport builds and returns a new subscription API client on every
// call. The client uses an HTTP transport constructed from r.remoteHost,
// r.remoteUrl and r.remoteProt, with strfmt.Default as the format registry
// argument.
257 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
258 return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)