f45197b2a6b63754e81b9222480c3af4a7d1f842
[ric-plt/appmgr.git] / pkg / resthooks / resthooks.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package resthooks
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         sdl "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
27         cmap "github.com/orcaman/concurrent-map"
28         "github.com/segmentio/ksuid"
29         "net/http"
30         "strings"
31         "time"
32
33         "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr"
34         "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/models"
35 )
36
37 //To encapsulate xApp Manager's keys under their own namespace in a DB
38 const (
39         appmgrSdlNs = "appmgr"
40         appDbSdlNs  = "appdb"
41 )
42
43 func NewResthook(restoreData bool) *Resthook {
44         return createResthook(restoreData, sdl.NewSyncStorage())
45 }
46
47 func createResthook(restoreData bool, sdlInst iSdl) *Resthook {
48         rh := &Resthook{
49                 client: &http.Client{},
50                 db:     sdlInst,
51         }
52
53         if restoreData {
54                 rh.subscriptions = rh.RestoreSubscriptions()
55         } else {
56                 rh.subscriptions = cmap.New()
57         }
58         return rh
59 }
60
61 func (rh *Resthook) AddSubscription(sr models.SubscriptionRequest) *models.SubscriptionResponse {
62         for v := range rh.subscriptions.IterBuffered() {
63                 r := v.Val.(SubscriptionInfo).req
64                 if *r.Data.TargetURL == *sr.Data.TargetURL && r.Data.EventType == sr.Data.EventType {
65                         appmgr.Logger.Info("Similar subscription already exists!")
66                         resp := v.Val.(SubscriptionInfo).resp
67                         return &resp
68                 }
69         }
70
71         key := ksuid.New().String()
72         resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: sr.Data.EventType}
73         rh.subscriptions.Set(key, SubscriptionInfo{key, sr, resp})
74         rh.StoreSubscriptions(rh.subscriptions)
75
76         appmgr.Logger.Info("Sub: New subscription added: key=%s targetUl=%s eventType=%s", key, *sr.Data.TargetURL, sr.Data.EventType)
77         return &resp
78 }
79
80 func (rh *Resthook) DeleteSubscription(id string) (*models.SubscriptionResponse, bool) {
81         if v, found := rh.subscriptions.Get(id); found {
82                 appmgr.Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(SubscriptionInfo).req)
83
84                 rh.subscriptions.Remove(id)
85                 rh.StoreSubscriptions(rh.subscriptions)
86                 resp := v.(SubscriptionInfo).resp
87                 return &resp, found
88         }
89         return &models.SubscriptionResponse{}, false
90 }
91
92 func (rh *Resthook) ModifySubscription(id string, req models.SubscriptionRequest) (*models.SubscriptionResponse, bool) {
93         if s, found := rh.subscriptions.Get(id); found {
94                 appmgr.Logger.Info("Subscription id=%s found: %v ... updating", id, s.(SubscriptionInfo).req)
95
96                 resp := models.SubscriptionResponse{ID: id, Version: 0, EventType: req.Data.EventType}
97                 rh.subscriptions.Set(id, SubscriptionInfo{id, req, resp})
98                 rh.StoreSubscriptions(rh.subscriptions)
99
100                 return &resp, found
101         }
102         return &models.SubscriptionResponse{}, false
103 }
104
105 func (rh *Resthook) GetAllSubscriptions() (hooks models.AllSubscriptions) {
106         hooks = models.AllSubscriptions{}
107         for v := range rh.subscriptions.IterBuffered() {
108                 s := v.Val.(SubscriptionInfo)
109                 r := v.Val.(SubscriptionInfo).req
110                 hooks = append(hooks, &models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, s.Id})
111         }
112
113         return hooks
114 }
115
116 func (rh *Resthook) GetSubscriptionById(id string) (models.Subscription, bool) {
117         if v, found := rh.subscriptions.Get(id); found {
118                 appmgr.Logger.Info("Subscription id=%s found: %v", id, v.(SubscriptionInfo).req)
119                 r := v.(SubscriptionInfo).req
120                 return models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, id}, found
121         }
122         return models.Subscription{}, false
123 }
124
125 func (rh *Resthook) PublishSubscription(x models.Xapp, et models.EventType) {
126         rh.NotifyClients(models.AllDeployedXapps{&x}, et)
127 }
128
129 func (rh *Resthook) NotifyClients(xapps models.AllDeployedXapps, et models.EventType) {
130         if len(xapps) == 0 || len(rh.subscriptions) == 0 {
131                 appmgr.Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(rh.subscriptions))
132                 return
133         }
134
135         rh.Seq = rh.Seq + 1
136         for v := range rh.subscriptions.Iter() {
137                 go rh.notify(xapps, et, v.Val.(SubscriptionInfo), rh.Seq)
138         }
139 }
140
141 func (rh *Resthook) notify(xapps models.AllDeployedXapps, et models.EventType, s SubscriptionInfo, seq int64) error {
142         xappData, err := json.Marshal(xapps)
143         if err != nil {
144                 appmgr.Logger.Info("json.Marshal failed: %v", err)
145                 return err
146         }
147
148         // TODO: Use models.SubscriptionNotification instead of internal ...
149         notif := SubscriptionNotification{ID: s.Id, Version: seq, Event: string(et), XApps: string(xappData)}
150         jsonData, err := json.Marshal(notif)
151         if err != nil {
152                 appmgr.Logger.Info("json.Marshal failed: %v", err)
153                 return err
154         }
155
156         // Execute the request with retry policy
157         return rh.retry(s, func() error {
158                 appmgr.Logger.Info("Posting notification to TargetURL=%s: %v", *s.req.Data.TargetURL, notif)
159                 resp, err := http.Post(*s.req.Data.TargetURL, "application/json", bytes.NewBuffer(jsonData))
160                 if err != nil {
161                         appmgr.Logger.Info("Posting to subscription failed: %v", err)
162                         return err
163                 }
164
165                 if resp.StatusCode != http.StatusOK {
166                         appmgr.Logger.Info("Client returned error code: %d", resp.StatusCode)
167                         return err
168                 }
169
170                 appmgr.Logger.Info("subscription to '%s' dispatched, response code: %d", *s.req.Data.TargetURL, resp.StatusCode)
171                 return nil
172         })
173 }
174
175 func (rh *Resthook) retry(s SubscriptionInfo, fn func() error) error {
176         if err := fn(); err != nil {
177                 // Todo: use exponential backoff, or similar mechanism
178                 if *s.req.Data.MaxRetries--; *s.req.Data.MaxRetries > 0 {
179                         time.Sleep(time.Duration(*s.req.Data.RetryTimer) * time.Second)
180                         return rh.retry(s, fn)
181                 }
182                 rh.subscriptions.Remove(s.Id)
183                 return err
184         }
185         return nil
186 }
187
188 func (rh *Resthook) StoreSubscriptions(m cmap.ConcurrentMap) {
189         for v := range m.Iter() {
190                 s := v.Val.(SubscriptionInfo)
191                 data, err := json.Marshal(s.req)
192                 if err != nil {
193                         appmgr.Logger.Error("json.marshal failed: %v ", err.Error())
194                         return
195                 }
196
197                 if err := rh.db.Set(appmgrSdlNs, s.Id, data); err != nil {
198                         appmgr.Logger.Error("DB.session.Set failed: %v ", err.Error())
199                 }
200         }
201 }
202
203 func (rh *Resthook) RestoreSubscriptions() (m cmap.ConcurrentMap) {
204         rh.VerifyDBConnection()
205
206         m = cmap.New()
207         keys, err := rh.db.GetAll(appmgrSdlNs)
208         if err != nil {
209                 appmgr.Logger.Error("DB.session.GetAll failed: %v ", err.Error())
210                 return
211         }
212
213         for _, key := range keys {
214                 value, err := rh.db.Get(appmgrSdlNs, []string{key})
215                 if err != nil {
216                         appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
217                         return
218                 }
219
220                 var item models.SubscriptionRequest
221                 if err = json.Unmarshal([]byte(value[key].(string)), &item); err != nil {
222                         appmgr.Logger.Error("json.Unmarshal failed: %v ", err.Error())
223                         return
224                 }
225
226                 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: item.Data.EventType}
227                 m.Set(key, SubscriptionInfo{key, item, resp})
228         }
229
230         return m
231 }
232
233 func (rh *Resthook) VerifyDBConnection() {
234         // Test DB connection, and wait until ready!
235         for {
236                 if _, err := rh.db.GetAll(appmgrSdlNs); err == nil {
237                         return
238                 }
239                 appmgr.Logger.Error("Database connection not ready, waiting ...")
240                 time.Sleep(time.Duration(5 * time.Second))
241         }
242 }
243
244 func (rh *Resthook) FlushSubscriptions() {
245         rh.db.RemoveAll(appmgrSdlNs)
246         rh.subscriptions = cmap.New()
247 }
248
249 func (rh *Resthook) UpdateAppData(params models.RegisterRequest, updateflag bool) {
250         appmgr.Logger.Info("Endpoint to be added in SDL: %s", *params.HTTPEndpoint)
251         if updateflag == false {
252                 return
253         }
254
255         //Ensure config is empty string, as we dont want to store config in DB
256         if params.Config != "" {
257                 params.Config = ""
258         }
259
260         value, err := rh.db.Get(appDbSdlNs, []string{"endpoints"})
261         if err != nil {
262                 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
263                 return
264         }
265
266         appmgr.Logger.Info("List of Apps in SDL: %v", value["endpoints"])
267         var appsindb []string
268         var data string
269         dbflag := false
270
271         if value["endpoints"] != nil {
272                 formstring := fmt.Sprintf("%s", value["endpoints"])
273                 newstring := strings.Split(formstring, " ")
274                 for i, _ := range newstring {
275                         if len(newstring) == 1 && strings.Contains(newstring[i], *params.HTTPEndpoint) {
276                                 appmgr.Logger.Info("Removing Key %s", *params.HTTPEndpoint)
277                                 rh.db.Remove(appDbSdlNs, []string{"endpoints"})
278                                 dbflag = true
279                                 break
280                         }
281                         if strings.Contains(newstring[i], *params.HTTPEndpoint) {
282                                 appmgr.Logger.Info("Removing entry %s", *params.HTTPEndpoint)
283                                 dbflag = true
284                                 continue
285                         }
286                         appsindb = append(appsindb, newstring[i])
287                         data = strings.Join(appsindb, " ")
288                 }
289                 rh.db.Set(appDbSdlNs, "endpoints", strings.TrimSpace(data))
290         }
291
292         if dbflag == false {
293                 xappData, err := json.Marshal(params)
294                 if err != nil {
295                         appmgr.Logger.Info("json.Marshal failed: %v", err)
296                         return
297                 }
298                 appsindb = append(appsindb, string(xappData))
299                 data = strings.Join(appsindb, " ")
300                 rh.db.Set(appDbSdlNs, "endpoints", strings.TrimSpace(data))
301         }
302 }
303
304 func (rh *Resthook) GetAppsInSDL() *string {
305         value, err := rh.db.Get(appDbSdlNs, []string{"endpoints"})
306         if err != nil {
307                 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
308                 return nil
309         }
310         appmgr.Logger.Info("List of Apps in SDL: %v", value["endpoints"])
311         if value["endpoints"] == nil || value["endpoints"] == "" {
312                 return nil
313         } else {
314                 apps := fmt.Sprintf("%s", value["endpoints"])
315                 return &apps
316         }
317 }