xapp-frame namespace changes
[ric-plt/xapp-frame.git] / pkg / xapp / subscription.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "github.com/go-openapi/loads"
27         httptransport "github.com/go-openapi/runtime/client"
28         "github.com/go-openapi/runtime/middleware"
29         "github.com/go-openapi/strfmt"
30         "github.com/spf13/viper"
31         "io/ioutil"
32         "net"
33         "net/http"
34         "os"
35         "time"
36
37         apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
38         apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
39         apipolicy "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/policy"
40         apireport "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/report"
41         apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
42
43         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
44         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
45         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
46         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
47         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/policy"
48         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/query"
49         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/report"
50 )
51
52 type SubscriptionHandler func(models.SubscriptionType, interface{}) (*models.SubscriptionResponse, error)
53 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
54 type SubscriptionDeleteHandler func(string) error
55 type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
56
// Subscriber bundles the settings used by both halves of the subscription
// API in this file: serving subscription requests (Listen, Notify) and
// issuing them as an xApp (SubscribeReport, SubscribePolicy, UnSubscribe,
// QuerySubscriptions).
57 type Subscriber struct {
        // localAddr and localPort are the host and port assigned to the REST
        // server started by Listen.
58         localAddr  string
59         localPort  int
        // remoteHost, remoteUrl and remoteProt are passed to the HTTP
        // transport built by CreateTransport (host, base path, schemes);
        // remoteHost and remoteUrl are also used by QuerySubscriptions.
60         remoteHost string
61         remoteUrl  string
62         remoteProt []string
        // timeout is applied to the request parameters of the subscribe and
        // unsubscribe calls.
63         timeout    time.Duration
        // clientUrl is the path on which ResponseHandler is registered and to
        // which Notify posts responses.
64         clientUrl  string
        // clientCB, when non-nil, is invoked by ResponseHandler; it is set
        // through SetResponseCB.
65         clientCB   SubscriptionResponseCallback
66 }
67
68 func NewSubscriber(host string, timo int) *Subscriber {
69         if host == "" {
70                 pltnamespace := os.Getenv("PLT_NAMESPACE")
71                 if pltnamespace == "" {
72                         pltnamespace = "ricplt"
73                 }
74                 host = fmt.Sprintf("service-%s-submgr-http.%s:8088", pltnamespace, pltnamespace)
75         }
76
77         if timo == 0 {
78                 timo = 20
79         }
80
81         r := &Subscriber{
82                 remoteHost: host,
83                 remoteUrl:  "/ric/v1",
84                 remoteProt: []string{"http"},
85                 timeout:    time.Duration(timo) * time.Second,
86                 localAddr:  "0.0.0.0",
87                 localPort:  8088,
88                 clientUrl:  "/ric/v1/subscriptions/response",
89         }
90         Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
91
92         return r
93 }
94
95 func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
96         if req.Body != nil {
97                 var resp apimodel.SubscriptionResponse
98                 if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
99                         if r.clientCB != nil {
100                                 r.clientCB(&resp)
101                         }
102                 }
103                 req.Body.Close()
104         }
105 }
106
107 // Server interface: listen and receive subscription requests
108 func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
109         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
110         if err != nil {
111                 return err
112         }
113
114         api := operations.NewXappFrameworkAPI(swaggerSpec)
115
116         // Subscription: Query
117         api.QueryGetAllSubscriptionsHandler = query.GetAllSubscriptionsHandlerFunc(
118                 func(p query.GetAllSubscriptionsParams) middleware.Responder {
119                         if resp, err := getSubscription(); err == nil {
120                                 return query.NewGetAllSubscriptionsOK().WithPayload(resp)
121                         }
122                         return query.NewGetAllSubscriptionsInternalServerError()
123                 })
124
125         // SubscriptionType: Report
126         api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc(
127                 func(p report.SubscribeReportParams) middleware.Responder {
128                         if resp, err := createSubscription(models.SubscriptionTypeReport, p.ReportParams); err == nil {
129                                 return report.NewSubscribeReportCreated().WithPayload(resp)
130                         }
131                         return report.NewSubscribeReportInternalServerError()
132                 })
133
134         // SubscriptionType: Policy
135         api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc(
136                 func(p policy.SubscribePolicyParams) middleware.Responder {
137                         if resp, err := createSubscription(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
138                                 return policy.NewSubscribePolicyCreated().WithPayload(resp)
139                         }
140                         return policy.NewSubscribePolicyInternalServerError()
141                 })
142
143         // SubscriptionType: Delete
144         api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
145                 func(p common.UnsubscribeParams) middleware.Responder {
146                         if err := delSubscription(p.SubscriptionID); err == nil {
147                                 return common.NewUnsubscribeNoContent()
148                         }
149                         return common.NewUnsubscribeInternalServerError()
150                 })
151
152         server := restapi.NewServer(api)
153         defer server.Shutdown()
154         server.Host = r.localAddr
155         server.Port = r.localPort
156
157         Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
158         if err := server.Serve(); err != nil {
159                 return err
160         }
161         return nil
162 }
163
164 // Server interface: send notification to client
165 func (r *Subscriber) Notify(resp *models.SubscriptionResponse, clientEndpoint string) (err error) {
166         respData, err := json.Marshal(resp)
167         if err != nil {
168                 Logger.Error("json.Marshal failed: %v", err)
169                 return err
170         }
171
172         ep, _, _ := net.SplitHostPort(clientEndpoint)
173         _, port, _ := net.SplitHostPort(viper.GetString("local.host"))
174         clientUrl := fmt.Sprintf("http://%s:%s%s", ep, port, r.clientUrl)
175
176         retries := viper.GetInt("subscription.retryCount")
177         if retries == 0 {
178                 retries = 10
179         }
180
181         delay := viper.GetInt("subscription.retryDelay")
182         if delay == 0 {
183                 delay = 5
184         }
185
186         for i := 0; i < retries; i++ {
187                 r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
188                 if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
189                         break
190                 }
191
192                 if err != nil {
193                         Logger.Error("%v", err)
194                 }
195                 if r != nil && r.StatusCode != http.StatusOK {
196                         Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
197                 }
198                 time.Sleep(time.Duration(delay) * time.Second)
199         }
200
201         return err
202 }
203
204 // Subscription interface for xApp: Response callback
// SetResponseCB stores c as the callback that ResponseHandler invokes with
// each decoded subscription response. Passing nil disables the callback,
// since ResponseHandler only calls it when it is non-nil.
205 func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
206         r.clientCB = c
207 }
208
209 // Subscription interface for xApp: REPORT
210 func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (*apimodel.SubscriptionResponse, error) {
211         params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)
212         result, err := r.CreateTransport().Report.SubscribeReport(params)
213         if err != nil {
214                 return &apimodel.SubscriptionResponse{}, err
215         }
216
217         return result.Payload, err
218 }
219
220 // Subscription interface for xApp: POLICY
221 func (r *Subscriber) SubscribePolicy(p *apimodel.PolicyParams) (*apimodel.SubscriptionResponse, error) {
222         params := apipolicy.NewSubscribePolicyParamsWithTimeout(r.timeout).WithPolicyParams(p)
223         result, err := r.CreateTransport().Policy.SubscribePolicy(params)
224         if err != nil {
225                 return &apimodel.SubscriptionResponse{}, err
226         }
227
228         return result.Payload, err
229 }
230
231 // Subscription interface for xApp: DELETE
232 func (r *Subscriber) UnSubscribe(subId string) error {
233         params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
234         _, err := r.CreateTransport().Common.Unsubscribe(params)
235
236         return err
237 }
238
239 // Subscription interface for xApp: QUERY
240 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
241         resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
242         if err != nil {
243                 return models.SubscriptionList{}, err
244         }
245
246         defer resp.Body.Close()
247
248         contents, err := ioutil.ReadAll(resp.Body)
249         if err != nil {
250                 return models.SubscriptionList{}, err
251         }
252
253         subscriptions := models.SubscriptionList{}
254         err = json.Unmarshal([]byte(string(contents)), &subscriptions)
255         if err != nil {
256                 return models.SubscriptionList{}, err
257         }
258
259         return subscriptions, nil
260 }
261
262 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
263         return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)
264 }