New standard GO project layout
[ric-plt/appmgr.git] / cmd / appmgr / subscriptions.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package main
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "github.com/segmentio/ksuid"
27         "net/http"
28         "time"
29 )
30
31 func (sd *SubscriptionDispatcher) Initialize() {
32         sd.client = &http.Client{}
33
34         sd.db = &DB{}
35         sd.db.Create()
36         sd.subscriptions = sd.db.RestoreSubscriptions()
37 }
38
39 func (sd *SubscriptionDispatcher) Add(sr SubscriptionReq) (resp SubscriptionResp) {
40         key := ksuid.New().String()
41         resp = SubscriptionResp{key, 0, sr.EventType}
42         sr.Id = key
43
44         sd.subscriptions.Set(key, Subscription{sr, resp})
45         sd.db.StoreSubscriptions(sd.subscriptions)
46
47         mdclog(MdclogInfo, fmt.Sprintf("Sub: New subscription added: key=%s value=%v", key, sr))
48         return
49 }
50
51 func (sd *SubscriptionDispatcher) GetAll() (hooks []SubscriptionReq) {
52         hooks = []SubscriptionReq{}
53         for v := range sd.subscriptions.IterBuffered() {
54                 hooks = append(hooks, v.Val.(Subscription).req)
55         }
56
57         return hooks
58 }
59
60 func (sd *SubscriptionDispatcher) Get(id string) (SubscriptionReq, bool) {
61         if v, found := sd.subscriptions.Get(id); found {
62                 mdclog(MdclogInfo, fmt.Sprintf("Subscription id=%s found: %v", id, v.(Subscription).req))
63
64                 return v.(Subscription).req, found
65         }
66         return SubscriptionReq{}, false
67 }
68
69 func (sd *SubscriptionDispatcher) Delete(id string) (SubscriptionReq, bool) {
70         if v, found := sd.subscriptions.Get(id); found {
71                 mdclog(MdclogInfo, fmt.Sprintf("Subscription id=%s found: %v ... deleting", id, v.(Subscription).req))
72
73                 sd.subscriptions.Remove(id)
74                 sd.db.StoreSubscriptions(sd.subscriptions)
75
76                 return v.(Subscription).req, found
77         }
78         return SubscriptionReq{}, false
79 }
80
81 func (sd *SubscriptionDispatcher) Update(id string, sr SubscriptionReq) (SubscriptionReq, bool) {
82         if s, found := sd.subscriptions.Get(id); found {
83                 mdclog(MdclogInfo, fmt.Sprintf("Subscription id=%s found: %v ... updating", id, s.(Subscription).req))
84
85                 sr.Id = id
86                 sd.subscriptions.Set(id, Subscription{sr, s.(Subscription).resp})
87                 sd.db.StoreSubscriptions(sd.subscriptions)
88
89                 return sr, found
90         }
91         return SubscriptionReq{}, false
92 }
93
94 func (sd *SubscriptionDispatcher) Publish(x Xapp, et EventType) {
95         sd.notifyClients([]Xapp{x}, et)
96 }
97
98 func (sd *SubscriptionDispatcher) notifyClients(xapps []Xapp, et EventType) {
99         if len(xapps) == 0 || len(sd.subscriptions) == 0 {
100                 mdclog(MdclogInfo, fmt.Sprintf("Nothing to publish [%d:%d]", len(xapps), len(sd.subscriptions)))
101                 return
102         }
103
104         sd.Seq = sd.Seq + 1
105         for v := range sd.subscriptions.Iter() {
106                 go sd.notify(xapps, et, v.Val.(Subscription), sd.Seq)
107         }
108 }
109
110 func (sd *SubscriptionDispatcher) notify(xapps []Xapp, et EventType, s Subscription, seq int) error {
111         notif := []SubscriptionNotif{}
112         notif = append(notif, SubscriptionNotif{Id: s.req.Id, Version: seq, EventType: string(et), XappData: xapps})
113
114         jsonData, err := json.Marshal(notif)
115         if err != nil {
116                 mdclog(MdclogInfo, fmt.Sprintf("json.Marshal failed: %v", err))
117                 return err
118         }
119
120         // Execute the request with retry policy
121         return sd.retry(s, func() error {
122                 resp, err := http.Post(s.req.TargetUrl, "application/json", bytes.NewBuffer(jsonData))
123                 if err != nil {
124                         mdclog(MdclogInfo, fmt.Sprintf("Posting to subscription failed: %v", err))
125                         return err
126                 }
127
128                 if resp.StatusCode != http.StatusOK {
129                         mdclog(MdclogInfo, fmt.Sprintf("Client returned error code: %d", resp.StatusCode))
130                         return err
131                 }
132
133                 mdclog(MdclogInfo, fmt.Sprintf("subscription to '%s' dispatched, response code: %d \n", s.req.TargetUrl, resp.StatusCode))
134                 return nil
135         })
136 }
137
138 func (sd *SubscriptionDispatcher) retry(s Subscription, fn func() error) error {
139         if err := fn(); err != nil {
140                 // Todo: use exponential backoff, or similar mechanism
141                 if s.req.MaxRetries--; s.req.MaxRetries > 0 {
142                         time.Sleep(time.Duration(s.req.RetryTimer) * time.Second)
143                         return sd.retry(s, fn)
144                 }
145                 sd.subscriptions.Remove(s.req.Id)
146                 return err
147         }
148         return nil
149 }