Initial commit for Xapp Orchestration
[ric-plt/appmgr.git] / pkg / resthooks / resthooks.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package resthooks
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         sdl "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
27         cmap "github.com/orcaman/concurrent-map"
28         "github.com/segmentio/ksuid"
29         "net/http"
30         "strings"
31         "time"
32
33         "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr"
34         "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/models"
35 )
36
37 func NewResthook(restoreData bool) *Resthook {
38         return createResthook(restoreData, sdl.NewSdlInstance("appmgr", sdl.NewDatabase()),sdl.NewSdlInstance("appdb", sdl.NewDatabase()))
39 }
40
41 func createResthook(restoreData bool, sdlInst iSdl, sdlInst2 iSdl) *Resthook {
42         rh := &Resthook{
43                 client: &http.Client{},
44                 db:     sdlInst,
45                 db2:    sdlInst2,
46         }
47
48         if restoreData {
49                 rh.subscriptions = rh.RestoreSubscriptions()
50         } else {
51                 rh.subscriptions = cmap.New()
52         }
53         return rh
54 }
55
56 func (rh *Resthook) AddSubscription(sr models.SubscriptionRequest) *models.SubscriptionResponse {
57         for v := range rh.subscriptions.IterBuffered() {
58                 r := v.Val.(SubscriptionInfo).req
59                 if *r.Data.TargetURL == *sr.Data.TargetURL && r.Data.EventType == sr.Data.EventType {
60                         appmgr.Logger.Info("Similar subscription already exists!")
61                         resp := v.Val.(SubscriptionInfo).resp
62                         return &resp
63                 }
64         }
65
66         key := ksuid.New().String()
67         resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: sr.Data.EventType}
68         rh.subscriptions.Set(key, SubscriptionInfo{key, sr, resp})
69         rh.StoreSubscriptions(rh.subscriptions)
70
71         appmgr.Logger.Info("Sub: New subscription added: key=%s targetUl=%s eventType=%s", key, *sr.Data.TargetURL, sr.Data.EventType)
72         return &resp
73 }
74
75 func (rh *Resthook) DeleteSubscription(id string) (*models.SubscriptionResponse, bool) {
76         if v, found := rh.subscriptions.Get(id); found {
77                 appmgr.Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(SubscriptionInfo).req)
78
79                 rh.subscriptions.Remove(id)
80                 rh.StoreSubscriptions(rh.subscriptions)
81                 resp := v.(SubscriptionInfo).resp
82                 return &resp, found
83         }
84         return &models.SubscriptionResponse{}, false
85 }
86
87 func (rh *Resthook) ModifySubscription(id string, req models.SubscriptionRequest) (*models.SubscriptionResponse, bool) {
88         if s, found := rh.subscriptions.Get(id); found {
89                 appmgr.Logger.Info("Subscription id=%s found: %v ... updating", id, s.(SubscriptionInfo).req)
90
91                 resp := models.SubscriptionResponse{ID: id, Version: 0, EventType: req.Data.EventType}
92                 rh.subscriptions.Set(id, SubscriptionInfo{id, req, resp})
93                 rh.StoreSubscriptions(rh.subscriptions)
94
95                 return &resp, found
96         }
97         return &models.SubscriptionResponse{}, false
98 }
99
100 func (rh *Resthook) GetAllSubscriptions() (hooks models.AllSubscriptions) {
101         hooks = models.AllSubscriptions{}
102         for v := range rh.subscriptions.IterBuffered() {
103                 s := v.Val.(SubscriptionInfo)
104                 r := v.Val.(SubscriptionInfo).req
105                 hooks = append(hooks, &models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, s.Id})
106         }
107
108         return hooks
109 }
110
111 func (rh *Resthook) GetSubscriptionById(id string) (models.Subscription, bool) {
112         if v, found := rh.subscriptions.Get(id); found {
113                 appmgr.Logger.Info("Subscription id=%s found: %v", id, v.(SubscriptionInfo).req)
114                 r := v.(SubscriptionInfo).req
115                 return models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, id}, found
116         }
117         return models.Subscription{}, false
118 }
119
120 func (rh *Resthook) PublishSubscription(x models.Xapp, et models.EventType) {
121         rh.NotifyClients(models.AllDeployedXapps{&x}, et)
122 }
123
124 func (rh *Resthook) NotifyClients(xapps models.AllDeployedXapps, et models.EventType) {
125         if len(xapps) == 0 || len(rh.subscriptions) == 0 {
126                 appmgr.Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(rh.subscriptions))
127                 return
128         }
129
130         rh.Seq = rh.Seq + 1
131         for v := range rh.subscriptions.Iter() {
132                 go rh.notify(xapps, et, v.Val.(SubscriptionInfo), rh.Seq)
133         }
134 }
135
136 func (rh *Resthook) notify(xapps models.AllDeployedXapps, et models.EventType, s SubscriptionInfo, seq int64) error {
137         xappData, err := json.Marshal(xapps)
138         if err != nil {
139                 appmgr.Logger.Info("json.Marshal failed: %v", err)
140                 return err
141         }
142
143         // TODO: Use models.SubscriptionNotification instead of internal ...
144         notif := SubscriptionNotification{ID: s.Id, Version: seq, Event: string(et), XApps: string(xappData)}
145         jsonData, err := json.Marshal(notif)
146         if err != nil {
147                 appmgr.Logger.Info("json.Marshal failed: %v", err)
148                 return err
149         }
150
151         // Execute the request with retry policy
152         return rh.retry(s, func() error {
153                 appmgr.Logger.Info("Posting notification to TargetURL=%s: %v", *s.req.Data.TargetURL, notif)
154                 resp, err := http.Post(*s.req.Data.TargetURL, "application/json", bytes.NewBuffer(jsonData))
155                 if err != nil {
156                         appmgr.Logger.Info("Posting to subscription failed: %v", err)
157                         return err
158                 }
159
160                 if resp.StatusCode != http.StatusOK {
161                         appmgr.Logger.Info("Client returned error code: %d", resp.StatusCode)
162                         return err
163                 }
164
165                 appmgr.Logger.Info("subscription to '%s' dispatched, response code: %d", *s.req.Data.TargetURL, resp.StatusCode)
166                 return nil
167         })
168 }
169
170 func (rh *Resthook) retry(s SubscriptionInfo, fn func() error) error {
171         if err := fn(); err != nil {
172                 // Todo: use exponential backoff, or similar mechanism
173                 if *s.req.Data.MaxRetries--; *s.req.Data.MaxRetries > 0 {
174                         time.Sleep(time.Duration(*s.req.Data.RetryTimer) * time.Second)
175                         return rh.retry(s, fn)
176                 }
177                 rh.subscriptions.Remove(s.Id)
178                 return err
179         }
180         return nil
181 }
182
183 func (rh *Resthook) StoreSubscriptions(m cmap.ConcurrentMap) {
184         for v := range m.Iter() {
185                 s := v.Val.(SubscriptionInfo)
186                 data, err := json.Marshal(s.req)
187                 if err != nil {
188                         appmgr.Logger.Error("json.marshal failed: %v ", err.Error())
189                         return
190                 }
191
192                 if err := rh.db.Set(s.Id, data); err != nil {
193                         appmgr.Logger.Error("DB.session.Set failed: %v ", err.Error())
194                 }
195         }
196 }
197
198 func (rh *Resthook) RestoreSubscriptions() (m cmap.ConcurrentMap) {
199         rh.VerifyDBConnection()
200
201         m = cmap.New()
202         keys, err := rh.db.GetAll()
203         if err != nil {
204                 appmgr.Logger.Error("DB.session.GetAll failed: %v ", err.Error())
205                 return
206         }
207
208         for _, key := range keys {
209                 value, err := rh.db.Get([]string{key})
210                 if err != nil {
211                         appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
212                         return
213                 }
214
215                 var item models.SubscriptionRequest
216                 if err = json.Unmarshal([]byte(value[key].(string)), &item); err != nil {
217                         appmgr.Logger.Error("json.Unmarshal failed: %v ", err.Error())
218                         return
219                 }
220
221                 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: item.Data.EventType}
222                 m.Set(key, SubscriptionInfo{key, item, resp})
223         }
224
225         return m
226 }
227
228 func (rh *Resthook) VerifyDBConnection() {
229         // Test DB connection, and wait until ready!
230         for {
231                 if _, err := rh.db.GetAll(); err == nil {
232                         return
233                 }
234                 appmgr.Logger.Error("Database connection not ready, waiting ...")
235                 time.Sleep(time.Duration(5 * time.Second))
236         }
237 }
238
239 func (rh *Resthook) FlushSubscriptions() {
240         rh.db.RemoveAll()
241         rh.subscriptions = cmap.New()
242 }
243
244 func (rh *Resthook) UpdateAppData(params models.RegisterRequest, updateflag bool) {
245         appmgr.Logger.Info("Endpoint to be added in SDL: %s", *params.HTTPEndpoint)
246         if updateflag == false {
247                 return
248         }
249
250         value, err := rh.db2.Get([]string{"endpoints"})
251         if err != nil {
252                 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
253                 return
254         }
255
256         appmgr.Logger.Info("List of Apps in SDL: %v", value["endpoints"])
257         var appsindb []string
258         var data string
259         dbflag := false
260
261         if value["endpoints"] != nil {
262                 formstring := fmt.Sprintf("%s", value["endpoints"])
263                 newstring := strings.Split(formstring, " ")
264                 for i, _ := range newstring {
265                         if len(newstring) == 1 && strings.Contains(newstring[i], *params.HTTPEndpoint) {
266                                 appmgr.Logger.Info("Removing Key %s", *params.HTTPEndpoint)
267                                 rh.db2.Remove([]string{"endpoints"})
268                                 dbflag = true
269                                 break
270                         }
271                         if strings.Contains(newstring[i], *params.HTTPEndpoint) {
272                                 appmgr.Logger.Info("Removing entry %s", *params.HTTPEndpoint)
273                                 dbflag = true
274                                 continue
275                         }
276                         appsindb = append(appsindb, newstring[i])
277                         data = strings.Join(appsindb, " ")
278                 }
279                 rh.db2.Set("endpoints", strings.TrimSpace(data))
280         }
281
282         if dbflag == false {
283                 xappData, err := json.Marshal(params)
284                 if err != nil {
285                         appmgr.Logger.Info("json.Marshal failed: %v", err)
286                         return
287                 }
288                 appsindb = append(appsindb, string(xappData))
289                 data = strings.Join(appsindb, " ")
290                 rh.db2.Set("endpoints", strings.TrimSpace(data))
291         }
292 }
293
294 func (rh *Resthook) GetAppsInSDL() *string {
295         value, err := rh.db2.Get([]string{"endpoints"})
296         if err != nil {
297                 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
298                 return nil
299         }
300         appmgr.Logger.Info("List of Apps in SDL: %v", value["endpoints"])
301         if value["endpoints"] == nil || value["endpoints"] == "" {
302                 return nil
303         } else {
304                 apps := fmt.Sprintf("%s", value["endpoints"])
305                 return &apps
306         }
307 }