bf140c4ce9aef551bada531e22d2a1d705baa612
[ric-plt/xapp-frame.git] / pkg / xapp / subscription.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "github.com/go-openapi/loads"
27         httptransport "github.com/go-openapi/runtime/client"
28         "github.com/go-openapi/runtime/middleware"
29         "github.com/go-openapi/strfmt"
30         "github.com/spf13/viper"
31         "io/ioutil"
32         "net"
33         "net/http"
34         "os"
35         "time"
36         //"errors"
37
38         apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
39         apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
40         apipolicy "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/policy"
41         apireport "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/report"
42         apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
43
44         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
45         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
46         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
47         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
48         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/policy"
49         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/query"
50         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/report"
51         //"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/xapp"
52 )
53
54 type SubscriptionHandler func(models.SubscriptionType, interface{}) (*models.SubscriptionResponse, error)
55 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
56 type SubscriptionDeleteHandler func(string) error
57 type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
58
59 type Subscriber struct {
60         localAddr  string
61         localPort  int
62         remoteHost string
63         remoteUrl  string
64         remoteProt []string
65         timeout    time.Duration
66         clientUrl  string
67         clientCB   SubscriptionResponseCallback
68 }
69
70 func NewSubscriber(host string, timo int) *Subscriber {
71         if host == "" {
72                 pltnamespace := os.Getenv("PLT_NAMESPACE")
73                 if pltnamespace == "" {
74                         pltnamespace = "ricplt"
75                 }
76                 host = fmt.Sprintf("service-%s-submgr-http.%s:8088", pltnamespace, pltnamespace)
77         }
78
79         if timo == 0 {
80                 timo = 20
81         }
82
83         r := &Subscriber{
84                 remoteHost: host,
85                 remoteUrl:  "/ric/v1",
86                 remoteProt: []string{"http"},
87                 timeout:    time.Duration(timo) * time.Second,
88                 localAddr:  "0.0.0.0",
89                 localPort:  8088,
90                 clientUrl:  "/ric/v1/subscriptions/response",
91         }
92         Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
93
94         return r
95 }
96
97 func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
98         if req.Body != nil {
99                 var resp apimodel.SubscriptionResponse
100                 if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
101                         if r.clientCB != nil {
102                                 r.clientCB(&resp)
103                         }
104                 }
105                 req.Body.Close()
106         }
107 }
108
109 // Server interface: listen and receive subscription requests
110 func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
111         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
112         if err != nil {
113                 return err
114         }
115
116         api := operations.NewXappFrameworkAPI(swaggerSpec)
117
118         // Subscription: Query
119         api.QueryGetAllSubscriptionsHandler = query.GetAllSubscriptionsHandlerFunc(
120                 func(p query.GetAllSubscriptionsParams) middleware.Responder {
121                         if resp, err := getSubscription(); err == nil {
122                                 return query.NewGetAllSubscriptionsOK().WithPayload(resp)
123                         }
124                         return query.NewGetAllSubscriptionsInternalServerError()
125                 })
126
127         // SubscriptionType: Report
128         api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc(
129                 func(p report.SubscribeReportParams) middleware.Responder {
130                         if resp, err := createSubscription(models.SubscriptionTypeReport, p.ReportParams); err == nil {
131                                 return report.NewSubscribeReportCreated().WithPayload(resp)
132                         }
133                         return report.NewSubscribeReportInternalServerError()
134                 })
135
136         // SubscriptionType: Policy
137         api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc(
138                 func(p policy.SubscribePolicyParams) middleware.Responder {
139                         if resp, err := createSubscription(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
140                                 return policy.NewSubscribePolicyCreated().WithPayload(resp)
141                         }
142                         return policy.NewSubscribePolicyInternalServerError()
143                 })
144
145         // SubscriptionType: Delete
146         api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
147                 func(p common.UnsubscribeParams) middleware.Responder {
148                         if err := delSubscription(p.SubscriptionID); err == nil {
149                                 return common.NewUnsubscribeNoContent()
150                         }
151                         return common.NewUnsubscribeInternalServerError()
152                 })
153
154         // XApp: Get Config
155         /*api.XappGetXappConfigListHandler = xapp.GetXappConfigListHandlerFunc(
156                         func(p xapp.GetXappConfigListParams) middleware.Responder {
157                     Logger.Info("Hitting xapp config")
158                                 if resp,err := r.getXappConfig(); err == nil {
159                                         return xapp.NewGetXappConfigListOK().WithPayload(resp)
160                                 }
161                                 return xapp.NewGetXappConfigListInternalServerError()
162                     })*/
163
164         server := restapi.NewServer(api)
165         defer server.Shutdown()
166         server.Host = r.localAddr
167         server.Port = r.localPort
168
169         Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
170         if err := server.Serve(); err != nil {
171                 return err
172         }
173         return nil
174 }
175
176 // Server interface: send notification to client
177 func (r *Subscriber) Notify(resp *models.SubscriptionResponse, clientEndpoint string) (err error) {
178         respData, err := json.Marshal(resp)
179         if err != nil {
180                 Logger.Error("json.Marshal failed: %v", err)
181                 return err
182         }
183
184         ep, _, _ := net.SplitHostPort(clientEndpoint)
185         _, port, _ := net.SplitHostPort(fmt.Sprintf(":%d", GetPortData("http").Port))
186         clientUrl := fmt.Sprintf("http://%s:%s%s", ep, port, r.clientUrl)
187
188         retries := viper.GetInt("subscription.retryCount")
189         if retries == 0 {
190                 retries = 10
191         }
192
193         delay := viper.GetInt("subscription.retryDelay")
194         if delay == 0 {
195                 delay = 5
196         }
197
198         for i := 0; i < retries; i++ {
199                 r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
200                 if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
201                         break
202                 }
203
204                 if err != nil {
205                         Logger.Error("%v", err)
206                 }
207                 if r != nil && r.StatusCode != http.StatusOK {
208                         Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
209                 }
210                 time.Sleep(time.Duration(delay) * time.Second)
211         }
212
213         return err
214 }
215
216 // Subscription interface for xApp: Response callback
217 func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
218         r.clientCB = c
219 }
220
221 // Subscription interface for xApp: REPORT
222 func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (*apimodel.SubscriptionResponse, error) {
223         params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)
224         result, err := r.CreateTransport().Report.SubscribeReport(params)
225         if err != nil {
226                 return &apimodel.SubscriptionResponse{}, err
227         }
228
229         return result.Payload, err
230 }
231
232 // Subscription interface for xApp: POLICY
233 func (r *Subscriber) SubscribePolicy(p *apimodel.PolicyParams) (*apimodel.SubscriptionResponse, error) {
234         params := apipolicy.NewSubscribePolicyParamsWithTimeout(r.timeout).WithPolicyParams(p)
235         result, err := r.CreateTransport().Policy.SubscribePolicy(params)
236         if err != nil {
237                 return &apimodel.SubscriptionResponse{}, err
238         }
239
240         return result.Payload, err
241 }
242
243 // Subscription interface for xApp: DELETE
244 func (r *Subscriber) UnSubscribe(subId string) error {
245         params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
246         _, err := r.CreateTransport().Common.Unsubscribe(params)
247
248         return err
249 }
250
251 // Subscription interface for xApp: QUERY
252 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
253         resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
254         if err != nil {
255                 return models.SubscriptionList{}, err
256         }
257
258         defer resp.Body.Close()
259
260         contents, err := ioutil.ReadAll(resp.Body)
261         if err != nil {
262                 return models.SubscriptionList{}, err
263         }
264
265         subscriptions := models.SubscriptionList{}
266         err = json.Unmarshal([]byte(string(contents)), &subscriptions)
267         if err != nil {
268                 return models.SubscriptionList{}, err
269         }
270
271         return subscriptions, nil
272 }
273
274 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
275         return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)
276 }
277
278 /*func (r *Subscriber) getXappConfig() (appconfig models.XappConfigList, err error) {
279
280     Logger.Error("Inside getXappConfig")
281
282                 var metadata models.ConfigMetadata
283         var xappconfig models.XAppConfig
284         name := viper.GetString("name")
285         configtype := "json"
286                 metadata.XappName = &name
287                 metadata.ConfigType = &configtype
288
289         configFile, err := os.Open("/opt/ric/config/config-file.json")
290         if err != nil {
291                 Logger.Error("Cannot open config file: %v", err)
292                 return nil,errors.New("Could Not parse the config file")
293         }
294
295         body, err := ioutil.ReadAll(configFile)
296
297         defer configFile.Close()
298
299                 xappconfig.Metadata = &metadata
300                 xappconfig.Config = body
301
302         appconfig = append(appconfig,&xappconfig)
303
304                 return appconfig,nil
305 }*/