Resolve RMR port from K8s services
[ric-plt/appmgr.git] / pkg / resthooks / resthooks.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package resthooks
21
22 import (
23         "bytes"
24         "encoding/json"
25         sdl "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
26         cmap "github.com/orcaman/concurrent-map"
27         "github.com/segmentio/ksuid"
28         "net/http"
29         "time"
30
31         "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr"
32         "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/models"
33 )
34
35 func NewResthook(restoreData bool) *Resthook {
36         rh := &Resthook{
37                 client: &http.Client{},
38                 db:     sdl.NewSdlInstance("appmgr", sdl.NewDatabase()),
39         }
40
41         if restoreData {
42                 rh.subscriptions = rh.RestoreSubscriptions()
43         } else {
44                 rh.subscriptions = cmap.New()
45         }
46         return rh
47 }
48
49 func (rh *Resthook) AddSubscription(sr models.SubscriptionRequest) *models.SubscriptionResponse {
50         for v := range rh.subscriptions.IterBuffered() {
51                 r := v.Val.(SubscriptionInfo).req
52                 if *r.Data.TargetURL == *sr.Data.TargetURL && r.Data.EventType == sr.Data.EventType {
53                         appmgr.Logger.Info("Similar subscription already exists!")
54                         resp := v.Val.(SubscriptionInfo).resp
55                         return &resp
56                 }
57         }
58
59         key := ksuid.New().String()
60         resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: sr.Data.EventType}
61         rh.subscriptions.Set(key, SubscriptionInfo{key, sr, resp})
62         rh.StoreSubscriptions(rh.subscriptions)
63
64         appmgr.Logger.Info("Sub: New subscription added: key=%s targetUl=%s eventType=%s", key, *sr.Data.TargetURL, sr.Data.EventType)
65         return &resp
66 }
67
68 func (rh *Resthook) DeleteSubscription(id string) (*models.SubscriptionResponse, bool) {
69         if v, found := rh.subscriptions.Get(id); found {
70                 appmgr.Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(SubscriptionInfo).req)
71
72                 rh.subscriptions.Remove(id)
73                 rh.StoreSubscriptions(rh.subscriptions)
74                 resp := v.(SubscriptionInfo).resp
75                 return &resp, found
76         }
77         return &models.SubscriptionResponse{}, false
78 }
79
80 func (rh *Resthook) ModifySubscription(id string, req models.SubscriptionRequest) (*models.SubscriptionResponse, bool) {
81         if s, found := rh.subscriptions.Get(id); found {
82                 appmgr.Logger.Info("Subscription id=%s found: %v ... updating", id, s.(SubscriptionInfo).req)
83
84                 resp := models.SubscriptionResponse{ID: id, Version: 0, EventType: req.Data.EventType}
85                 rh.subscriptions.Set(id, SubscriptionInfo{id, req, resp})
86                 rh.StoreSubscriptions(rh.subscriptions)
87
88                 return &resp, found
89         }
90         return &models.SubscriptionResponse{}, false
91 }
92
93 func (rh *Resthook) GetAllSubscriptions() (hooks models.AllSubscriptions) {
94         hooks = models.AllSubscriptions{}
95         for v := range rh.subscriptions.IterBuffered() {
96                 s := v.Val.(SubscriptionInfo)
97                 r := v.Val.(SubscriptionInfo).req
98                 hooks = append(hooks, &models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, s.Id})
99         }
100
101         return hooks
102 }
103
104 func (rh *Resthook) GetSubscriptionById(id string) (models.Subscription, bool) {
105         if v, found := rh.subscriptions.Get(id); found {
106                 appmgr.Logger.Info("Subscription id=%s found: %v", id, v.(SubscriptionInfo).req)
107                 r := v.(SubscriptionInfo).req
108                 return models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, id}, found
109         }
110         return models.Subscription{}, false
111 }
112
113 func (rh *Resthook) PublishSubscription(x models.Xapp, et models.EventType) {
114         rh.NotifyClients(models.AllDeployedXapps{&x}, et)
115 }
116
117 func (rh *Resthook) NotifyClients(xapps models.AllDeployedXapps, et models.EventType) {
118         if len(xapps) == 0 || len(rh.subscriptions) == 0 {
119                 appmgr.Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(rh.subscriptions))
120                 return
121         }
122
123         rh.Seq = rh.Seq + 1
124         for v := range rh.subscriptions.Iter() {
125                 go rh.notify(xapps, et, v.Val.(SubscriptionInfo), rh.Seq)
126         }
127 }
128
129 func (rh *Resthook) notify(xapps models.AllDeployedXapps, et models.EventType, s SubscriptionInfo, seq int64) error {
130         xappData, err := json.Marshal(xapps)
131         if err != nil {
132                 appmgr.Logger.Info("json.Marshal failed: %v", err)
133                 return err
134         }
135
136         // TODO: Use models.SubscriptionNotification instead of internal ...
137         notif := SubscriptionNotification{ID: s.Id, Version: seq, Event: string(et), XApps: string(xappData)}
138         jsonData, err := json.Marshal(notif)
139         if err != nil {
140                 appmgr.Logger.Info("json.Marshal failed: %v", err)
141                 return err
142         }
143
144         // Execute the request with retry policy
145         return rh.retry(s, func() error {
146                 appmgr.Logger.Info("Posting notification to TargetURL=%s: %v", *s.req.Data.TargetURL, notif)
147                 resp, err := http.Post(*s.req.Data.TargetURL, "application/json", bytes.NewBuffer(jsonData))
148                 if err != nil {
149                         appmgr.Logger.Info("Posting to subscription failed: %v", err)
150                         return err
151                 }
152
153                 if resp.StatusCode != http.StatusOK {
154                         appmgr.Logger.Info("Client returned error code: %d", resp.StatusCode)
155                         return err
156                 }
157
158                 appmgr.Logger.Info("subscription to '%s' dispatched, response code: %d", *s.req.Data.TargetURL, resp.StatusCode)
159                 return nil
160         })
161 }
162
163 func (rh *Resthook) retry(s SubscriptionInfo, fn func() error) error {
164         if err := fn(); err != nil {
165                 // Todo: use exponential backoff, or similar mechanism
166                 if *s.req.Data.MaxRetries--; *s.req.Data.MaxRetries > 0 {
167                         time.Sleep(time.Duration(*s.req.Data.RetryTimer) * time.Second)
168                         return rh.retry(s, fn)
169                 }
170                 rh.subscriptions.Remove(s.Id)
171                 return err
172         }
173         return nil
174 }
175
176 func (rh *Resthook) StoreSubscriptions(m cmap.ConcurrentMap) {
177         for v := range m.Iter() {
178                 s := v.Val.(SubscriptionInfo)
179
180                 data, err := json.Marshal(s.req)
181                 if err != nil {
182                         appmgr.Logger.Error("json.marshal failed: %v ", err.Error())
183                         return
184                 }
185
186                 if err := rh.db.Set(s.Id, data); err != nil {
187                         appmgr.Logger.Error("DB.session.Set failed: %v ", err.Error())
188                 }
189         }
190 }
191
192 func (rh *Resthook) RestoreSubscriptions() (m cmap.ConcurrentMap) {
193         rh.VerifyDBConnection()
194
195         m = cmap.New()
196         keys, err := rh.db.GetAll()
197         if err != nil {
198                 appmgr.Logger.Error("DB.session.GetAll failed: %v ", err.Error())
199                 return
200         }
201
202         for _, key := range keys {
203                 value, err := rh.db.Get([]string{key})
204                 if err != nil {
205                         appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
206                         return
207                 }
208
209                 var item models.SubscriptionRequest
210                 if err = json.Unmarshal([]byte(value[key].(string)), &item); err != nil {
211                         appmgr.Logger.Error("json.Unmarshal failed: %v ", err.Error())
212                         return
213                 }
214
215                 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: item.Data.EventType}
216                 m.Set(key, SubscriptionInfo{key, item, resp})
217         }
218
219         return m
220 }
221
222 func (rh *Resthook) VerifyDBConnection() {
223         // Test DB connection, and wait until ready!
224         for {
225                 if _, err := rh.db.GetAll(); err == nil {
226                         return
227                 }
228                 appmgr.Logger.Error("Database connection not ready, waiting ...")
229                 time.Sleep(time.Duration(5 * time.Second))
230         }
231 }
232
233 func (rh *Resthook) FlushSubscriptions() {
234         rh.db.RemoveAll()
235         rh.subscriptions = cmap.New()
236 }