Update logging interface
[ric-plt/appmgr.git] / cmd / appmgr / subscriptions.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package main
21
22 import (
23         "bytes"
24         "encoding/json"
25         "github.com/segmentio/ksuid"
26         "net/http"
27         "time"
28 )
29
30 func (sd *SubscriptionDispatcher) Initialize() {
31         sd.client = &http.Client{}
32
33         sd.db = &DB{}
34         sd.db.Create()
35         sd.subscriptions = sd.db.RestoreSubscriptions()
36 }
37
38 func (sd *SubscriptionDispatcher) Add(sr SubscriptionReq) (resp SubscriptionResp) {
39         key := ksuid.New().String()
40         resp = SubscriptionResp{key, 0, sr.EventType}
41         sr.Id = key
42
43         sd.subscriptions.Set(key, Subscription{sr, resp})
44         sd.db.StoreSubscriptions(sd.subscriptions)
45
46         Logger.Info("Sub: New subscription added: key=%s value=%v", key, sr)
47         return
48 }
49
50 func (sd *SubscriptionDispatcher) GetAll() (hooks []SubscriptionReq) {
51         hooks = []SubscriptionReq{}
52         for v := range sd.subscriptions.IterBuffered() {
53                 hooks = append(hooks, v.Val.(Subscription).req)
54         }
55
56         return hooks
57 }
58
59 func (sd *SubscriptionDispatcher) Get(id string) (SubscriptionReq, bool) {
60         if v, found := sd.subscriptions.Get(id); found {
61                 Logger.Info("Subscription id=%s found: %v", id, v.(Subscription).req)
62
63                 return v.(Subscription).req, found
64         }
65         return SubscriptionReq{}, false
66 }
67
68 func (sd *SubscriptionDispatcher) Delete(id string) (SubscriptionReq, bool) {
69         if v, found := sd.subscriptions.Get(id); found {
70                 Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(Subscription).req)
71
72                 sd.subscriptions.Remove(id)
73                 sd.db.StoreSubscriptions(sd.subscriptions)
74
75                 return v.(Subscription).req, found
76         }
77         return SubscriptionReq{}, false
78 }
79
80 func (sd *SubscriptionDispatcher) Update(id string, sr SubscriptionReq) (SubscriptionReq, bool) {
81         if s, found := sd.subscriptions.Get(id); found {
82                 Logger.Info("Subscription id=%s found: %v ... updating", id, s.(Subscription).req)
83
84                 sr.Id = id
85                 sd.subscriptions.Set(id, Subscription{sr, s.(Subscription).resp})
86                 sd.db.StoreSubscriptions(sd.subscriptions)
87
88                 return sr, found
89         }
90         return SubscriptionReq{}, false
91 }
92
93 func (sd *SubscriptionDispatcher) Publish(x Xapp, et EventType) {
94         sd.notifyClients([]Xapp{x}, et)
95 }
96
97 func (sd *SubscriptionDispatcher) notifyClients(xapps []Xapp, et EventType) {
98         if len(xapps) == 0 || len(sd.subscriptions) == 0 {
99                 Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(sd.subscriptions))
100                 return
101         }
102
103         sd.Seq = sd.Seq + 1
104         for v := range sd.subscriptions.Iter() {
105                 go sd.notify(xapps, et, v.Val.(Subscription), sd.Seq)
106         }
107 }
108
109 func (sd *SubscriptionDispatcher) notify(xapps []Xapp, et EventType, s Subscription, seq int) error {
110         notif := []SubscriptionNotif{}
111         notif = append(notif, SubscriptionNotif{Id: s.req.Id, Version: seq, EventType: string(et), XappData: xapps})
112
113         jsonData, err := json.Marshal(notif)
114         if err != nil {
115                 Logger.Info("json.Marshal failed: %v", err)
116                 return err
117         }
118
119         // Execute the request with retry policy
120         return sd.retry(s, func() error {
121                 resp, err := http.Post(s.req.TargetUrl, "application/json", bytes.NewBuffer(jsonData))
122                 if err != nil {
123                         Logger.Info("Posting to subscription failed: %v", err)
124                         return err
125                 }
126
127                 if resp.StatusCode != http.StatusOK {
128                         Logger.Info("Client returned error code: %d", resp.StatusCode)
129                         return err
130                 }
131
132                 Logger.Info("subscription to '%s' dispatched, response code: %d", s.req.TargetUrl, resp.StatusCode)
133                 return nil
134         })
135 }
136
137 func (sd *SubscriptionDispatcher) retry(s Subscription, fn func() error) error {
138         if err := fn(); err != nil {
139                 // Todo: use exponential backoff, or similar mechanism
140                 if s.req.MaxRetries--; s.req.MaxRetries > 0 {
141                         time.Sleep(time.Duration(s.req.RetryTimer) * time.Second)
142                         return sd.retry(s, fn)
143                 }
144                 sd.subscriptions.Remove(s.req.Id)
145                 return err
146         }
147         return nil
148 }