Swagger-baser server REST API interface
[ric-plt/appmgr.git] / pkg / resthooks / resthooks.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package resthooks
21
22 import (
23         "bytes"
24         "encoding/json"
25         sdl "gerrit.oran-osc.org/r/ric-plt/sdlgo"
26         cmap "github.com/orcaman/concurrent-map"
27         "github.com/segmentio/ksuid"
28         "net/http"
29         "time"
30
31         "gerrit.oran-osc.org/r/ric-plt/appmgr/pkg/appmgr"
32         "gerrit.oran-osc.org/r/ric-plt/appmgr/pkg/models"
33 )
34
35 func NewResthook() *Resthook {
36         rh := &Resthook{
37                 client: &http.Client{},
38                 db:     sdl.NewSdlInstance("appmgr", sdl.NewDatabase()),
39         }
40
41         rh.subscriptions = rh.RestoreSubscriptions()
42         return rh
43 }
44
45 func (rh *Resthook) AddSubscription(sr models.SubscriptionRequest) *models.SubscriptionResponse {
46         for v := range rh.subscriptions.IterBuffered() {
47                 r := v.Val.(SubscriptionInfo).req
48                 if *r.Data.TargetURL == *sr.Data.TargetURL && r.Data.EventType == sr.Data.EventType {
49                         appmgr.Logger.Info("Similar subscription already exists!")
50                         return &models.SubscriptionResponse{}
51                 }
52         }
53
54         key := ksuid.New().String()
55         resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: sr.Data.EventType}
56         rh.subscriptions.Set(key, SubscriptionInfo{key, sr, resp})
57         rh.StoreSubscriptions(rh.subscriptions)
58
59         appmgr.Logger.Info("Sub: New subscription added: key=%s targetUl=%s eventType=%s", key, *sr.Data.TargetURL, sr.Data.EventType)
60         return &resp
61 }
62
63 func (rh *Resthook) DeleteSubscription(id string) (*models.SubscriptionResponse, bool) {
64         if v, found := rh.subscriptions.Get(id); found {
65                 appmgr.Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(SubscriptionInfo).req)
66
67                 rh.subscriptions.Remove(id)
68                 rh.StoreSubscriptions(rh.subscriptions)
69                 resp := v.(SubscriptionInfo).resp
70                 return &resp, found
71         }
72         return &models.SubscriptionResponse{}, false
73 }
74
75 func (rh *Resthook) ModifySubscription(id string, req models.SubscriptionRequest) (*models.SubscriptionResponse, bool) {
76         if s, found := rh.subscriptions.Get(id); found {
77                 appmgr.Logger.Info("Subscription id=%s found: %v ... updating", id, s.(SubscriptionInfo).req)
78
79                 resp := models.SubscriptionResponse{ID: id, Version: 0, EventType: req.Data.EventType}
80                 rh.subscriptions.Set(id, SubscriptionInfo{id, req, resp})
81                 rh.StoreSubscriptions(rh.subscriptions)
82
83                 return &resp, found
84         }
85         return &models.SubscriptionResponse{}, false
86 }
87
88 func (rh *Resthook) GetAllSubscriptions() (hooks models.AllSubscriptions) {
89         hooks = models.AllSubscriptions{}
90         for v := range rh.subscriptions.IterBuffered() {
91                 s := v.Val.(SubscriptionInfo)
92                 r := v.Val.(SubscriptionInfo).req
93                 hooks = append(hooks, &models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, s.Id})
94         }
95
96         return hooks
97 }
98
99 func (rh *Resthook) GetSubscriptionById(id string) (models.Subscription, bool) {
100         if v, found := rh.subscriptions.Get(id); found {
101                 appmgr.Logger.Info("Subscription id=%s found: %v", id, v.(SubscriptionInfo).req)
102                 r := v.(SubscriptionInfo).req
103                 return models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, id}, found
104         }
105         return models.Subscription{}, false
106 }
107
108 func (rh *Resthook) PublishSubscription(x models.Xapp, et models.EventType) {
109         rh.NotifyClients(models.AllDeployedXapps{&x}, et)
110 }
111
112 func (rh *Resthook) NotifyClients(xapps models.AllDeployedXapps, et models.EventType) {
113         if len(xapps) == 0 || len(rh.subscriptions) == 0 {
114                 appmgr.Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(rh.subscriptions))
115                 return
116         }
117
118         rh.Seq = rh.Seq + 1
119         for v := range rh.subscriptions.Iter() {
120                 go rh.notify(xapps, et, v.Val.(SubscriptionInfo), rh.Seq)
121         }
122 }
123
124 func (rh *Resthook) notify(xapps models.AllDeployedXapps, et models.EventType, s SubscriptionInfo, seq int64) error {
125         notif := models.SubscriptionNotification{ID: s.Id, Version: seq, EventType: et, XApps: xapps}
126         jsonData, err := json.Marshal(notif)
127         if err != nil {
128                 appmgr.Logger.Info("json.Marshal failed: %v", err)
129                 return err
130         }
131
132         // Execute the request with retry policy
133         return rh.retry(s, func() error {
134                 appmgr.Logger.Info("Posting notification to TargetURL=%s: %v", *s.req.Data.TargetURL, notif)
135                 resp, err := http.Post(*s.req.Data.TargetURL, "application/json", bytes.NewBuffer(jsonData))
136                 if err != nil {
137                         appmgr.Logger.Info("Posting to subscription failed: %v", err)
138                         return err
139                 }
140
141                 if resp.StatusCode != http.StatusOK {
142                         appmgr.Logger.Info("Client returned error code: %d", resp.StatusCode)
143                         return err
144                 }
145
146                 appmgr.Logger.Info("subscription to '%s' dispatched, response code: %d", *s.req.Data.TargetURL, resp.StatusCode)
147                 return nil
148         })
149 }
150
151 func (rh *Resthook) retry(s SubscriptionInfo, fn func() error) error {
152         if err := fn(); err != nil {
153                 // Todo: use exponential backoff, or similar mechanism
154                 if *s.req.Data.MaxRetries--; *s.req.Data.MaxRetries > 0 {
155                         time.Sleep(time.Duration(*s.req.Data.RetryTimer) * time.Second)
156                         return rh.retry(s, fn)
157                 }
158                 rh.subscriptions.Remove(s.Id)
159                 return err
160         }
161         return nil
162 }
163
164 func (rh *Resthook) StoreSubscriptions(m cmap.ConcurrentMap) {
165         for v := range m.Iter() {
166                 s := v.Val.(SubscriptionInfo)
167
168                 data, err := json.Marshal(s.req)
169                 if err != nil {
170                         appmgr.Logger.Error("json.marshal failed: %v ", err.Error())
171                         return
172                 }
173
174                 if err := rh.db.Set(s.Id, data); err != nil {
175                         appmgr.Logger.Error("DB.session.Set failed: %v ", err.Error())
176                 }
177         }
178 }
179
180 func (rh *Resthook) RestoreSubscriptions() (m cmap.ConcurrentMap) {
181         rh.VerifyDBConnection()
182
183         m = cmap.New()
184         keys, err := rh.db.GetAll()
185         if err != nil {
186                 appmgr.Logger.Error("DB.session.GetAll failed: %v ", err.Error())
187                 return
188         }
189
190         for _, key := range keys {
191                 value, err := rh.db.Get([]string{key})
192                 if err != nil {
193                         appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
194                         return
195                 }
196
197                 var item models.SubscriptionRequest
198                 if err = json.Unmarshal([]byte(value[key].(string)), &item); err != nil {
199                         appmgr.Logger.Error("json.Unmarshal failed: %v ", err.Error())
200                         return
201                 }
202
203                 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: item.Data.EventType}
204                 m.Set(key, SubscriptionInfo{key, item, resp})
205         }
206
207         return m
208 }
209
210 func (rh *Resthook) VerifyDBConnection() {
211         // Test DB connection, and wait until ready!
212         for {
213                 if _, err := rh.db.GetAll(); err == nil {
214                         return
215                 }
216                 appmgr.Logger.Error("Database connection not ready, waiting ...")
217                 time.Sleep(time.Duration(5 * time.Second))
218         }
219 }
220
221 func (rh *Resthook) FlushSubscriptions() {
222         rh.db.RemoveAll()
223         rh.subscriptions = cmap.New()
224 }