c8db1e6602d8476648e1bb023a19ca6d7556985d
[ric-plt/appmgr.git] / src / subscriptions.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package main
21
22 import (
23     "bytes"
24     "log"
25     "net/http"
26     "encoding/json"
27     "time"
28     "github.com/segmentio/ksuid"
29     cmap "github.com/orcaman/concurrent-map"
30 )
31
32 func (sd *SubscriptionDispatcher) Initialize() {
33     sd.client = &http.Client{}
34     sd.subscriptions = cmap.New()
35 }
36
37 func (sd *SubscriptionDispatcher) Add(sr SubscriptionReq) (resp SubscriptionResp) {
38     key := ksuid.New().String()
39     resp = SubscriptionResp{key, 0, sr.EventType}
40     sr.Id = key
41
42     sd.subscriptions.Set(key, Subscription{sr, resp})
43
44     log.Printf("New subscription added: key=%s value=%v", key, sr)
45     return
46 }
47
48 func (sd *SubscriptionDispatcher) GetAll() (hooks []SubscriptionReq) {
49     hooks = []SubscriptionReq{}
50     for v := range sd.subscriptions.IterBuffered() {
51         hooks = append(hooks, v.Val.(Subscription).req)
52     }
53
54     return hooks
55 }
56
57 func (sd *SubscriptionDispatcher) Get(id string) (SubscriptionReq, bool) {
58     if v, found := sd.subscriptions.Get(id); found {
59         log.Printf("Subscription id=%s found: %v", id, v.(Subscription).req)
60
61         return v.(Subscription).req, found
62     }
63     return SubscriptionReq{}, false
64 }
65
66 func (sd *SubscriptionDispatcher) Delete(id string) (SubscriptionReq, bool) {
67     if v, found := sd.subscriptions.Get(id); found {
68         log.Printf("Subscription id=%s found: %v ... deleting", id, v.(Subscription).req)
69
70         sd.subscriptions.Remove(id)
71         return v.(Subscription).req, found
72     }
73     return SubscriptionReq{}, false
74 }
75
76 func (sd *SubscriptionDispatcher) Update(id string, sr SubscriptionReq) (SubscriptionReq, bool) {
77     if s, found := sd.subscriptions.Get(id); found {
78         log.Printf("Subscription id=%s found: %v ... updating", id, s.(Subscription).req)
79
80         sr.Id = id
81         sd.subscriptions.Set(id, Subscription{sr, s.(Subscription).resp});
82         return sr, found
83     }
84     return SubscriptionReq{}, false
85 }
86
87 func (sd *SubscriptionDispatcher) Publish(x Xapp, et EventType) {
88     for v := range sd.subscriptions.Iter() {
89         go sd.notify(x, et, v.Val.(Subscription))
90     }
91 }
92
93 func (sd *SubscriptionDispatcher) notify(x Xapp, et EventType, s Subscription) error {
94     notif := []SubscriptionNotif{}
95     notif = append(notif, SubscriptionNotif{Id: s.req.Id, Version: s.resp.Version, EventType: string(et), XappData: x})
96
97     jsonData, err := json.Marshal(notif)
98     if err != nil {
99         log.Panic(err)
100     }
101
102     // Execute the request with retry policy
103     return sd.retry(s, func() error {
104         resp, err := http.Post(s.req.TargetUrl, "application/json", bytes.NewBuffer(jsonData))
105         if err != nil {
106             log.Printf("Posting to subscription failed: %v", err)
107             return err
108         }
109
110         if resp.StatusCode != http.StatusOK {
111             log.Printf("Client returned error code: %d", resp.StatusCode)
112             return err
113         }
114
115         log.Printf("subscription to '%s' dispatched, response code: %d \n", s.req.TargetUrl, resp.StatusCode)
116         return nil
117     })
118 }
119
120 func (sd *SubscriptionDispatcher) retry(s Subscription, fn func() error) error {
121     if err := fn(); err != nil {
122         // Todo: use exponential backoff, or similar mechanism
123         if s.req.MaxRetries--; s.req.MaxRetries > 0 {
124             time.Sleep(time.Duration(s.req.RetryTimer) * time.Second)
125             return sd.retry(s, fn)
126         }
127         sd.subscriptions.Remove(s.req.Id);
128         return err
129     }
130     return nil
131 }