Update subscription data
[ric-plt/appmgr.git] / cmd / appmgr / subscriptions.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package main
21
22 import (
23         "bytes"
24         "encoding/json"
25         "github.com/segmentio/ksuid"
26         "net/http"
27         "time"
28 )
29
30 func (sd *SubscriptionDispatcher) Initialize() {
31         sd.client = &http.Client{}
32
33         sd.db = &DB{}
34         sd.db.Create()
35         sd.subscriptions = sd.db.RestoreSubscriptions()
36 }
37
38 func (sd *SubscriptionDispatcher) Add(sr SubscriptionReq) (resp SubscriptionResp) {
39         key := ksuid.New().String()
40         resp = SubscriptionResp{key, 0, sr.EventType}
41         sr.Id = key
42
43         sd.subscriptions.Set(key, Subscription{sr, resp})
44         sd.db.StoreSubscriptions(sd.subscriptions)
45
46         Logger.Info("Sub: New subscription added: key=%s value=%v", key, sr)
47         return
48 }
49
50 func (sd *SubscriptionDispatcher) GetAll() (hooks []SubscriptionReq) {
51         hooks = []SubscriptionReq{}
52         for v := range sd.subscriptions.IterBuffered() {
53                 hooks = append(hooks, v.Val.(Subscription).req)
54         }
55
56         return hooks
57 }
58
59 func (sd *SubscriptionDispatcher) Get(id string) (SubscriptionReq, bool) {
60         if v, found := sd.subscriptions.Get(id); found {
61                 Logger.Info("Subscription id=%s found: %v", id, v.(Subscription).req)
62
63                 return v.(Subscription).req, found
64         }
65         return SubscriptionReq{}, false
66 }
67
68 func (sd *SubscriptionDispatcher) Delete(id string) (SubscriptionReq, bool) {
69         if v, found := sd.subscriptions.Get(id); found {
70                 Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(Subscription).req)
71
72                 sd.subscriptions.Remove(id)
73                 sd.db.StoreSubscriptions(sd.subscriptions)
74
75                 return v.(Subscription).req, found
76         }
77         return SubscriptionReq{}, false
78 }
79
80 func (sd *SubscriptionDispatcher) Update(id string, sr SubscriptionReq) (SubscriptionReq, bool) {
81         if s, found := sd.subscriptions.Get(id); found {
82                 Logger.Info("Subscription id=%s found: %v ... updating", id, s.(Subscription).req)
83
84                 sr.Id = id
85                 sd.subscriptions.Set(id, Subscription{sr, s.(Subscription).resp})
86                 sd.db.StoreSubscriptions(sd.subscriptions)
87
88                 return sr, found
89         }
90         return SubscriptionReq{}, false
91 }
92
93 func (sd *SubscriptionDispatcher) Publish(x Xapp, et EventType) {
94         sd.notifyClients([]Xapp{x}, et)
95 }
96
97 func (sd *SubscriptionDispatcher) notifyClients(xapps []Xapp, et EventType) {
98         if len(xapps) == 0 || len(sd.subscriptions) == 0 {
99                 Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(sd.subscriptions))
100                 return
101         }
102
103         sd.Seq = sd.Seq + 1
104         for v := range sd.subscriptions.Iter() {
105                 go sd.notify(xapps, et, v.Val.(Subscription), sd.Seq)
106         }
107 }
108
109 func (sd *SubscriptionDispatcher) notify(xapps []Xapp, et EventType, s Subscription, seq int) error {
110         xappData, err := json.Marshal(xapps)
111         if err != nil {
112                 Logger.Info("json.Marshal failed: %v", err)
113                 return err
114         }
115
116         notif := SubscriptionNotif{Id: s.req.Id, Version: seq, EventType: string(et), XApps: string(xappData)}
117         jsonData, err := json.Marshal(notif)
118         if err != nil {
119                 Logger.Info("json.Marshal failed: %v", err)
120                 return err
121         }
122
123         // Execute the request with retry policy
124         return sd.retry(s, func() error {
125                 Logger.Info("Posting notification to targetUrl=%s: %v", s.req.TargetUrl, notif)
126                 resp, err := http.Post(s.req.TargetUrl, "application/json", bytes.NewBuffer(jsonData))
127                 if err != nil {
128                         Logger.Info("Posting to subscription failed: %v", err)
129                         return err
130                 }
131
132                 if resp.StatusCode != http.StatusOK {
133                         Logger.Info("Client returned error code: %d", resp.StatusCode)
134                         return err
135                 }
136
137                 Logger.Info("subscription to '%s' dispatched, response code: %d", s.req.TargetUrl, resp.StatusCode)
138                 return nil
139         })
140 }
141
142 func (sd *SubscriptionDispatcher) retry(s Subscription, fn func() error) error {
143         if err := fn(); err != nil {
144                 // Todo: use exponential backoff, or similar mechanism
145                 if s.req.MaxRetries--; s.req.MaxRetries > 0 {
146                         time.Sleep(time.Duration(s.req.RetryTimer) * time.Second)
147                         return sd.retry(s, fn)
148                 }
149                 sd.subscriptions.Remove(s.req.Id)
150                 return err
151         }
152         return nil
153 }