2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
26 sdl "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
27 cmap "github.com/orcaman/concurrent-map"
28 "github.com/segmentio/ksuid"
33 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr"
34 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/models"
// NewResthook builds a Resthook by delegating to createResthook.
//
// restoreData is passed through unchanged. Two SDL instances are created for
// the call, one with the name "appmgr" and one with the name "appdb", each on
// its own sdl.NewDatabase() handle.
37 func NewResthook(restoreData bool) *Resthook {
38 return createResthook(restoreData, sdl.NewSdlInstance("appmgr", sdl.NewDatabase()), sdl.NewSdlInstance("appdb", sdl.NewDatabase()))
// createResthook assembles a Resthook from the supplied SDL handles.
//
// The subscription map is either loaded through RestoreSubscriptions or
// started empty with cmap.New(); presumably restoreData selects between the
// two (TODO confirm against the branch condition).
41 func createResthook(restoreData bool, sdlInst iSdl, sdlInst2 iSdl) *Resthook {
// NOTE(review): the client is created without a Timeout; a zero Timeout on
// http.Client means requests never time out on their own.
43 client: &http.Client{},
// Reload previously stored subscriptions.
49 rh.subscriptions = rh.RestoreSubscriptions()
// Start with an empty subscription map.
51 rh.subscriptions = cmap.New()
// AddSubscription registers a subscription request.
//
// The existing subscriptions are scanned first: an entry with the same
// TargetURL (compared by value, after dereferencing) and the same EventType
// is treated as a duplicate and its stored response is picked up, presumably
// to be returned as-is. Otherwise a new key is generated with ksuid, a
// response with Version 0 is built, the entry is placed in rh.subscriptions
// and the whole map is handed to StoreSubscriptions.
//
// NOTE(review): sr.Data.TargetURL is dereferenced without a nil check, so a
// request lacking a target URL would panic here — confirm it is validated
// upstream.
// NOTE(review): the duplicate scan and the Set are separate steps; if this
// method can run concurrently, two identical requests might both be stored —
// confirm.
56 func (rh *Resthook) AddSubscription(sr models.SubscriptionRequest) *models.SubscriptionResponse {
57 for v := range rh.subscriptions.IterBuffered() {
58 r := v.Val.(SubscriptionInfo).req
59 if *r.Data.TargetURL == *sr.Data.TargetURL && r.Data.EventType == sr.Data.EventType {
60 appmgr.Logger.Info("Similar subscription already exists!")
61 resp := v.Val.(SubscriptionInfo).resp
// No duplicate found: create a fresh entry under a newly generated id.
66 key := ksuid.New().String()
67 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: sr.Data.EventType}
68 rh.subscriptions.Set(key, SubscriptionInfo{key, sr, resp})
69 rh.StoreSubscriptions(rh.subscriptions)
// NOTE(review): "targetUl" in the message below looks like a typo for "targetUrl".
71 appmgr.Logger.Info("Sub: New subscription added: key=%s targetUl=%s eventType=%s", key, *sr.Data.TargetURL, sr.Data.EventType)
// DeleteSubscription removes the subscription stored under id.
//
// When the id is present, the entry is logged, removed from rh.subscriptions,
// the remaining map is passed to StoreSubscriptions and the removed entry's
// response is read out. When the id is unknown, an empty SubscriptionResponse
// and false are returned.
//
// NOTE(review): StoreSubscriptions, as written in this file, calls rh.db.Set
// for the entries still present in the map; nothing visible here deletes the
// removed id from the database, so it might reappear after a restore —
// confirm.
75 func (rh *Resthook) DeleteSubscription(id string) (*models.SubscriptionResponse, bool) {
76 if v, found := rh.subscriptions.Get(id); found {
77 appmgr.Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(SubscriptionInfo).req)
79 rh.subscriptions.Remove(id)
80 rh.StoreSubscriptions(rh.subscriptions)
81 resp := v.(SubscriptionInfo).resp
// Unknown id: report "not found" with an empty response.
84 return &models.SubscriptionResponse{}, false
// ModifySubscription replaces the request stored under id with req.
//
// When the id is present, a new response (same id, Version 0, EventType taken
// from req) is built, the entry is overwritten in rh.subscriptions and the
// map is passed to StoreSubscriptions. When the id is unknown, an empty
// SubscriptionResponse and false are returned.
87 func (rh *Resthook) ModifySubscription(id string, req models.SubscriptionRequest) (*models.SubscriptionResponse, bool) {
88 if s, found := rh.subscriptions.Get(id); found {
89 appmgr.Logger.Info("Subscription id=%s found: %v ... updating", id, s.(SubscriptionInfo).req)
// The key stays the same; only the stored request and response are replaced.
91 resp := models.SubscriptionResponse{ID: id, Version: 0, EventType: req.Data.EventType}
92 rh.subscriptions.Set(id, SubscriptionInfo{id, req, resp})
93 rh.StoreSubscriptions(rh.subscriptions)
// Unknown id: report "not found" with an empty response.
97 return &models.SubscriptionResponse{}, false
// GetAllSubscriptions collects every entry of rh.subscriptions into a
// models.AllSubscriptions value.
//
// hooks starts as an empty (non-nil) AllSubscriptions. For each stored
// SubscriptionInfo a models.Subscription is appended, carrying a new
// SubscriptionData built from the request's EventType, MaxRetries, RetryTimer
// and TargetURL, together with the stored Id.
100 func (rh *Resthook) GetAllSubscriptions() (hooks models.AllSubscriptions) {
101 hooks = models.AllSubscriptions{}
102 for v := range rh.subscriptions.IterBuffered() {
// s and r come from the same stored value: s supplies the id, r the request.
103 s := v.Val.(SubscriptionInfo)
104 r := v.Val.(SubscriptionInfo).req
// Unkeyed composite literals: the values must follow the field order of the models types.
105 hooks = append(hooks, &models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, s.Id})
// GetSubscriptionById looks up a single subscription.
//
// When id is present in rh.subscriptions, it returns a models.Subscription
// built from the stored request (EventType, MaxRetries, RetryTimer,
// TargetURL) plus the id, and true. Otherwise it returns a zero-value
// models.Subscription and false.
111 func (rh *Resthook) GetSubscriptionById(id string) (models.Subscription, bool) {
112 if v, found := rh.subscriptions.Get(id); found {
113 appmgr.Logger.Info("Subscription id=%s found: %v", id, v.(SubscriptionInfo).req)
114 r := v.(SubscriptionInfo).req
// Unkeyed composite literals: the values must follow the field order of the models types.
115 return models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, id}, found
117 return models.Subscription{}, false
// PublishSubscription announces an event for a single xApp by wrapping x in a
// one-element models.AllDeployedXapps and forwarding it, with the event type
// et, to NotifyClients.
120 func (rh *Resthook) PublishSubscription(x models.Xapp, et models.EventType) {
121 rh.NotifyClients(models.AllDeployedXapps{&x}, et)
// NotifyClients sends the given xApp list and event type to the stored
// subscriptions.
//
// A "Nothing to publish" message is logged when the guard below holds. For
// each entry of rh.subscriptions a separate goroutine running rh.notify is
// started with the current rh.Seq value.
//
// NOTE(review): len(rh.subscriptions) is the built-in len applied to the cmap
// value. In concurrent-map versions where ConcurrentMap is a slice of shards
// this is the shard count, not the number of stored entries, so the
// "no subscriptions" half of the guard would never be true;
// rh.subscriptions.Count() reports the entry count — confirm against the
// vendored library version.
124 func (rh *Resthook) NotifyClients(xapps models.AllDeployedXapps, et models.EventType) {
125 if len(xapps) == 0 || len(rh.subscriptions) == 0 {
126 appmgr.Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(rh.subscriptions))
// NOTE(review): the goroutines are not waited for and the error returned by
// notify is discarded.
131 for v := range rh.subscriptions.Iter() {
132 go rh.notify(xapps, et, v.Val.(SubscriptionInfo), rh.Seq)
// notify delivers one notification to the subscriber described by s.
//
// xapps is marshalled to JSON and embedded, as a string, in a
// SubscriptionNotification together with the subscription id, seq (as
// Version) and the event type. That notification is marshalled again and
// POSTed to the subscription's TargetURL with content type
// "application/json". The POST is wrapped in rh.retry, whose result is
// returned.
//
// Because XApps holds already-encoded JSON as a string, the xApp list ends up
// JSON-encoded twice in the request body.
136 func (rh *Resthook) notify(xapps models.AllDeployedXapps, et models.EventType, s SubscriptionInfo, seq int64) error {
137 xappData, err := json.Marshal(xapps)
139 appmgr.Logger.Info("json.Marshal failed: %v", err)
143 // TODO: Use models.SubscriptionNotification instead of internal ...
144 notif := SubscriptionNotification{ID: s.Id, Version: seq, Event: string(et), XApps: string(xappData)}
145 jsonData, err := json.Marshal(notif)
147 appmgr.Logger.Info("json.Marshal failed: %v", err)
151 // Execute the request with retry policy
152 return rh.retry(s, func() error {
153 appmgr.Logger.Info("Posting notification to TargetURL=%s: %v", *s.req.Data.TargetURL, notif)
// NOTE(review): this uses the package-level http.Post (http.DefaultClient)
// rather than the client stored on the Resthook — confirm that is intended.
// NOTE(review): no resp.Body.Close() is visible in this block; an unclosed
// body keeps the underlying connection from being reused — confirm.
154 resp, err := http.Post(*s.req.Data.TargetURL, "application/json", bytes.NewBuffer(jsonData))
156 appmgr.Logger.Info("Posting to subscription failed: %v", err)
// Only 200 OK counts as success; any other status is logged as a client error.
160 if resp.StatusCode != http.StatusOK {
161 appmgr.Logger.Info("Client returned error code: %d", resp.StatusCode)
165 appmgr.Logger.Info("subscription to '%s' dispatched, response code: %d", *s.req.Data.TargetURL, resp.StatusCode)
// retry runs fn and, when it fails, repeats it according to the retry
// settings of subscription s.
//
// On each failure the subscription's MaxRetries counter is decremented. While
// the counter stays positive, the call sleeps RetryTimer seconds and recurses.
// Once the counter is no longer positive, the subscription appears to be
// removed from rh.subscriptions.
//
// NOTE(review): the decrement writes through the MaxRetries pointer, so any
// other copy of this request that shares the pointer sees the reduced value,
// including on later notifications — confirm this is intended.
170 func (rh *Resthook) retry(s SubscriptionInfo, fn func() error) error {
171 if err := fn(); err != nil {
172 // Todo: use exponential backoff, or similar mechanism
173 if *s.req.Data.MaxRetries--; *s.req.Data.MaxRetries > 0 {
// Fixed delay between attempts; RetryTimer is interpreted as seconds.
174 time.Sleep(time.Duration(*s.req.Data.RetryTimer) * time.Second)
175 return rh.retry(s, fn)
// Retries used up: drop the subscription from the in-memory map.
177 rh.subscriptions.Remove(s.Id)
// StoreSubscriptions writes the entries of m to rh.db.
//
// Each SubscriptionInfo's request is marshalled to JSON and stored under the
// subscription id with rh.db.Set. Marshal and Set failures are logged through
// appmgr.Logger.Error. Only the request is stored, not the response.
183 func (rh *Resthook) StoreSubscriptions(m cmap.ConcurrentMap) {
184 for v := range m.Iter() {
185 s := v.Val.(SubscriptionInfo)
186 data, err := json.Marshal(s.req)
188 appmgr.Logger.Error("json.marshal failed: %v ", err.Error())
// The subscription id is the database key; the value is the JSON-encoded request.
192 if err := rh.db.Set(s.Id, data); err != nil {
193 appmgr.Logger.Error("DB.session.Set failed: %v ", err.Error())
// RestoreSubscriptions rebuilds the subscription map from rh.db.
//
// It first calls VerifyDBConnection, then lists all keys with rh.db.GetAll.
// For each key the value is fetched, decoded from JSON into a
// models.SubscriptionRequest and stored in m together with a response built
// from the key, Version 0 and the request's EventType. GetAll, Get and
// Unmarshal failures are logged through appmgr.Logger.Error.
198 func (rh *Resthook) RestoreSubscriptions() (m cmap.ConcurrentMap) {
199 rh.VerifyDBConnection()
202 keys, err := rh.db.GetAll()
204 appmgr.Logger.Error("DB.session.GetAll failed: %v ", err.Error())
208 for _, key := range keys {
209 value, err := rh.db.Get([]string{key})
211 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
215 var item models.SubscriptionRequest
// NOTE(review): value[key].(string) is an unchecked type assertion and panics
// if the stored value is missing or not a string — confirm what rh.db.Get
// returns for values written by StoreSubscriptions.
216 if err = json.Unmarshal([]byte(value[key].(string)), &item); err != nil {
217 appmgr.Logger.Error("json.Unmarshal failed: %v ", err.Error())
// The database key is reused as the subscription id.
221 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: item.Data.EventType}
222 m.Set(key, SubscriptionInfo{key, item, resp})
// VerifyDBConnection probes the database with rh.db.GetAll.
//
// A nil error from the probe is treated as "ready". On failure it logs
// "Database connection not ready, waiting ..." and sleeps five seconds; per
// the inline comment the intent is to keep waiting until the database is
// ready.
228 func (rh *Resthook) VerifyDBConnection() {
229 // Test DB connection, and wait until ready!
231 if _, err := rh.db.GetAll(); err == nil {
234 appmgr.Logger.Error("Database connection not ready, waiting ...")
// 5 * time.Second is already a time.Duration; the conversion is redundant but harmless.
235 time.Sleep(time.Duration(5 * time.Second))
// FlushSubscriptions discards the in-memory subscriptions by replacing
// rh.subscriptions with a fresh, empty map from cmap.New().
239 func (rh *Resthook) FlushSubscriptions() {
241 rh.subscriptions = cmap.New()
// UpdateAppData maintains the "endpoints" entry kept in rh.db2.
//
// The stored value is read, formatted as a string and split on single spaces
// into entries. An entry that contains *params.HTTPEndpoint as a substring is
// treated as the app's previous registration: if it is the only entry, the
// whole "endpoints" key is removed; otherwise it is logged as removed while
// the other entries are collected and written back. Further down, params is
// marshalled to JSON, appended to the collected entries and the
// space-joined result is written to "endpoints". updateflag == false selects
// a separate branch at the top.
//
// NOTE(review): params.HTTPEndpoint is dereferenced without a nil check.
// NOTE(review): matching uses strings.Contains, so an endpoint that is a
// substring of another app's entry would match that entry as well — confirm.
// NOTE(review): entries are separated by spaces, so a JSON entry containing a
// space inside a string value would be split apart on the next read —
// confirm.
// NOTE(review): the results of rh.db2.Set are not checked here, unlike
// rh.db.Set in StoreSubscriptions.
244 func (rh *Resthook) UpdateAppData(params models.RegisterRequest, updateflag bool) {
245 appmgr.Logger.Info("Endpoint to be added in SDL: %s", *params.HTTPEndpoint)
246 if updateflag == false {
250 //Ensure config is empty string, as we dont want to store config in DB
251 if params.Config != "" {
255 value, err := rh.db2.Get([]string{"endpoints"})
257 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
261 appmgr.Logger.Info("List of Apps in SDL: %v", value["endpoints"])
262 var appsindb []string
266 if value["endpoints"] != nil {
// The stored value is rendered with %s and treated as a space-separated list.
267 formstring := fmt.Sprintf("%s", value["endpoints"])
268 newstring := strings.Split(formstring, " ")
269 for i, _ := range newstring {
// Sole entry belongs to this endpoint: drop the whole key.
270 if len(newstring) == 1 && strings.Contains(newstring[i], *params.HTTPEndpoint) {
271 appmgr.Logger.Info("Removing Key %s", *params.HTTPEndpoint)
272 rh.db2.Remove([]string{"endpoints"})
// One of several entries belongs to this endpoint.
276 if strings.Contains(newstring[i], *params.HTTPEndpoint) {
277 appmgr.Logger.Info("Removing entry %s", *params.HTTPEndpoint)
// Entries for other endpoints are collected and re-joined.
281 appsindb = append(appsindb, newstring[i])
282 data = strings.Join(appsindb, " ")
284 rh.db2.Set("endpoints", strings.TrimSpace(data))
// Serialize the registration and add it to the collected entries.
288 xappData, err := json.Marshal(params)
290 appmgr.Logger.Info("json.Marshal failed: %v", err)
293 appsindb = append(appsindb, string(xappData))
294 data = strings.Join(appsindb, " ")
295 rh.db2.Set("endpoints", strings.TrimSpace(data))
299 func (rh *Resthook) GetAppsInSDL() *string {
300 value, err := rh.db2.Get([]string{"endpoints"})
302 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
305 appmgr.Logger.Info("List of Apps in SDL: %v", value["endpoints"])
306 if value["endpoints"] == nil || value["endpoints"] == "" {
309 apps := fmt.Sprintf("%s", value["endpoints"])