2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
25 sdl "gerrit.oran-osc.org/r/ric-plt/sdlgo"
26 cmap "github.com/orcaman/concurrent-map"
27 "github.com/segmentio/ksuid"
31 "gerrit.oran-osc.org/r/ric-plt/appmgr/pkg/appmgr"
32 "gerrit.oran-osc.org/r/ric-plt/appmgr/pkg/models"
35 func NewResthook() *Resthook {
37 client: &http.Client{},
38 db: sdl.NewSdlInstance("appmgr", sdl.NewDatabase()),
41 rh.subscriptions = rh.RestoreSubscriptions()
45 func (rh *Resthook) AddSubscription(sr models.SubscriptionRequest) *models.SubscriptionResponse {
46 for v := range rh.subscriptions.IterBuffered() {
47 r := v.Val.(SubscriptionInfo).req
48 if *r.Data.TargetURL == *sr.Data.TargetURL && r.Data.EventType == sr.Data.EventType {
49 appmgr.Logger.Info("Similar subscription already exists!")
50 return &models.SubscriptionResponse{}
54 key := ksuid.New().String()
55 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: sr.Data.EventType}
56 rh.subscriptions.Set(key, SubscriptionInfo{key, sr, resp})
57 rh.StoreSubscriptions(rh.subscriptions)
59 appmgr.Logger.Info("Sub: New subscription added: key=%s targetUl=%s eventType=%s", key, *sr.Data.TargetURL, sr.Data.EventType)
63 func (rh *Resthook) DeleteSubscription(id string) (*models.SubscriptionResponse, bool) {
64 if v, found := rh.subscriptions.Get(id); found {
65 appmgr.Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(SubscriptionInfo).req)
67 rh.subscriptions.Remove(id)
68 rh.StoreSubscriptions(rh.subscriptions)
69 resp := v.(SubscriptionInfo).resp
72 return &models.SubscriptionResponse{}, false
75 func (rh *Resthook) ModifySubscription(id string, req models.SubscriptionRequest) (*models.SubscriptionResponse, bool) {
76 if s, found := rh.subscriptions.Get(id); found {
77 appmgr.Logger.Info("Subscription id=%s found: %v ... updating", id, s.(SubscriptionInfo).req)
79 resp := models.SubscriptionResponse{ID: id, Version: 0, EventType: req.Data.EventType}
80 rh.subscriptions.Set(id, SubscriptionInfo{id, req, resp})
81 rh.StoreSubscriptions(rh.subscriptions)
85 return &models.SubscriptionResponse{}, false
88 func (rh *Resthook) GetAllSubscriptions() (hooks models.AllSubscriptions) {
89 hooks = models.AllSubscriptions{}
90 for v := range rh.subscriptions.IterBuffered() {
91 s := v.Val.(SubscriptionInfo)
92 r := v.Val.(SubscriptionInfo).req
93 hooks = append(hooks, &models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, s.Id})
99 func (rh *Resthook) GetSubscriptionById(id string) (models.Subscription, bool) {
100 if v, found := rh.subscriptions.Get(id); found {
101 appmgr.Logger.Info("Subscription id=%s found: %v", id, v.(SubscriptionInfo).req)
102 r := v.(SubscriptionInfo).req
103 return models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, id}, found
105 return models.Subscription{}, false
108 func (rh *Resthook) PublishSubscription(x models.Xapp, et models.EventType) {
109 rh.NotifyClients(models.AllDeployedXapps{&x}, et)
112 func (rh *Resthook) NotifyClients(xapps models.AllDeployedXapps, et models.EventType) {
113 if len(xapps) == 0 || len(rh.subscriptions) == 0 {
114 appmgr.Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(rh.subscriptions))
119 for v := range rh.subscriptions.Iter() {
120 go rh.notify(xapps, et, v.Val.(SubscriptionInfo), rh.Seq)
124 func (rh *Resthook) notify(xapps models.AllDeployedXapps, et models.EventType, s SubscriptionInfo, seq int64) error {
125 notif := models.SubscriptionNotification{ID: s.Id, Version: seq, EventType: et, XApps: xapps}
126 jsonData, err := json.Marshal(notif)
128 appmgr.Logger.Info("json.Marshal failed: %v", err)
132 // Execute the request with retry policy
133 return rh.retry(s, func() error {
134 appmgr.Logger.Info("Posting notification to TargetURL=%s: %v", *s.req.Data.TargetURL, notif)
135 resp, err := http.Post(*s.req.Data.TargetURL, "application/json", bytes.NewBuffer(jsonData))
137 appmgr.Logger.Info("Posting to subscription failed: %v", err)
141 if resp.StatusCode != http.StatusOK {
142 appmgr.Logger.Info("Client returned error code: %d", resp.StatusCode)
146 appmgr.Logger.Info("subscription to '%s' dispatched, response code: %d", *s.req.Data.TargetURL, resp.StatusCode)
151 func (rh *Resthook) retry(s SubscriptionInfo, fn func() error) error {
152 if err := fn(); err != nil {
153 // Todo: use exponential backoff, or similar mechanism
154 if *s.req.Data.MaxRetries--; *s.req.Data.MaxRetries > 0 {
155 time.Sleep(time.Duration(*s.req.Data.RetryTimer) * time.Second)
156 return rh.retry(s, fn)
158 rh.subscriptions.Remove(s.Id)
164 func (rh *Resthook) StoreSubscriptions(m cmap.ConcurrentMap) {
165 for v := range m.Iter() {
166 s := v.Val.(SubscriptionInfo)
168 data, err := json.Marshal(s.req)
170 appmgr.Logger.Error("json.marshal failed: %v ", err.Error())
174 if err := rh.db.Set(s.Id, data); err != nil {
175 appmgr.Logger.Error("DB.session.Set failed: %v ", err.Error())
180 func (rh *Resthook) RestoreSubscriptions() (m cmap.ConcurrentMap) {
181 rh.VerifyDBConnection()
184 keys, err := rh.db.GetAll()
186 appmgr.Logger.Error("DB.session.GetAll failed: %v ", err.Error())
190 for _, key := range keys {
191 value, err := rh.db.Get([]string{key})
193 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
197 var item models.SubscriptionRequest
198 if err = json.Unmarshal([]byte(value[key].(string)), &item); err != nil {
199 appmgr.Logger.Error("json.Unmarshal failed: %v ", err.Error())
203 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: item.Data.EventType}
204 m.Set(key, SubscriptionInfo{key, item, resp})
210 func (rh *Resthook) VerifyDBConnection() {
211 // Test DB connection, and wait until ready!
213 if _, err := rh.db.GetAll(); err == nil {
216 appmgr.Logger.Error("Database connection not ready, waiting ...")
217 time.Sleep(time.Duration(5 * time.Second))
221 func (rh *Resthook) FlushSubscriptions() {
223 rh.subscriptions = cmap.New()