Return service names of xApps instead of IP
[ric-plt/appmgr.git] / cmd / appmgr / subscriptions.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package main
21
22 import (
23         "bytes"
24         "encoding/json"
25         "github.com/segmentio/ksuid"
26         "net/http"
27         "time"
28 )
29
30 func (sd *SubscriptionDispatcher) Initialize() {
31         sd.client = &http.Client{}
32
33         sd.db = &DB{}
34         sd.db.Create()
35         sd.subscriptions = sd.db.RestoreSubscriptions()
36 }
37
38 func (sd *SubscriptionDispatcher) Add(sr SubscriptionReq) (resp SubscriptionResp) {
39         // Skip duplicates
40         for v := range sd.subscriptions.IterBuffered() {
41                 r := v.Val.(Subscription).req
42                 if r.TargetUrl == sr.TargetUrl && r.EventType == sr.EventType {
43                         Logger.Info("Similar subscription already exists!")
44                         return
45                 }
46         }
47
48         key := ksuid.New().String()
49         resp = SubscriptionResp{key, 0, sr.EventType}
50         sr.Id = key
51
52         sd.subscriptions.Set(key, Subscription{sr, resp})
53         sd.db.StoreSubscriptions(sd.subscriptions)
54
55         Logger.Info("Sub: New subscription added: key=%s value=%v", key, sr)
56         return
57 }
58
59 func (sd *SubscriptionDispatcher) GetAll() (hooks []SubscriptionReq) {
60         hooks = []SubscriptionReq{}
61         for v := range sd.subscriptions.IterBuffered() {
62                 hooks = append(hooks, v.Val.(Subscription).req)
63         }
64
65         return hooks
66 }
67
68 func (sd *SubscriptionDispatcher) Get(id string) (SubscriptionReq, bool) {
69         if v, found := sd.subscriptions.Get(id); found {
70                 Logger.Info("Subscription id=%s found: %v", id, v.(Subscription).req)
71
72                 return v.(Subscription).req, found
73         }
74         return SubscriptionReq{}, false
75 }
76
77 func (sd *SubscriptionDispatcher) Delete(id string) (SubscriptionReq, bool) {
78         if v, found := sd.subscriptions.Get(id); found {
79                 Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(Subscription).req)
80
81                 sd.subscriptions.Remove(id)
82                 sd.db.StoreSubscriptions(sd.subscriptions)
83
84                 return v.(Subscription).req, found
85         }
86         return SubscriptionReq{}, false
87 }
88
89 func (sd *SubscriptionDispatcher) Update(id string, sr SubscriptionReq) (SubscriptionReq, bool) {
90         if s, found := sd.subscriptions.Get(id); found {
91                 Logger.Info("Subscription id=%s found: %v ... updating", id, s.(Subscription).req)
92
93                 sr.Id = id
94                 sd.subscriptions.Set(id, Subscription{sr, s.(Subscription).resp})
95                 sd.db.StoreSubscriptions(sd.subscriptions)
96
97                 return sr, found
98         }
99         return SubscriptionReq{}, false
100 }
101
102 func (sd *SubscriptionDispatcher) Publish(x Xapp, et EventType) {
103         sd.notifyClients([]Xapp{x}, et)
104 }
105
106 func (sd *SubscriptionDispatcher) notifyClients(xapps []Xapp, et EventType) {
107         if len(xapps) == 0 || len(sd.subscriptions) == 0 {
108                 Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(sd.subscriptions))
109                 return
110         }
111
112         sd.Seq = sd.Seq + 1
113         for v := range sd.subscriptions.Iter() {
114                 go sd.notify(xapps, et, v.Val.(Subscription), sd.Seq)
115         }
116 }
117
118 func (sd *SubscriptionDispatcher) notify(xapps []Xapp, et EventType, s Subscription, seq int) error {
119         xappData, err := json.Marshal(xapps)
120         if err != nil {
121                 Logger.Info("json.Marshal failed: %v", err)
122                 return err
123         }
124
125         notif := SubscriptionNotif{Id: s.req.Id, Version: seq, EventType: string(et), XApps: string(xappData)}
126         jsonData, err := json.Marshal(notif)
127         if err != nil {
128                 Logger.Info("json.Marshal failed: %v", err)
129                 return err
130         }
131
132         // Execute the request with retry policy
133         return sd.retry(s, func() error {
134                 Logger.Info("Posting notification to targetUrl=%s: %v", s.req.TargetUrl, notif)
135                 resp, err := http.Post(s.req.TargetUrl, "application/json", bytes.NewBuffer(jsonData))
136                 if err != nil {
137                         Logger.Info("Posting to subscription failed: %v", err)
138                         return err
139                 }
140
141                 if resp.StatusCode != http.StatusOK {
142                         Logger.Info("Client returned error code: %d", resp.StatusCode)
143                         return err
144                 }
145
146                 Logger.Info("subscription to '%s' dispatched, response code: %d", s.req.TargetUrl, resp.StatusCode)
147                 return nil
148         })
149 }
150
151 func (sd *SubscriptionDispatcher) retry(s Subscription, fn func() error) error {
152         if err := fn(); err != nil {
153                 // Todo: use exponential backoff, or similar mechanism
154                 if s.req.MaxRetries--; s.req.MaxRetries > 0 {
155                         time.Sleep(time.Duration(s.req.RetryTimer) * time.Second)
156                         return sd.retry(s, fn)
157                 }
158                 sd.subscriptions.Remove(s.req.Id)
159                 return err
160         }
161         return nil
162 }