1bc7925f09947b020524da1a0194b3e51c037727
[ric-plt/appmgr.git] / pkg / resthooks / resthooks.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package resthooks
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         sdl "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
27         cmap "github.com/orcaman/concurrent-map"
28         "github.com/segmentio/ksuid"
29         "net/http"
30         "strings"
31         "time"
32
33         "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr"
34         "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/models"
35 )
36
37 func NewResthook(restoreData bool) *Resthook {
38         return createResthook(restoreData, sdl.NewSdlInstance("appmgr", sdl.NewDatabase()), sdl.NewSdlInstance("appdb", sdl.NewDatabase()))
39 }
40
41 func createResthook(restoreData bool, sdlInst iSdl, sdlInst2 iSdl) *Resthook {
42         rh := &Resthook{
43                 client: &http.Client{},
44                 db:     sdlInst,
45                 db2:    sdlInst2,
46         }
47
48         if restoreData {
49                 rh.subscriptions = rh.RestoreSubscriptions()
50         } else {
51                 rh.subscriptions = cmap.New()
52         }
53         return rh
54 }
55
56 func (rh *Resthook) AddSubscription(sr models.SubscriptionRequest) *models.SubscriptionResponse {
57         for v := range rh.subscriptions.IterBuffered() {
58                 r := v.Val.(SubscriptionInfo).req
59                 if *r.Data.TargetURL == *sr.Data.TargetURL && r.Data.EventType == sr.Data.EventType {
60                         appmgr.Logger.Info("Similar subscription already exists!")
61                         resp := v.Val.(SubscriptionInfo).resp
62                         return &resp
63                 }
64         }
65
66         key := ksuid.New().String()
67         resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: sr.Data.EventType}
68         rh.subscriptions.Set(key, SubscriptionInfo{key, sr, resp})
69         rh.StoreSubscriptions(rh.subscriptions)
70
71         appmgr.Logger.Info("Sub: New subscription added: key=%s targetUl=%s eventType=%s", key, *sr.Data.TargetURL, sr.Data.EventType)
72         return &resp
73 }
74
75 func (rh *Resthook) DeleteSubscription(id string) (*models.SubscriptionResponse, bool) {
76         if v, found := rh.subscriptions.Get(id); found {
77                 appmgr.Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(SubscriptionInfo).req)
78
79                 rh.subscriptions.Remove(id)
80                 rh.StoreSubscriptions(rh.subscriptions)
81                 resp := v.(SubscriptionInfo).resp
82                 return &resp, found
83         }
84         return &models.SubscriptionResponse{}, false
85 }
86
87 func (rh *Resthook) ModifySubscription(id string, req models.SubscriptionRequest) (*models.SubscriptionResponse, bool) {
88         if s, found := rh.subscriptions.Get(id); found {
89                 appmgr.Logger.Info("Subscription id=%s found: %v ... updating", id, s.(SubscriptionInfo).req)
90
91                 resp := models.SubscriptionResponse{ID: id, Version: 0, EventType: req.Data.EventType}
92                 rh.subscriptions.Set(id, SubscriptionInfo{id, req, resp})
93                 rh.StoreSubscriptions(rh.subscriptions)
94
95                 return &resp, found
96         }
97         return &models.SubscriptionResponse{}, false
98 }
99
100 func (rh *Resthook) GetAllSubscriptions() (hooks models.AllSubscriptions) {
101         hooks = models.AllSubscriptions{}
102         for v := range rh.subscriptions.IterBuffered() {
103                 s := v.Val.(SubscriptionInfo)
104                 r := v.Val.(SubscriptionInfo).req
105                 hooks = append(hooks, &models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, s.Id})
106         }
107
108         return hooks
109 }
110
111 func (rh *Resthook) GetSubscriptionById(id string) (models.Subscription, bool) {
112         if v, found := rh.subscriptions.Get(id); found {
113                 appmgr.Logger.Info("Subscription id=%s found: %v", id, v.(SubscriptionInfo).req)
114                 r := v.(SubscriptionInfo).req
115                 return models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, id}, found
116         }
117         return models.Subscription{}, false
118 }
119
120 func (rh *Resthook) PublishSubscription(x models.Xapp, et models.EventType) {
121         rh.NotifyClients(models.AllDeployedXapps{&x}, et)
122 }
123
124 func (rh *Resthook) NotifyClients(xapps models.AllDeployedXapps, et models.EventType) {
125         if len(xapps) == 0 || len(rh.subscriptions) == 0 {
126                 appmgr.Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(rh.subscriptions))
127                 return
128         }
129
130         rh.Seq = rh.Seq + 1
131         for v := range rh.subscriptions.Iter() {
132                 go rh.notify(xapps, et, v.Val.(SubscriptionInfo), rh.Seq)
133         }
134 }
135
136 func (rh *Resthook) notify(xapps models.AllDeployedXapps, et models.EventType, s SubscriptionInfo, seq int64) error {
137         xappData, err := json.Marshal(xapps)
138         if err != nil {
139                 appmgr.Logger.Info("json.Marshal failed: %v", err)
140                 return err
141         }
142
143         // TODO: Use models.SubscriptionNotification instead of internal ...
144         notif := SubscriptionNotification{ID: s.Id, Version: seq, Event: string(et), XApps: string(xappData)}
145         jsonData, err := json.Marshal(notif)
146         if err != nil {
147                 appmgr.Logger.Info("json.Marshal failed: %v", err)
148                 return err
149         }
150
151         // Execute the request with retry policy
152         return rh.retry(s, func() error {
153                 appmgr.Logger.Info("Posting notification to TargetURL=%s: %v", *s.req.Data.TargetURL, notif)
154                 resp, err := http.Post(*s.req.Data.TargetURL, "application/json", bytes.NewBuffer(jsonData))
155                 if err != nil {
156                         appmgr.Logger.Info("Posting to subscription failed: %v", err)
157                         return err
158                 }
159
160                 if resp.StatusCode != http.StatusOK {
161                         appmgr.Logger.Info("Client returned error code: %d", resp.StatusCode)
162                         return err
163                 }
164
165                 appmgr.Logger.Info("subscription to '%s' dispatched, response code: %d", *s.req.Data.TargetURL, resp.StatusCode)
166                 return nil
167         })
168 }
169
170 func (rh *Resthook) retry(s SubscriptionInfo, fn func() error) error {
171         if err := fn(); err != nil {
172                 // Todo: use exponential backoff, or similar mechanism
173                 if *s.req.Data.MaxRetries--; *s.req.Data.MaxRetries > 0 {
174                         time.Sleep(time.Duration(*s.req.Data.RetryTimer) * time.Second)
175                         return rh.retry(s, fn)
176                 }
177                 rh.subscriptions.Remove(s.Id)
178                 return err
179         }
180         return nil
181 }
182
183 func (rh *Resthook) StoreSubscriptions(m cmap.ConcurrentMap) {
184         for v := range m.Iter() {
185                 s := v.Val.(SubscriptionInfo)
186                 data, err := json.Marshal(s.req)
187                 if err != nil {
188                         appmgr.Logger.Error("json.marshal failed: %v ", err.Error())
189                         return
190                 }
191
192                 if err := rh.db.Set(s.Id, data); err != nil {
193                         appmgr.Logger.Error("DB.session.Set failed: %v ", err.Error())
194                 }
195         }
196 }
197
198 func (rh *Resthook) RestoreSubscriptions() (m cmap.ConcurrentMap) {
199         rh.VerifyDBConnection()
200
201         m = cmap.New()
202         keys, err := rh.db.GetAll()
203         if err != nil {
204                 appmgr.Logger.Error("DB.session.GetAll failed: %v ", err.Error())
205                 return
206         }
207
208         for _, key := range keys {
209                 value, err := rh.db.Get([]string{key})
210                 if err != nil {
211                         appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
212                         return
213                 }
214
215                 var item models.SubscriptionRequest
216                 if err = json.Unmarshal([]byte(value[key].(string)), &item); err != nil {
217                         appmgr.Logger.Error("json.Unmarshal failed: %v ", err.Error())
218                         return
219                 }
220
221                 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: item.Data.EventType}
222                 m.Set(key, SubscriptionInfo{key, item, resp})
223         }
224
225         return m
226 }
227
228 func (rh *Resthook) VerifyDBConnection() {
229         // Test DB connection, and wait until ready!
230         for {
231                 if _, err := rh.db.GetAll(); err == nil {
232                         return
233                 }
234                 appmgr.Logger.Error("Database connection not ready, waiting ...")
235                 time.Sleep(time.Duration(5 * time.Second))
236         }
237 }
238
239 func (rh *Resthook) FlushSubscriptions() {
240         rh.db.RemoveAll()
241         rh.subscriptions = cmap.New()
242 }
243
244 func (rh *Resthook) UpdateAppData(params models.RegisterRequest, updateflag bool) {
245         appmgr.Logger.Info("Endpoint to be added in SDL: %s", *params.HTTPEndpoint)
246         if updateflag == false {
247                 return
248         }
249
250         //Ensure config is empty string, as we dont want to store config in DB
251         if params.Config != "" {
252                 params.Config = ""
253         }
254
255         value, err := rh.db2.Get([]string{"endpoints"})
256         if err != nil {
257                 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
258                 return
259         }
260
261         appmgr.Logger.Info("List of Apps in SDL: %v", value["endpoints"])
262         var appsindb []string
263         var data string
264         dbflag := false
265
266         if value["endpoints"] != nil {
267                 formstring := fmt.Sprintf("%s", value["endpoints"])
268                 newstring := strings.Split(formstring, " ")
269                 for i, _ := range newstring {
270                         if len(newstring) == 1 && strings.Contains(newstring[i], *params.HTTPEndpoint) {
271                                 appmgr.Logger.Info("Removing Key %s", *params.HTTPEndpoint)
272                                 rh.db2.Remove([]string{"endpoints"})
273                                 dbflag = true
274                                 break
275                         }
276                         if strings.Contains(newstring[i], *params.HTTPEndpoint) {
277                                 appmgr.Logger.Info("Removing entry %s", *params.HTTPEndpoint)
278                                 dbflag = true
279                                 continue
280                         }
281                         appsindb = append(appsindb, newstring[i])
282                         data = strings.Join(appsindb, " ")
283                 }
284                 rh.db2.Set("endpoints", strings.TrimSpace(data))
285         }
286
287         if dbflag == false {
288                 xappData, err := json.Marshal(params)
289                 if err != nil {
290                         appmgr.Logger.Info("json.Marshal failed: %v", err)
291                         return
292                 }
293                 appsindb = append(appsindb, string(xappData))
294                 data = strings.Join(appsindb, " ")
295                 rh.db2.Set("endpoints", strings.TrimSpace(data))
296         }
297 }
298
299 func (rh *Resthook) GetAppsInSDL() *string {
300         value, err := rh.db2.Get([]string{"endpoints"})
301         if err != nil {
302                 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
303                 return nil
304         }
305         appmgr.Logger.Info("List of Apps in SDL: %v", value["endpoints"])
306         if value["endpoints"] == nil || value["endpoints"] == "" {
307                 return nil
308         } else {
309                 apps := fmt.Sprintf("%s", value["endpoints"])
310                 return &apps
311         }
312 }