ffc34fbd8861b6b42294bb60d53c6f728d1841ae
[ric-plt/xapp-frame.git] / pkg / xapp / subscription.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "encoding/json"
24         "fmt"
25         "github.com/go-openapi/loads"
26         httptransport "github.com/go-openapi/runtime/client"
27         "github.com/go-openapi/runtime/middleware"
28         "github.com/go-openapi/strfmt"
29         "io/ioutil"
30         "net/http"
31         "time"
32
33         apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
34         apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
35         apipolicy "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/policy"
36         apireport "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/report"
37         apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
38
39         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
40         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
41         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
42         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
43         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/policy"
44         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/query"
45         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/report"
46 )
47
48 type SubscriptionHandler func(models.SubscriptionType, interface{}) (models.SubscriptionResponse, error)
49 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
50 type SubscriptionDeleteHandler func(string) error
51
// Subscriber holds the settings used both to serve subscription requests
// locally (Listen) and to send subscription requests to a remote
// subscription service (SubscribeReport, SubscribePolicy, UnSubscribe,
// QuerySubscriptions).
type Subscriber struct {
	localAddr  string        // host/address the local REST server is bound to
	localPort  int           // port the local REST server is bound to
	remoteHost string        // "host:port" of the remote subscription service
	remoteUrl  string        // base path of the remote REST API
	remoteProt []string      // URL schemes handed to the client transport
	timeout    time.Duration // timeout applied to outgoing requests
}

// NewSubscriber returns a Subscriber that talks to the subscription service
// at host ("host:port") with a request timeout of timo seconds.
//
// An empty host selects "service-ricplt-submgr-http:8088". A timo of zero or
// less selects the default of 20 seconds; negative values are treated like
// zero because they would otherwise produce a negative time.Duration.
func NewSubscriber(host string, timo int) *Subscriber {
	const (
		defaultHost        = "service-ricplt-submgr-http:8088"
		defaultTimeoutSecs = 20
	)

	if host == "" {
		host = defaultHost
	}

	if timo <= 0 {
		timo = defaultTimeoutSecs
	}

	return &Subscriber{
		remoteHost: host,
		remoteUrl:  "/ric/v1",
		remoteProt: []string{"http"},
		timeout:    time.Duration(timo) * time.Second,
		localAddr:  "0.0.0.0",
		localPort:  8088,
	}
}
79
80 // Server interface: listen and receive subscription requests
81 func (r *Subscriber) Listen(add SubscriptionHandler, get SubscriptionQueryHandler, del SubscriptionDeleteHandler) error {
82         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
83         if err != nil {
84                 return err
85         }
86
87         api := operations.NewXappFrameworkAPI(swaggerSpec)
88
89         // Subscription: query
90         api.QueryGetAllSubscriptionsHandler = query.GetAllSubscriptionsHandlerFunc(
91                 func(p query.GetAllSubscriptionsParams) middleware.Responder {
92                         if resp, err := get(); err == nil {
93                                 return query.NewGetAllSubscriptionsOK().WithPayload(resp)
94                         }
95                         return query.NewGetAllSubscriptionsInternalServerError()
96                 })
97
98         // SubscriptionType: Report
99         api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc(
100                 func(p report.SubscribeReportParams) middleware.Responder {
101                         if resp, err := add(models.SubscriptionTypeReport, p.ReportParams); err == nil {
102                                 return report.NewSubscribeReportCreated().WithPayload(resp)
103                         }
104                         return report.NewSubscribeReportInternalServerError()
105                 })
106
107         // SubscriptionType: policy
108         api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc(
109                 func(p policy.SubscribePolicyParams) middleware.Responder {
110                         if resp, err := add(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
111                                 return policy.NewSubscribePolicyCreated().WithPayload(resp)
112                         }
113                         return policy.NewSubscribePolicyInternalServerError()
114                 })
115
116         // SubscriptionType: delete
117         api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
118                 func(p common.UnsubscribeParams) middleware.Responder {
119                         if err := del(p.SubscriptionID); err == nil {
120                                 return common.NewUnsubscribeNoContent()
121                         }
122                         return common.NewUnsubscribeInternalServerError()
123                 })
124
125         server := restapi.NewServer(api)
126         defer server.Shutdown()
127         server.Host = r.localAddr
128         server.Port = r.localPort
129
130         Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
131         if err := server.Serve(); err != nil {
132                 return err
133         }
134         return nil
135 }
136
137 // Subscription interface for xApp: REPORT
138 func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (apimodel.SubscriptionResponse, error) {
139         params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)
140         result, err := r.CreateTransport().Report.SubscribeReport(params)
141         if err != nil {
142                 return apimodel.SubscriptionResponse{}, err
143         }
144
145         return result.Payload, err
146 }
147
148 // Subscription interface for xApp: POLICY
149 func (r *Subscriber) SubscribePolicy(p *apimodel.PolicyParams) (apimodel.SubscriptionResponse, error) {
150         params := apipolicy.NewSubscribePolicyParamsWithTimeout(r.timeout).WithPolicyParams(p)
151         result, err := r.CreateTransport().Policy.SubscribePolicy(params)
152         if err != nil {
153                 return apimodel.SubscriptionResponse{}, err
154         }
155
156         return result.Payload, err
157 }
158
159 // Subscription interface for xApp: DELETE
160 func (r *Subscriber) UnSubscribe(subId string) error {
161         params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
162         _, err := r.CreateTransport().Common.Unsubscribe(params)
163
164         return err
165 }
166
167 // Subscription interface for xApp: QUERY
168 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
169         resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
170         if err != nil {
171                 return models.SubscriptionList{}, err
172         }
173
174         defer resp.Body.Close()
175
176         contents, err := ioutil.ReadAll(resp.Body)
177         if err != nil {
178                 return models.SubscriptionList{}, err
179         }
180
181         subscriptions := models.SubscriptionList{}
182         err = json.Unmarshal([]byte(string(contents)), &subscriptions)
183         if err != nil {
184                 return models.SubscriptionList{}, err
185         }
186
187         return subscriptions, nil
188 }
189
190 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
191         return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)
192 }