Defer alarm system init
[ric-plt/xapp-frame.git] / pkg / xapp / subscription.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "github.com/go-openapi/loads"
27         httptransport "github.com/go-openapi/runtime/client"
28         "github.com/go-openapi/runtime/middleware"
29         "github.com/go-openapi/strfmt"
30         "github.com/spf13/viper"
31         "io/ioutil"
32         "net"
33         "net/http"
34         "time"
35
36         apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
37         apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
38         apipolicy "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/policy"
39         apireport "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/report"
40         apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
41
42         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
43         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
44         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
45         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
46         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/policy"
47         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/query"
48         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/report"
49 )
50
51 type SubscriptionHandler func(models.SubscriptionType, interface{}) (*models.SubscriptionResponse, error)
52 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
53 type SubscriptionDeleteHandler func(string) error
54 type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
55
56 type Subscriber struct {
57         localAddr  string
58         localPort  int
59         remoteHost string
60         remoteUrl  string
61         remoteProt []string
62         timeout    time.Duration
63         clientUrl  string
64         clientCB   SubscriptionResponseCallback
65 }
66
67 func NewSubscriber(host string, timo int) *Subscriber {
68         if host == "" {
69                 host = "service-ricplt-submgr-http.ricplt:8088"
70         }
71
72         if timo == 0 {
73                 timo = 20
74         }
75
76         r := &Subscriber{
77                 remoteHost: host,
78                 remoteUrl:  "/ric/v1",
79                 remoteProt: []string{"http"},
80                 timeout:    time.Duration(timo) * time.Second,
81                 localAddr:  "0.0.0.0",
82                 localPort:  8088,
83                 clientUrl:  "/ric/v1/subscriptions/response",
84         }
85         Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
86
87         return r
88 }
89
90 func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
91         if req.Body != nil {
92                 var resp apimodel.SubscriptionResponse
93                 if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
94                         if r.clientCB != nil {
95                                 r.clientCB(&resp)
96                         }
97                 }
98                 req.Body.Close()
99         }
100 }
101
102 // Server interface: listen and receive subscription requests
103 func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
104         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
105         if err != nil {
106                 return err
107         }
108
109         api := operations.NewXappFrameworkAPI(swaggerSpec)
110
111         // Subscription: Query
112         api.QueryGetAllSubscriptionsHandler = query.GetAllSubscriptionsHandlerFunc(
113                 func(p query.GetAllSubscriptionsParams) middleware.Responder {
114                         if resp, err := getSubscription(); err == nil {
115                                 return query.NewGetAllSubscriptionsOK().WithPayload(resp)
116                         }
117                         return query.NewGetAllSubscriptionsInternalServerError()
118                 })
119
120         // SubscriptionType: Report
121         api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc(
122                 func(p report.SubscribeReportParams) middleware.Responder {
123                         if resp, err := createSubscription(models.SubscriptionTypeReport, p.ReportParams); err == nil {
124                                 return report.NewSubscribeReportCreated().WithPayload(resp)
125                         }
126                         return report.NewSubscribeReportInternalServerError()
127                 })
128
129         // SubscriptionType: Policy
130         api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc(
131                 func(p policy.SubscribePolicyParams) middleware.Responder {
132                         if resp, err := createSubscription(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
133                                 return policy.NewSubscribePolicyCreated().WithPayload(resp)
134                         }
135                         return policy.NewSubscribePolicyInternalServerError()
136                 })
137
138         // SubscriptionType: Delete
139         api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
140                 func(p common.UnsubscribeParams) middleware.Responder {
141                         if err := delSubscription(p.SubscriptionID); err == nil {
142                                 return common.NewUnsubscribeNoContent()
143                         }
144                         return common.NewUnsubscribeInternalServerError()
145                 })
146
147         server := restapi.NewServer(api)
148         defer server.Shutdown()
149         server.Host = r.localAddr
150         server.Port = r.localPort
151
152         Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
153         if err := server.Serve(); err != nil {
154                 return err
155         }
156         return nil
157 }
158
159 // Server interface: send notification to client
160 func (r *Subscriber) Notify(resp *models.SubscriptionResponse, clientEndpoint string) (err error) {
161         respData, err := json.Marshal(resp)
162         if err != nil {
163                 Logger.Error("json.Marshal failed: %v", err)
164                 return err
165         }
166
167         ep, _, _ := net.SplitHostPort(clientEndpoint)
168         _, port, _ := net.SplitHostPort(viper.GetString("local.host"))
169         clientUrl := fmt.Sprintf("http://%s:%s%s", ep, port, r.clientUrl)
170
171         retries := viper.GetInt("subscription.retryCount")
172         if retries == 0 {
173                 retries = 10
174         }
175
176         delay := viper.GetInt("subscription.retryDelay")
177         if delay == 0 {
178                 delay = 5
179         }
180
181         for i := 0; i < retries; i++ {
182                 r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
183                 if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
184                         break
185                 }
186
187                 if err != nil {
188                         Logger.Error("%v", err)
189                 }
190                 if r != nil && r.StatusCode != http.StatusOK {
191                         Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
192                 }
193                 time.Sleep(time.Duration(delay) * time.Second)
194         }
195
196         return err
197 }
198
199 // Subscription interface for xApp: Response callback
200 func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
201         r.clientCB = c
202 }
203
204 // Subscription interface for xApp: REPORT
205 func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (*apimodel.SubscriptionResponse, error) {
206         params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)
207         result, err := r.CreateTransport().Report.SubscribeReport(params)
208         if err != nil {
209                 return &apimodel.SubscriptionResponse{}, err
210         }
211
212         return result.Payload, err
213 }
214
215 // Subscription interface for xApp: POLICY
216 func (r *Subscriber) SubscribePolicy(p *apimodel.PolicyParams) (*apimodel.SubscriptionResponse, error) {
217         params := apipolicy.NewSubscribePolicyParamsWithTimeout(r.timeout).WithPolicyParams(p)
218         result, err := r.CreateTransport().Policy.SubscribePolicy(params)
219         if err != nil {
220                 return &apimodel.SubscriptionResponse{}, err
221         }
222
223         return result.Payload, err
224 }
225
226 // Subscription interface for xApp: DELETE
227 func (r *Subscriber) UnSubscribe(subId string) error {
228         params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
229         _, err := r.CreateTransport().Common.Unsubscribe(params)
230
231         return err
232 }
233
234 // Subscription interface for xApp: QUERY
235 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
236         resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
237         if err != nil {
238                 return models.SubscriptionList{}, err
239         }
240
241         defer resp.Body.Close()
242
243         contents, err := ioutil.ReadAll(resp.Body)
244         if err != nil {
245                 return models.SubscriptionList{}, err
246         }
247
248         subscriptions := models.SubscriptionList{}
249         err = json.Unmarshal([]byte(string(contents)), &subscriptions)
250         if err != nil {
251                 return models.SubscriptionList{}, err
252         }
253
254         return subscriptions, nil
255 }
256
257 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
258         return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)
259 }