cf8dc04a57c723c47d0e21a600617f62b4d3aaca
[ric-plt/xapp-frame.git] / pkg / xapp / subscription.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "github.com/go-openapi/loads"
27         httptransport "github.com/go-openapi/runtime/client"
28         "github.com/go-openapi/runtime/middleware"
29         "github.com/go-openapi/strfmt"
30         "github.com/spf13/viper"
31         "io/ioutil"
32         "net/http"
33         "strings"
34         "time"
35
36         apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
37         apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
38         apipolicy "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/policy"
39         apireport "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/report"
40         apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
41
42         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
43         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
44         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
45         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
46         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/policy"
47         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/query"
48         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/report"
49 )
50
51 type SubscriptionHandler func(models.SubscriptionType, interface{}) (*models.SubscriptionResponse, error)
52 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
53 type SubscriptionDeleteHandler func(string) error
54 type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
55
56 type Subscriber struct {
57         localAddr  string
58         localPort  int
59         remoteHost string
60         remoteUrl  string
61         remoteProt []string
62         timeout    time.Duration
63         clientUrl  string
64         clientCB   SubscriptionResponseCallback
65 }
66
67 func NewSubscriber(host string, timo int) *Subscriber {
68         if host == "" {
69                 host = "service-ricplt-submgr-http:8088"
70         }
71
72         if timo == 0 {
73                 timo = 20
74         }
75
76         r := &Subscriber{
77                 remoteHost: host,
78                 remoteUrl:  "/ric/v1",
79                 remoteProt: []string{"http"},
80                 timeout:    time.Duration(timo) * time.Second,
81                 localAddr:  "0.0.0.0",
82                 localPort:  8088,
83                 clientUrl:  "/ric/v1/subscriptions/response",
84         }
85         Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
86
87         return r
88 }
89
90 func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
91         if req.Body != nil {
92                 var resp apimodel.SubscriptionResponse
93                 if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
94                         if r.clientCB != nil {
95                                 r.clientCB(&resp)
96                         }
97                 }
98                 req.Body.Close()
99         }
100 }
101
102 // Server interface: listen and receive subscription requests
103 func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
104         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
105         if err != nil {
106                 return err
107         }
108
109         api := operations.NewXappFrameworkAPI(swaggerSpec)
110
111         // Subscription: Query
112         api.QueryGetAllSubscriptionsHandler = query.GetAllSubscriptionsHandlerFunc(
113                 func(p query.GetAllSubscriptionsParams) middleware.Responder {
114                         if resp, err := getSubscription(); err == nil {
115                                 return query.NewGetAllSubscriptionsOK().WithPayload(resp)
116                         }
117                         return query.NewGetAllSubscriptionsInternalServerError()
118                 })
119
120         // SubscriptionType: Report
121         api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc(
122                 func(p report.SubscribeReportParams) middleware.Responder {
123                         if resp, err := createSubscription(models.SubscriptionTypeReport, p.ReportParams); err == nil {
124                                 return report.NewSubscribeReportCreated().WithPayload(resp)
125                         }
126                         return report.NewSubscribeReportInternalServerError()
127                 })
128
129         // SubscriptionType: Policy
130         api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc(
131                 func(p policy.SubscribePolicyParams) middleware.Responder {
132                         if resp, err := createSubscription(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
133                                 return policy.NewSubscribePolicyCreated().WithPayload(resp)
134                         }
135                         return policy.NewSubscribePolicyInternalServerError()
136                 })
137
138         // SubscriptionType: Delete
139         api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
140                 func(p common.UnsubscribeParams) middleware.Responder {
141                         if err := delSubscription(p.SubscriptionID); err == nil {
142                                 return common.NewUnsubscribeNoContent()
143                         }
144                         return common.NewUnsubscribeInternalServerError()
145                 })
146
147         server := restapi.NewServer(api)
148         defer server.Shutdown()
149         server.Host = r.localAddr
150         server.Port = r.localPort
151
152         Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
153         if err := server.Serve(); err != nil {
154                 return err
155         }
156         return nil
157 }
158
159 // Server interface: send notification to client
160 func (r *Subscriber) Notify(resp *models.SubscriptionResponse, clientEndpoint string) (err error) {
161         respData, err := json.Marshal(resp)
162         if err != nil {
163                 Logger.Error("json.Marshal failed: %v", err)
164                 return err
165         }
166
167         port := strings.Split(viper.GetString("local.host"), ":")[1]
168         clientUrl := fmt.Sprintf("http://%s:%s%s", clientEndpoint, port, r.clientUrl)
169
170         retries := viper.GetInt("subscription.retryCount")
171         if retries == 0 {
172                 retries = 10
173         }
174
175         delay := viper.GetInt("subscription.retryDelay")
176         if delay == 0 {
177                 delay = 5
178         }
179
180         for i := 0; i < retries; i++ {
181                 r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
182                 if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
183                         break
184                 }
185
186                 if err != nil {
187                         Logger.Error("%v", err)
188                 }
189                 if r != nil && r.StatusCode != http.StatusOK {
190                         Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
191                 }
192                 time.Sleep(time.Duration(delay) * time.Second)
193         }
194
195         return err
196 }
197
198 // Subscription interface for xApp: Response callback
199 func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
200         r.clientCB = c
201 }
202
203 // Subscription interface for xApp: REPORT
204 func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (*apimodel.SubscriptionResponse, error) {
205         params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)
206         result, err := r.CreateTransport().Report.SubscribeReport(params)
207         if err != nil {
208                 return &apimodel.SubscriptionResponse{}, err
209         }
210
211         return result.Payload, err
212 }
213
214 // Subscription interface for xApp: POLICY
215 func (r *Subscriber) SubscribePolicy(p *apimodel.PolicyParams) (*apimodel.SubscriptionResponse, error) {
216         params := apipolicy.NewSubscribePolicyParamsWithTimeout(r.timeout).WithPolicyParams(p)
217         result, err := r.CreateTransport().Policy.SubscribePolicy(params)
218         if err != nil {
219                 return &apimodel.SubscriptionResponse{}, err
220         }
221
222         return result.Payload, err
223 }
224
225 // Subscription interface for xApp: DELETE
226 func (r *Subscriber) UnSubscribe(subId string) error {
227         params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
228         _, err := r.CreateTransport().Common.Unsubscribe(params)
229
230         return err
231 }
232
233 // Subscription interface for xApp: QUERY
234 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
235         resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
236         if err != nil {
237                 return models.SubscriptionList{}, err
238         }
239
240         defer resp.Body.Close()
241
242         contents, err := ioutil.ReadAll(resp.Body)
243         if err != nil {
244                 return models.SubscriptionList{}, err
245         }
246
247         subscriptions := models.SubscriptionList{}
248         err = json.Unmarshal([]byte(string(contents)), &subscriptions)
249         if err != nil {
250                 return models.SubscriptionList{}, err
251         }
252
253         return subscriptions, nil
254 }
255
256 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
257         return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)
258 }