Run unit tests
[ric-plt/appmgr.git] / pkg / resthooks / resthooks.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package resthooks
21
22 import (
23         "bytes"
24         "encoding/json"
25         sdl "gerrit.oran-osc.org/r/ric-plt/sdlgo"
26         cmap "github.com/orcaman/concurrent-map"
27         "github.com/segmentio/ksuid"
28         "net/http"
29         "time"
30
31         "gerrit.oran-osc.org/r/ric-plt/appmgr/pkg/appmgr"
32         "gerrit.oran-osc.org/r/ric-plt/appmgr/pkg/models"
33 )
34
35 func NewResthook(restoreData bool) *Resthook {
36         rh := &Resthook{
37                 client: &http.Client{},
38                 db:     sdl.NewSdlInstance("appmgr", sdl.NewDatabase()),
39         }
40
41         if restoreData {
42                 rh.subscriptions = rh.RestoreSubscriptions()
43         } else {
44                 rh.subscriptions = cmap.New()
45         }
46         return rh
47 }
48
49 func (rh *Resthook) AddSubscription(sr models.SubscriptionRequest) *models.SubscriptionResponse {
50         for v := range rh.subscriptions.IterBuffered() {
51                 r := v.Val.(SubscriptionInfo).req
52                 if *r.Data.TargetURL == *sr.Data.TargetURL && r.Data.EventType == sr.Data.EventType {
53                         appmgr.Logger.Info("Similar subscription already exists!")
54                         return &models.SubscriptionResponse{}
55                 }
56         }
57
58         key := ksuid.New().String()
59         resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: sr.Data.EventType}
60         rh.subscriptions.Set(key, SubscriptionInfo{key, sr, resp})
61         rh.StoreSubscriptions(rh.subscriptions)
62
63         appmgr.Logger.Info("Sub: New subscription added: key=%s targetUl=%s eventType=%s", key, *sr.Data.TargetURL, sr.Data.EventType)
64         return &resp
65 }
66
67 func (rh *Resthook) DeleteSubscription(id string) (*models.SubscriptionResponse, bool) {
68         if v, found := rh.subscriptions.Get(id); found {
69                 appmgr.Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(SubscriptionInfo).req)
70
71                 rh.subscriptions.Remove(id)
72                 rh.StoreSubscriptions(rh.subscriptions)
73                 resp := v.(SubscriptionInfo).resp
74                 return &resp, found
75         }
76         return &models.SubscriptionResponse{}, false
77 }
78
79 func (rh *Resthook) ModifySubscription(id string, req models.SubscriptionRequest) (*models.SubscriptionResponse, bool) {
80         if s, found := rh.subscriptions.Get(id); found {
81                 appmgr.Logger.Info("Subscription id=%s found: %v ... updating", id, s.(SubscriptionInfo).req)
82
83                 resp := models.SubscriptionResponse{ID: id, Version: 0, EventType: req.Data.EventType}
84                 rh.subscriptions.Set(id, SubscriptionInfo{id, req, resp})
85                 rh.StoreSubscriptions(rh.subscriptions)
86
87                 return &resp, found
88         }
89         return &models.SubscriptionResponse{}, false
90 }
91
92 func (rh *Resthook) GetAllSubscriptions() (hooks models.AllSubscriptions) {
93         hooks = models.AllSubscriptions{}
94         for v := range rh.subscriptions.IterBuffered() {
95                 s := v.Val.(SubscriptionInfo)
96                 r := v.Val.(SubscriptionInfo).req
97                 hooks = append(hooks, &models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, s.Id})
98         }
99
100         return hooks
101 }
102
103 func (rh *Resthook) GetSubscriptionById(id string) (models.Subscription, bool) {
104         if v, found := rh.subscriptions.Get(id); found {
105                 appmgr.Logger.Info("Subscription id=%s found: %v", id, v.(SubscriptionInfo).req)
106                 r := v.(SubscriptionInfo).req
107                 return models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, id}, found
108         }
109         return models.Subscription{}, false
110 }
111
112 func (rh *Resthook) PublishSubscription(x models.Xapp, et models.EventType) {
113         rh.NotifyClients(models.AllDeployedXapps{&x}, et)
114 }
115
116 func (rh *Resthook) NotifyClients(xapps models.AllDeployedXapps, et models.EventType) {
117         if len(xapps) == 0 || len(rh.subscriptions) == 0 {
118                 appmgr.Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(rh.subscriptions))
119                 return
120         }
121
122         rh.Seq = rh.Seq + 1
123         for v := range rh.subscriptions.Iter() {
124                 go rh.notify(xapps, et, v.Val.(SubscriptionInfo), rh.Seq)
125         }
126 }
127
128 func (rh *Resthook) notify(xapps models.AllDeployedXapps, et models.EventType, s SubscriptionInfo, seq int64) error {
129         xappData, err := json.Marshal(xapps)
130         if err != nil {
131                 appmgr.Logger.Info("json.Marshal failed: %v", err)
132                 return err
133         }
134
135         // TODO: Use models.SubscriptionNotification instead of internal ...
136         notif := SubscriptionNotification{ID: s.Id, Version: seq, Event: string(et), XApps: string(xappData)}
137         jsonData, err := json.Marshal(notif)
138         if err != nil {
139                 appmgr.Logger.Info("json.Marshal failed: %v", err)
140                 return err
141         }
142
143         // Execute the request with retry policy
144         return rh.retry(s, func() error {
145                 appmgr.Logger.Info("Posting notification to TargetURL=%s: %v", *s.req.Data.TargetURL, notif)
146                 resp, err := http.Post(*s.req.Data.TargetURL, "application/json", bytes.NewBuffer(jsonData))
147                 if err != nil {
148                         appmgr.Logger.Info("Posting to subscription failed: %v", err)
149                         return err
150                 }
151
152                 if resp.StatusCode != http.StatusOK {
153                         appmgr.Logger.Info("Client returned error code: %d", resp.StatusCode)
154                         return err
155                 }
156
157                 appmgr.Logger.Info("subscription to '%s' dispatched, response code: %d", *s.req.Data.TargetURL, resp.StatusCode)
158                 return nil
159         })
160 }
161
162 func (rh *Resthook) retry(s SubscriptionInfo, fn func() error) error {
163         if err := fn(); err != nil {
164                 // Todo: use exponential backoff, or similar mechanism
165                 if *s.req.Data.MaxRetries--; *s.req.Data.MaxRetries > 0 {
166                         time.Sleep(time.Duration(*s.req.Data.RetryTimer) * time.Second)
167                         return rh.retry(s, fn)
168                 }
169                 rh.subscriptions.Remove(s.Id)
170                 return err
171         }
172         return nil
173 }
174
175 func (rh *Resthook) StoreSubscriptions(m cmap.ConcurrentMap) {
176         for v := range m.Iter() {
177                 s := v.Val.(SubscriptionInfo)
178
179                 data, err := json.Marshal(s.req)
180                 if err != nil {
181                         appmgr.Logger.Error("json.marshal failed: %v ", err.Error())
182                         return
183                 }
184
185                 if err := rh.db.Set(s.Id, data); err != nil {
186                         appmgr.Logger.Error("DB.session.Set failed: %v ", err.Error())
187                 }
188         }
189 }
190
191 func (rh *Resthook) RestoreSubscriptions() (m cmap.ConcurrentMap) {
192         rh.VerifyDBConnection()
193
194         m = cmap.New()
195         keys, err := rh.db.GetAll()
196         if err != nil {
197                 appmgr.Logger.Error("DB.session.GetAll failed: %v ", err.Error())
198                 return
199         }
200
201         for _, key := range keys {
202                 value, err := rh.db.Get([]string{key})
203                 if err != nil {
204                         appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
205                         return
206                 }
207
208                 var item models.SubscriptionRequest
209                 if err = json.Unmarshal([]byte(value[key].(string)), &item); err != nil {
210                         appmgr.Logger.Error("json.Unmarshal failed: %v ", err.Error())
211                         return
212                 }
213
214                 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: item.Data.EventType}
215                 m.Set(key, SubscriptionInfo{key, item, resp})
216         }
217
218         return m
219 }
220
221 func (rh *Resthook) VerifyDBConnection() {
222         // Test DB connection, and wait until ready!
223         for {
224                 if _, err := rh.db.GetAll(); err == nil {
225                         return
226                 }
227                 appmgr.Logger.Error("Database connection not ready, waiting ...")
228                 time.Sleep(time.Duration(5 * time.Second))
229         }
230 }
231
232 func (rh *Resthook) FlushSubscriptions() {
233         rh.db.RemoveAll()
234         rh.subscriptions = cmap.New()
235 }