Get interface for subscriptions
[ric-plt/xapp-frame.git] / pkg / xapp / subscription.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"strings"
	"time"

	"github.com/go-openapi/loads"
	httptransport "github.com/go-openapi/runtime/client"
	"github.com/go-openapi/runtime/middleware"
	"github.com/go-openapi/strfmt"

	apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
	apicontrol "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/control"
	apipolicy "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/policy"
	apireport "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/report"
	apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"

	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/control"
	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/policy"
	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/query"
	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/report"
)
47
48 type SubscriptionHandler func(models.SubscriptionType, interface{}) (models.SubscriptionResult, error)
49 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
50
// Subscriber holds the settings for both sides of the subscription API: the
// local REST server that receives requests and the remote service that
// client calls are sent to.
type Subscriber struct {
	// localAddr and localPort are the host and port the local server listens on.
	localAddr string
	localPort int
	// remoteHost is the "host:port" of the remote service and remoteUrl is the
	// base path prepended to every remote request.
	remoteHost, remoteUrl string
	// remoteProt lists the URL schemes handed to the client transport.
	remoteProt []string
	// timeout bounds each outgoing subscribe request.
	timeout time.Duration
}

// NewSubscriber returns a Subscriber that talks to the given remote host.
//
// host is the "host:port" of the remote service; an empty string selects the
// built-in default. timo is the request timeout in seconds; zero or a negative
// value selects the default of 20 seconds, because a non-positive duration
// would make every request expire immediately.
func NewSubscriber(host string, timo int) *Subscriber {
	const (
		defaultRemoteHost  = "service-ricplt-submgr-http:8088"
		defaultTimeoutSecs = 20
	)

	if host == "" {
		host = defaultRemoteHost
	}

	// Treat negative values like "unset": they cannot describe a usable timeout.
	if timo <= 0 {
		timo = defaultTimeoutSecs
	}

	return &Subscriber{
		remoteHost: host,
		remoteUrl:  "/ric/v1",
		remoteProt: []string{"http"},
		timeout:    time.Duration(timo) * time.Second,
		localAddr:  "0.0.0.0",
		localPort:  8088,
	}
}
78
79 // Server interface: listen and receive subscription requests
80 func (r *Subscriber) Listen(subHandler SubscriptionHandler, queryHandler SubscriptionQueryHandler) error {
81         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
82         if err != nil {
83                 return err
84         }
85
86         api := operations.NewXappFrameworkAPI(swaggerSpec)
87
88         // Subscription: query
89         api.QueryGetAllSubscriptionsHandler = query.GetAllSubscriptionsHandlerFunc(
90                 func(p query.GetAllSubscriptionsParams) middleware.Responder {
91                         if resp, err := queryHandler(); err == nil {
92                                 return query.NewGetAllSubscriptionsOK().WithPayload(resp)
93                         }
94                         return query.NewGetAllSubscriptionsInternalServerError()
95                 })
96
97         // SubscriptionType: Report
98         api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc(
99                 func(p report.SubscribeReportParams) middleware.Responder {
100                         if resp, err := subHandler(models.SubscriptionTypeReport, p.ReportParams); err == nil {
101                                 return report.NewSubscribeReportCreated().WithPayload(resp)
102                         }
103                         return report.NewSubscribeReportInternalServerError()
104                 })
105
106         // SubscriptionType: Control
107         api.ControlSubscribeControlHandler = control.SubscribeControlHandlerFunc(
108                 func(p control.SubscribeControlParams) middleware.Responder {
109                         if resp, err := subHandler(models.SubscriptionTypeControl, p.ControlParams); err == nil {
110                                 return control.NewSubscribeControlCreated().WithPayload(resp)
111                         }
112                         return control.NewSubscribeControlInternalServerError()
113                 })
114
115         // SubscriptionType: policy
116         api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc(
117                 func(p policy.SubscribePolicyParams) middleware.Responder {
118                         if resp, err := subHandler(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
119                                 return policy.NewSubscribePolicyCreated().WithPayload(resp)
120                         }
121                         return policy.NewSubscribePolicyInternalServerError()
122                 })
123
124         server := restapi.NewServer(api)
125         defer server.Shutdown()
126         server.Host = r.localAddr
127         server.Port = r.localPort
128
129         Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
130         if err := server.Serve(); err != nil {
131                 return err
132         }
133         return nil
134 }
135
136 // Subscription interface for xApp: REPORT
137 func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (apimodel.SubscriptionResult, error) {
138         params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)
139         result, err := r.CreateTransport().Report.SubscribeReport(params)
140         if err != nil {
141                 return apimodel.SubscriptionResult{}, err
142         }
143
144         return result.Payload, err
145 }
146
147 // Subscription interface for xApp: CONTROL
148 func (r *Subscriber) SubscribeControl(p *apimodel.ControlParams) (apimodel.SubscriptionResult, error) {
149         params := apicontrol.NewSubscribeControlParamsWithTimeout(r.timeout).WithControlParams(p)
150         result, err := r.CreateTransport().Control.SubscribeControl(params)
151         if err != nil {
152                 return apimodel.SubscriptionResult{}, err
153         }
154
155         return result.Payload, err
156 }
157
158 // Subscription interface for xApp: POLICY
159 func (r *Subscriber) SubscribePolicy(p *apimodel.PolicyParams) (apimodel.SubscriptionResult, error) {
160         params := apipolicy.NewSubscribePolicyParamsWithTimeout(r.timeout).WithPolicyParams(p)
161         result, err := r.CreateTransport().Policy.SubscribePolicy(params)
162         if err != nil {
163                 return apimodel.SubscriptionResult{}, err
164         }
165
166         return result.Payload, err
167 }
168
169 // Subscription interface for xApp: QUERY
170 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
171         resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
172         if err != nil {
173                 return models.SubscriptionList{}, err
174         }
175
176         defer resp.Body.Close()
177
178         contents, err := ioutil.ReadAll(resp.Body)
179         if err != nil {
180                 return models.SubscriptionList{}, err
181         }
182
183         subscriptions := models.SubscriptionList{}
184         err = json.Unmarshal([]byte(string(contents)), &subscriptions)
185         if err != nil {
186                 return models.SubscriptionList{}, err
187         }
188
189         return subscriptions, nil
190 }
191
192 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
193         return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)
194 }