8f49e996867a179635c21efe98bc9e7f631852d8
[ric-plt/xapp-frame.git] / pkg / xapp / subscription.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "io/ioutil"
27         "net/http"
28         "os"
29         "time"
30
31         "github.com/go-openapi/loads"
32         httptransport "github.com/go-openapi/runtime/client"
33         "github.com/go-openapi/runtime/middleware"
34         "github.com/go-openapi/strfmt"
35         "github.com/spf13/viper"
36
37         apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
38         apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
39         apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
40
41         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
42         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
43         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
44         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
45 )
46
47 type SubscriptionHandler func(interface{}) (*models.SubscriptionResponse, int)
48 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
49 type SubscriptionDeleteHandler func(string) int
50 type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
51
52 type Subscriber struct {
53         localAddr  string
54         localPort  int
55         remoteHost string
56         remoteUrl  string
57         remoteProt []string
58         timeout    time.Duration
59         clientUrl  string
60         clientCB   SubscriptionResponseCallback
61 }
62
63 func NewSubscriber(host string, timo int) *Subscriber {
64         if host == "" {
65                 pltnamespace := os.Getenv("PLT_NAMESPACE")
66                 if pltnamespace == "" {
67                         pltnamespace = "ricplt"
68                 }
69                 host = fmt.Sprintf("service-%s-submgr-http.%s:8088", pltnamespace, pltnamespace)
70         }
71
72         if timo == 0 {
73                 timo = 20
74         }
75
76         r := &Subscriber{
77                 remoteHost: host,
78                 remoteUrl:  "/ric/v1",
79                 remoteProt: []string{"http"},
80                 timeout:    time.Duration(timo) * time.Second,
81                 localAddr:  "0.0.0.0",
82                 localPort:  8088,
83                 clientUrl:  "/ric/v1/subscriptions/response",
84         }
85         Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
86
87         return r
88 }
89
90 func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
91         if req.Body != nil {
92                 var resp apimodel.SubscriptionResponse
93                 if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
94                         if r.clientCB != nil {
95                                 r.clientCB(&resp)
96                         }
97                 }
98                 req.Body.Close()
99         }
100 }
101
102 // Server interface: listen and receive subscription requests
103 func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
104         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
105         if err != nil {
106                 return err
107         }
108
109         api := operations.NewXappFrameworkAPI(swaggerSpec)
110
111         // Subscription: Query
112         api.CommonGetAllSubscriptionsHandler = common.GetAllSubscriptionsHandlerFunc(
113                 func(p common.GetAllSubscriptionsParams) middleware.Responder {
114                         if resp, err := getSubscription(); err == nil {
115                                 return common.NewGetAllSubscriptionsOK().WithPayload(resp)
116                         }
117                         return common.NewGetAllSubscriptionsInternalServerError()
118                 })
119
120         // Subscription: Subscribe
121         api.CommonSubscribeHandler = common.SubscribeHandlerFunc(
122                 func(params common.SubscribeParams) middleware.Responder {
123                         resp, retCode := createSubscription(params.SubscriptionParams)
124                         if retCode != common.SubscribeCreatedCode {
125                                 if retCode == common.SubscribeBadRequestCode {
126                                         return common.NewSubscribeBadRequest()
127                                 } else if retCode == common.SubscribeNotFoundCode {
128                                         return common.NewSubscribeNotFound()
129                                 } else if retCode == common.SubscribeServiceUnavailableCode {
130                                         return common.NewSubscribeServiceUnavailable()
131                                 } else {
132                                         return common.NewSubscribeInternalServerError()
133                                 }
134                         }
135                         return common.NewSubscribeCreated().WithPayload(resp)
136                 })
137
138         // Subscription: Unsubscribe
139         api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
140                 func(p common.UnsubscribeParams) middleware.Responder {
141                         retCode := delSubscription(p.SubscriptionID)
142                         if retCode != common.UnsubscribeNoContentCode {
143                                 if retCode == common.UnsubscribeBadRequestCode {
144                                         return common.NewUnsubscribeBadRequest()
145                                 } else {
146                                         return common.NewUnsubscribeInternalServerError()
147                                 }
148                         }
149                         return common.NewUnsubscribeNoContent()
150                 })
151
152         server := restapi.NewServer(api)
153         defer server.Shutdown()
154         server.Host = r.localAddr
155         server.Port = r.localPort
156
157         Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
158         if err := server.Serve(); err != nil {
159                 return err
160         }
161         return nil
162 }
163
164 // Server interface: send notification to client
165 func (r *Subscriber) Notify(resp *models.SubscriptionResponse, ep models.SubscriptionParamsClientEndpoint) (err error) {
166         respData, err := json.Marshal(resp)
167         if err != nil {
168                 Logger.Error("json.Marshal failed: %v", err)
169                 return err
170         }
171
172         clientUrl := fmt.Sprintf("http://%s:%d%s", ep.Host, *ep.HTTPPort, r.clientUrl)
173
174         retries := viper.GetInt("subscription.retryCount")
175         if retries == 0 {
176                 retries = 10
177         }
178
179         delay := viper.GetInt("subscription.retryDelay")
180         if delay == 0 {
181                 delay = 5
182         }
183
184         for i := 0; i < retries; i++ {
185                 r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
186                 if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
187                         break
188                 }
189
190                 if err != nil {
191                         Logger.Error("%v", err)
192                 }
193                 if r != nil && r.StatusCode != http.StatusOK {
194                         Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
195                 }
196                 time.Sleep(time.Duration(delay) * time.Second)
197         }
198
199         return err
200 }
201
202 // Subscription interface for xApp: Response callback
203 func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
204         r.clientCB = c
205 }
206
207 // Subscription interface for xApp
208 func (r *Subscriber) Subscribe(p *apimodel.SubscriptionParams) (*apimodel.SubscriptionResponse, error) {
209         params := apicommon.NewSubscribeParamsWithTimeout(r.timeout).WithSubscriptionParams(p)
210         result, err := r.CreateTransport().Common.Subscribe(params)
211         if err != nil {
212                 return &apimodel.SubscriptionResponse{}, err
213         }
214         return result.Payload, err
215 }
216
217 // Subscription interface for xApp: DELETE
218 func (r *Subscriber) Unsubscribe(subId string) error {
219         params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
220         _, err := r.CreateTransport().Common.Unsubscribe(params)
221
222         return err
223 }
224
225 // Subscription interface for xApp: QUERY
226 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
227         resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
228         if err != nil {
229                 return models.SubscriptionList{}, err
230         }
231
232         defer resp.Body.Close()
233
234         contents, err := ioutil.ReadAll(resp.Body)
235         if err != nil {
236                 return models.SubscriptionList{}, err
237         }
238
239         subscriptions := models.SubscriptionList{}
240         err = json.Unmarshal([]byte(string(contents)), &subscriptions)
241         if err != nil {
242                 return models.SubscriptionList{}, err
243         }
244
245         return subscriptions, nil
246 }
247
248 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
249         return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)
250 }