Subscription REST interface update
[ric-plt/xapp-frame.git] / pkg / xapp / subscription.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "io/ioutil"
27         "net/http"
28         "os"
29         "time"
30
31         "github.com/go-openapi/loads"
32         httptransport "github.com/go-openapi/runtime/client"
33         "github.com/go-openapi/runtime/middleware"
34         "github.com/go-openapi/strfmt"
35         "github.com/spf13/viper"
36
37         apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
38         apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
39         apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
40
41         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
42         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
43         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
44         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
45 )
46
47 type SubscriptionHandler func(interface{}) (*models.SubscriptionResponse, int)
48 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
49 type SubscriptionDeleteHandler func(string) int
50 type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
51
52 type Subscriber struct {
53         localAddr  string
54         localPort  int
55         remoteHost string
56         remoteUrl  string
57         remoteProt []string
58         timeout    time.Duration
59         clientUrl  string
60         clientCB   SubscriptionResponseCallback
61 }
62
63 func NewSubscriber(host string, timo int) *Subscriber {
64         if host == "" {
65                 pltnamespace := os.Getenv("PLT_NAMESPACE")
66                 if pltnamespace == "" {
67                         pltnamespace = "ricplt"
68                 }
69                 host = fmt.Sprintf("service-%s-submgr-http.%s:8088", pltnamespace, pltnamespace)
70         }
71
72         if timo == 0 {
73                 timo = 20
74         }
75
76         r := &Subscriber{
77                 remoteHost: host,
78                 remoteUrl:  "/ric/v1",
79                 remoteProt: []string{"http"},
80                 timeout:    time.Duration(timo) * time.Second,
81                 localAddr:  "0.0.0.0",
82                 localPort:  8088,
83                 clientUrl:  "/ric/v1/subscriptions/response",
84         }
85         Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
86
87         return r
88 }
89
90 func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
91         if req.Body != nil {
92                 var resp apimodel.SubscriptionResponse
93                 if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
94                         if r.clientCB != nil {
95                                 r.clientCB(&resp)
96                         }
97                 }
98                 req.Body.Close()
99         }
100 }
101
102 // Server interface: listen and receive subscription requests
103 func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
104         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
105         if err != nil {
106                 return err
107         }
108
109         api := operations.NewXappFrameworkAPI(swaggerSpec)
110
111         // Subscription: Query
112         api.CommonGetAllSubscriptionsHandler = common.GetAllSubscriptionsHandlerFunc(
113                 func(p common.GetAllSubscriptionsParams) middleware.Responder {
114                         if resp, err := getSubscription(); err == nil {
115                                 return common.NewGetAllSubscriptionsOK().WithPayload(resp)
116                         }
117                         return common.NewGetAllSubscriptionsInternalServerError()
118                 })
119
120         // Subscription: Subscribe
121         api.CommonSubscribeHandler = common.SubscribeHandlerFunc(
122                 func(params common.SubscribeParams) middleware.Responder {
123                         Logger.Error("Subscribe: Params=%+v", params.SubscriptionParams)
124                         resp, retCode := createSubscription(params.SubscriptionParams)
125                         if retCode != common.SubscribeCreatedCode {
126                                 if retCode == common.SubscribeBadRequestCode {
127                                         return common.NewSubscribeBadRequest()
128                                 } else {
129                                         return common.NewSubscribeInternalServerError()
130                                 }
131                         }
132                         return common.NewSubscribeCreated().WithPayload(resp)
133                 })
134
135         // Subscription: Unsubscribe
136         api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
137                 func(p common.UnsubscribeParams) middleware.Responder {
138                         Logger.Error("Unsubscribe: SubscriptionID=%+v", p.SubscriptionID)
139                         retCode := delSubscription(p.SubscriptionID)
140                         if retCode != common.UnsubscribeNoContentCode {
141                                 if retCode == common.UnsubscribeBadRequestCode {
142                                         return common.NewUnsubscribeBadRequest()
143                                 } else {
144                                         return common.NewUnsubscribeInternalServerError()
145                                 }
146                         }
147                         return common.NewUnsubscribeNoContent()
148                 })
149
150         server := restapi.NewServer(api)
151         defer server.Shutdown()
152         server.Host = r.localAddr
153         server.Port = r.localPort
154
155         Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
156         if err := server.Serve(); err != nil {
157                 return err
158         }
159         return nil
160 }
161
162 // Server interface: send notification to client
163 func (r *Subscriber) Notify(resp *models.SubscriptionResponse, ep models.SubscriptionParamsClientEndpoint) (err error) {
164         respData, err := json.Marshal(resp)
165         if err != nil {
166                 Logger.Error("json.Marshal failed: %v", err)
167                 return err
168         }
169
170         clientUrl := fmt.Sprintf("http://%s:%d%s", ep.Host, *ep.HTTPPort, r.clientUrl)
171
172         retries := viper.GetInt("subscription.retryCount")
173         if retries == 0 {
174                 retries = 10
175         }
176
177         delay := viper.GetInt("subscription.retryDelay")
178         if delay == 0 {
179                 delay = 5
180         }
181
182         for i := 0; i < retries; i++ {
183                 r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
184                 if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
185                         break
186                 }
187
188                 if err != nil {
189                         Logger.Error("%v", err)
190                 }
191                 if r != nil && r.StatusCode != http.StatusOK {
192                         Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
193                 }
194                 time.Sleep(time.Duration(delay) * time.Second)
195         }
196
197         return err
198 }
199
200 // Subscription interface for xApp: Response callback
201 func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
202         r.clientCB = c
203 }
204
205 // Subscription interface for xApp
206 func (r *Subscriber) Subscribe(p *apimodel.SubscriptionParams) (*apimodel.SubscriptionResponse, error) {
207         params := apicommon.NewSubscribeParamsWithTimeout(r.timeout).WithSubscriptionParams(p)
208         result, err := r.CreateTransport().Common.Subscribe(params)
209         if err != nil {
210                 return &apimodel.SubscriptionResponse{}, err
211         }
212
213         return result.Payload, err
214 }
215
216 // Subscription interface for xApp: DELETE
217 func (r *Subscriber) Unsubscribe(subId string) error {
218         params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
219         _, err := r.CreateTransport().Common.Unsubscribe(params)
220
221         return err
222 }
223
224 // Subscription interface for xApp: QUERY
225 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
226         resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
227         if err != nil {
228                 return models.SubscriptionList{}, err
229         }
230
231         defer resp.Body.Close()
232
233         contents, err := ioutil.ReadAll(resp.Body)
234         if err != nil {
235                 return models.SubscriptionList{}, err
236         }
237
238         subscriptions := models.SubscriptionList{}
239         err = json.Unmarshal([]byte(string(contents)), &subscriptions)
240         if err != nil {
241                 return models.SubscriptionList{}, err
242         }
243
244         return subscriptions, nil
245 }
246
247 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
248         return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)
249 }