Improve appmgr UT coverage
[ric-plt/appmgr.git] / pkg / resthooks / resthooks.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package resthooks
21
22 import (
23         "bytes"
24         "encoding/json"
25         sdl "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
26         cmap "github.com/orcaman/concurrent-map"
27         "github.com/segmentio/ksuid"
28         "net/http"
29         "time"
30
31         "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr"
32         "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/models"
33 )
34
35 func NewResthook(restoreData bool) *Resthook {
36         return createResthook(restoreData, sdl.NewSdlInstance("appmgr", sdl.NewDatabase()))
37 }
38
39 func createResthook(restoreData bool, sdlInst iSdl) *Resthook {
40         rh := &Resthook{
41                 client: &http.Client{},
42                 db:     sdlInst,
43         }
44
45         if restoreData {
46                 rh.subscriptions = rh.RestoreSubscriptions()
47         } else {
48                 rh.subscriptions = cmap.New()
49         }
50         return rh
51 }
52
53 func (rh *Resthook) AddSubscription(sr models.SubscriptionRequest) *models.SubscriptionResponse {
54         for v := range rh.subscriptions.IterBuffered() {
55                 r := v.Val.(SubscriptionInfo).req
56                 if *r.Data.TargetURL == *sr.Data.TargetURL && r.Data.EventType == sr.Data.EventType {
57                         appmgr.Logger.Info("Similar subscription already exists!")
58                         resp := v.Val.(SubscriptionInfo).resp
59                         return &resp
60                 }
61         }
62
63         key := ksuid.New().String()
64         resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: sr.Data.EventType}
65         rh.subscriptions.Set(key, SubscriptionInfo{key, sr, resp})
66         rh.StoreSubscriptions(rh.subscriptions)
67
68         appmgr.Logger.Info("Sub: New subscription added: key=%s targetUl=%s eventType=%s", key, *sr.Data.TargetURL, sr.Data.EventType)
69         return &resp
70 }
71
72 func (rh *Resthook) DeleteSubscription(id string) (*models.SubscriptionResponse, bool) {
73         if v, found := rh.subscriptions.Get(id); found {
74                 appmgr.Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(SubscriptionInfo).req)
75
76                 rh.subscriptions.Remove(id)
77                 rh.StoreSubscriptions(rh.subscriptions)
78                 resp := v.(SubscriptionInfo).resp
79                 return &resp, found
80         }
81         return &models.SubscriptionResponse{}, false
82 }
83
84 func (rh *Resthook) ModifySubscription(id string, req models.SubscriptionRequest) (*models.SubscriptionResponse, bool) {
85         if s, found := rh.subscriptions.Get(id); found {
86                 appmgr.Logger.Info("Subscription id=%s found: %v ... updating", id, s.(SubscriptionInfo).req)
87
88                 resp := models.SubscriptionResponse{ID: id, Version: 0, EventType: req.Data.EventType}
89                 rh.subscriptions.Set(id, SubscriptionInfo{id, req, resp})
90                 rh.StoreSubscriptions(rh.subscriptions)
91
92                 return &resp, found
93         }
94         return &models.SubscriptionResponse{}, false
95 }
96
97 func (rh *Resthook) GetAllSubscriptions() (hooks models.AllSubscriptions) {
98         hooks = models.AllSubscriptions{}
99         for v := range rh.subscriptions.IterBuffered() {
100                 s := v.Val.(SubscriptionInfo)
101                 r := v.Val.(SubscriptionInfo).req
102                 hooks = append(hooks, &models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, s.Id})
103         }
104
105         return hooks
106 }
107
108 func (rh *Resthook) GetSubscriptionById(id string) (models.Subscription, bool) {
109         if v, found := rh.subscriptions.Get(id); found {
110                 appmgr.Logger.Info("Subscription id=%s found: %v", id, v.(SubscriptionInfo).req)
111                 r := v.(SubscriptionInfo).req
112                 return models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, id}, found
113         }
114         return models.Subscription{}, false
115 }
116
117 func (rh *Resthook) PublishSubscription(x models.Xapp, et models.EventType) {
118         rh.NotifyClients(models.AllDeployedXapps{&x}, et)
119 }
120
121 func (rh *Resthook) NotifyClients(xapps models.AllDeployedXapps, et models.EventType) {
122         if len(xapps) == 0 || len(rh.subscriptions) == 0 {
123                 appmgr.Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(rh.subscriptions))
124                 return
125         }
126
127         rh.Seq = rh.Seq + 1
128         for v := range rh.subscriptions.Iter() {
129                 go rh.notify(xapps, et, v.Val.(SubscriptionInfo), rh.Seq)
130         }
131 }
132
133 func (rh *Resthook) notify(xapps models.AllDeployedXapps, et models.EventType, s SubscriptionInfo, seq int64) error {
134         xappData, err := json.Marshal(xapps)
135         if err != nil {
136                 appmgr.Logger.Info("json.Marshal failed: %v", err)
137                 return err
138         }
139
140         // TODO: Use models.SubscriptionNotification instead of internal ...
141         notif := SubscriptionNotification{ID: s.Id, Version: seq, Event: string(et), XApps: string(xappData)}
142         jsonData, err := json.Marshal(notif)
143         if err != nil {
144                 appmgr.Logger.Info("json.Marshal failed: %v", err)
145                 return err
146         }
147
148         // Execute the request with retry policy
149         return rh.retry(s, func() error {
150                 appmgr.Logger.Info("Posting notification to TargetURL=%s: %v", *s.req.Data.TargetURL, notif)
151                 resp, err := http.Post(*s.req.Data.TargetURL, "application/json", bytes.NewBuffer(jsonData))
152                 if err != nil {
153                         appmgr.Logger.Info("Posting to subscription failed: %v", err)
154                         return err
155                 }
156
157                 if resp.StatusCode != http.StatusOK {
158                         appmgr.Logger.Info("Client returned error code: %d", resp.StatusCode)
159                         return err
160                 }
161
162                 appmgr.Logger.Info("subscription to '%s' dispatched, response code: %d", *s.req.Data.TargetURL, resp.StatusCode)
163                 return nil
164         })
165 }
166
167 func (rh *Resthook) retry(s SubscriptionInfo, fn func() error) error {
168         if err := fn(); err != nil {
169                 // Todo: use exponential backoff, or similar mechanism
170                 if *s.req.Data.MaxRetries--; *s.req.Data.MaxRetries > 0 {
171                         time.Sleep(time.Duration(*s.req.Data.RetryTimer) * time.Second)
172                         return rh.retry(s, fn)
173                 }
174                 rh.subscriptions.Remove(s.Id)
175                 return err
176         }
177         return nil
178 }
179
180 func (rh *Resthook) StoreSubscriptions(m cmap.ConcurrentMap) {
181         for v := range m.Iter() {
182                 s := v.Val.(SubscriptionInfo)
183                 data, err := json.Marshal(s.req)
184                 if err != nil {
185                         appmgr.Logger.Error("json.marshal failed: %v ", err.Error())
186                         return
187                 }
188
189                 if err := rh.db.Set(s.Id, data); err != nil {
190                         appmgr.Logger.Error("DB.session.Set failed: %v ", err.Error())
191                 }
192         }
193 }
194
195 func (rh *Resthook) RestoreSubscriptions() (m cmap.ConcurrentMap) {
196         rh.VerifyDBConnection()
197
198         m = cmap.New()
199         keys, err := rh.db.GetAll()
200         if err != nil {
201                 appmgr.Logger.Error("DB.session.GetAll failed: %v ", err.Error())
202                 return
203         }
204
205         for _, key := range keys {
206                 value, err := rh.db.Get([]string{key})
207                 if err != nil {
208                         appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
209                         return
210                 }
211
212                 var item models.SubscriptionRequest
213                 if err = json.Unmarshal([]byte(value[key].(string)), &item); err != nil {
214                         appmgr.Logger.Error("json.Unmarshal failed: %v ", err.Error())
215                         return
216                 }
217
218                 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: item.Data.EventType}
219                 m.Set(key, SubscriptionInfo{key, item, resp})
220         }
221
222         return m
223 }
224
225 func (rh *Resthook) VerifyDBConnection() {
226         // Test DB connection, and wait until ready!
227         for {
228                 if _, err := rh.db.GetAll(); err == nil {
229                         return
230                 }
231                 appmgr.Logger.Error("Database connection not ready, waiting ...")
232                 time.Sleep(time.Duration(5 * time.Second))
233         }
234 }
235
236 func (rh *Resthook) FlushSubscriptions() {
237         rh.db.RemoveAll()
238         rh.subscriptions = cmap.New()
239 }