2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
25 sdl "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
26 cmap "github.com/orcaman/concurrent-map"
27 "github.com/segmentio/ksuid"
31 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr"
32 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/models"
// NewResthook constructs a Resthook with a default http.Client and a database
// handle obtained from sdl.NewSdlInstance("appmgr", sdl.NewDatabase()).
//
// The subscription table is either reloaded through RestoreSubscriptions or
// created empty with cmap.New(). Presumably restoreData selects between the
// two — TODO confirm against the branch condition.
35 func NewResthook(restoreData bool) *Resthook {
// NOTE(review): a zero-value http.Client has no request timeout configured.
37 client: &http.Client{},
38 db: sdl.NewSdlInstance("appmgr", sdl.NewDatabase()),
// Reload the subscriptions that were previously written to the database.
42 rh.subscriptions = rh.RestoreSubscriptions()
// Start with an empty in-memory subscription table.
44 rh.subscriptions = cmap.New()
// AddSubscription registers sr as a new subscription.
//
// The existing subscriptions are scanned first: if one already has the same
// target URL and the same event type, nothing is added and an empty
// SubscriptionResponse is returned. Otherwise a new KSUID string becomes the
// subscription ID, the entry is placed in the in-memory table, and the table
// is written out through StoreSubscriptions.
//
// NOTE(review): TargetURL is dereferenced on both the stored and the incoming
// request without a nil check — presumably request validation guarantees it is
// set; confirm.
49 func (rh *Resthook) AddSubscription(sr models.SubscriptionRequest) *models.SubscriptionResponse {
// Duplicate detection: same target URL and same event type.
50 for v := range rh.subscriptions.IterBuffered() {
51 r := v.Val.(SubscriptionInfo).req
52 if *r.Data.TargetURL == *sr.Data.TargetURL && r.Data.EventType == sr.Data.EventType {
53 appmgr.Logger.Info("Similar subscription already exists!")
// The duplicate case answers with an empty response (no ID set).
54 return &models.SubscriptionResponse{}
// No duplicate: allocate an ID and record the subscription.
58 key := ksuid.New().String()
59 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: sr.Data.EventType}
60 rh.subscriptions.Set(key, SubscriptionInfo{key, sr, resp})
// Persist the complete table, not just the new entry.
61 rh.StoreSubscriptions(rh.subscriptions)
// NOTE(review): "targetUl" in the log text looks like a typo for "targetUrl".
63 appmgr.Logger.Info("Sub: New subscription added: key=%s targetUl=%s eventType=%s", key, *sr.Data.TargetURL, sr.Data.EventType)
// DeleteSubscription removes the subscription stored under id from the
// in-memory table and then writes the remaining table out through
// StoreSubscriptions.
//
// When id is unknown it returns an empty SubscriptionResponse and false.
//
// NOTE(review): StoreSubscriptions only issues db.Set for the entries that are
// still present, so the deleted id does not appear to be removed from the
// database here — confirm whether it can reappear after RestoreSubscriptions.
67 func (rh *Resthook) DeleteSubscription(id string) (*models.SubscriptionResponse, bool) {
68 if v, found := rh.subscriptions.Get(id); found {
69 appmgr.Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(SubscriptionInfo).req)
71 rh.subscriptions.Remove(id)
72 rh.StoreSubscriptions(rh.subscriptions)
// Keep a copy of the response that was recorded for the removed entry.
73 resp := v.(SubscriptionInfo).resp
// Unknown id: nothing was deleted.
76 return &models.SubscriptionResponse{}, false
// ModifySubscription replaces the request stored under id with req and
// persists the table through StoreSubscriptions.
//
// The stored entry is overwritten as a whole: a new SubscriptionResponse is
// built with the same id, Version 0 and the event type taken from req.
// When id is unknown it returns an empty SubscriptionResponse and false.
//
// NOTE(review): Version is always written as 0, so a modification does not
// change it — confirm whether the version is meant to be bumped.
79 func (rh *Resthook) ModifySubscription(id string, req models.SubscriptionRequest) (*models.SubscriptionResponse, bool) {
80 if s, found := rh.subscriptions.Get(id); found {
81 appmgr.Logger.Info("Subscription id=%s found: %v ... updating", id, s.(SubscriptionInfo).req)
83 resp := models.SubscriptionResponse{ID: id, Version: 0, EventType: req.Data.EventType}
84 rh.subscriptions.Set(id, SubscriptionInfo{id, req, resp})
85 rh.StoreSubscriptions(rh.subscriptions)
// Unknown id: nothing was modified.
89 return &models.SubscriptionResponse{}, false
// GetAllSubscriptions builds a models.AllSubscriptions list with one entry per
// subscription in the in-memory table. Each entry carries the event type,
// MaxRetries, RetryTimer and TargetURL of the stored request, plus the
// subscription ID.
//
// hooks is initialised to an empty (non-nil) list before the loop.
//
// NOTE(review): MaxRetries, RetryTimer and TargetURL are dereferenced as
// pointers elsewhere in this file, so the returned entries presumably share
// those pointers with the stored request — confirm callers do not mutate them.
92 func (rh *Resthook) GetAllSubscriptions() (hooks models.AllSubscriptions) {
93 hooks = models.AllSubscriptions{}
94 for v := range rh.subscriptions.IterBuffered() {
// s and r come from the same value; s is only needed for its Id.
95 s := v.Val.(SubscriptionInfo)
96 r := v.Val.(SubscriptionInfo).req
// Unkeyed composite literals: field order must match the models definitions.
97 hooks = append(hooks, &models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, s.Id})
// GetSubscriptionById looks up the subscription stored under id.
//
// When found, it returns a models.Subscription built from the stored request's
// event type, MaxRetries, RetryTimer and TargetURL together with id, and the
// found flag. Otherwise it returns a zero models.Subscription and false.
103 func (rh *Resthook) GetSubscriptionById(id string) (models.Subscription, bool) {
104 if v, found := rh.subscriptions.Get(id); found {
105 appmgr.Logger.Info("Subscription id=%s found: %v", id, v.(SubscriptionInfo).req)
106 r := v.(SubscriptionInfo).req
// Unkeyed composite literals: field order must match the models definitions.
107 return models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, id}, found
// Unknown id.
109 return models.Subscription{}, false
// PublishSubscription notifies subscribers about event et for a single xApp.
// It wraps x in a one-element models.AllDeployedXapps and delegates to
// NotifyClients.
112 func (rh *Resthook) PublishSubscription(x models.Xapp, et models.EventType) {
113 rh.NotifyClients(models.AllDeployedXapps{&x}, et)
// NotifyClients sends event et for the given xapps to every subscription in
// the in-memory table, starting one goroutine per subscription that runs
// notify with the current rh.Seq value.
//
// When the emptiness check below is true it logs "Nothing to publish".
//
// NOTE(review): len() is applied to the concurrent map value itself rather
// than to an entry-count method. With the cmap type this presumably yields the
// number of shards, not the number of subscriptions, so the "no subscriptions"
// half of the check may never be true — confirm against the concurrent-map API.
116 func (rh *Resthook) NotifyClients(xapps models.AllDeployedXapps, et models.EventType) {
117 if len(xapps) == 0 || len(rh.subscriptions) == 0 {
118 appmgr.Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(rh.subscriptions))
// Fire-and-forget: the error returned by notify is discarded by the go
// statement, and nothing here waits for the goroutines to finish.
// NOTE(review): rh.Seq is read here without visible synchronisation — confirm
// NotifyClients is never called concurrently.
123 for v := range rh.subscriptions.Iter() {
124 go rh.notify(xapps, et, v.Val.(SubscriptionInfo), rh.Seq)
// notify delivers one notification to the subscriber described by s.
//
// xapps is marshalled to JSON and embedded as a string in a
// SubscriptionNotification, together with the subscription ID, seq as the
// version and et as the event name. That notification is marshalled again and
// POSTed as "application/json" to the subscription's TargetURL. The POST is
// wrapped in rh.retry, whose result is returned.
//
// NOTE(review): the request goes through the package-level http.Post, not
// through the client stored on the Resthook by NewResthook — confirm intended.
128 func (rh *Resthook) notify(xapps models.AllDeployedXapps, et models.EventType, s SubscriptionInfo, seq int64) error {
129 xappData, err := json.Marshal(xapps)
131 appmgr.Logger.Info("json.Marshal failed: %v", err)
135 // TODO: Use models.SubscriptionNotification instead of internal ...
136 notif := SubscriptionNotification{ID: s.Id, Version: seq, Event: string(et), XApps: string(xappData)}
137 jsonData, err := json.Marshal(notif)
139 appmgr.Logger.Info("json.Marshal failed: %v", err)
143 // Execute the request with retry policy
144 return rh.retry(s, func() error {
145 appmgr.Logger.Info("Posting notification to TargetURL=%s: %v", *s.req.Data.TargetURL, notif)
// jsonData is wrapped in a new buffer on every attempt, so each retry sends
// the complete payload.
146 resp, err := http.Post(*s.req.Data.TargetURL, "application/json", bytes.NewBuffer(jsonData))
148 appmgr.Logger.Info("Posting to subscription failed: %v", err)
// Any status other than 200 OK is logged as a client error.
// NOTE(review): no resp.Body.Close() is visible in this closure — confirm the
// response body is closed on every path.
152 if resp.StatusCode != http.StatusOK {
153 appmgr.Logger.Info("Client returned error code: %d", resp.StatusCode)
157 appmgr.Logger.Info("subscription to '%s' dispatched, response code: %d", *s.req.Data.TargetURL, resp.StatusCode)
// retry runs fn and, when it fails, re-runs it according to the retry settings
// of subscription s: after each failure MaxRetries is decremented and, while
// it stays above zero, the call sleeps RetryTimer seconds and recurses.
// Once the retries are used up, the subscription is removed from the in-memory
// table.
//
// NOTE(review): the decrement writes through the MaxRetries pointer. s is a
// by-value copy, but the pointer is presumably shared with the stored
// subscription, so the retry budget would shrink permanently across
// notifications and across concurrent notify goroutines — confirm.
// NOTE(review): the removal below is not followed by a StoreSubscriptions
// call, so the database copy appears to be left untouched.
162 func (rh *Resthook) retry(s SubscriptionInfo, fn func() error) error {
163 if err := fn(); err != nil {
164 // Todo: use exponential backoff, or similar mechanism
165 if *s.req.Data.MaxRetries--; *s.req.Data.MaxRetries > 0 {
// Fixed delay between attempts, in seconds.
166 time.Sleep(time.Duration(*s.req.Data.RetryTimer) * time.Second)
167 return rh.retry(s, fn)
// Retries exhausted: drop the unreachable subscriber.
169 rh.subscriptions.Remove(s.Id)
// StoreSubscriptions writes every entry of m to the database: each
// subscription's request is marshalled to JSON and stored with rh.db.Set under
// the subscription ID. Marshal and Set failures are logged; nothing is
// returned to the caller.
//
// Only Set calls are made here, so keys that are no longer in m are not
// deleted from the database by this function.
175 func (rh *Resthook) StoreSubscriptions(m cmap.ConcurrentMap) {
176 for v := range m.Iter() {
177 s := v.Val.(SubscriptionInfo)
// Only the request part is persisted; the response is rebuilt on restore.
179 data, err := json.Marshal(s.req)
181 appmgr.Logger.Error("json.marshal failed: %v ", err.Error())
185 if err := rh.db.Set(s.Id, data); err != nil {
186 appmgr.Logger.Error("DB.session.Set failed: %v ", err.Error())
// RestoreSubscriptions rebuilds the subscription table from the database.
//
// It first calls VerifyDBConnection, then lists all keys with rh.db.GetAll and
// fetches each key with rh.db.Get. Every value is decoded as a JSON
// models.SubscriptionRequest and stored in m under its key, together with a
// SubscriptionResponse rebuilt from the key and the request's event type
// (Version 0). GetAll, Get and Unmarshal failures are logged.
//
// NOTE(review): m is a named result that is written with m.Set below —
// confirm it is initialised (e.g. with cmap.New()) before the loop.
191 func (rh *Resthook) RestoreSubscriptions() (m cmap.ConcurrentMap) {
192 rh.VerifyDBConnection()
195 keys, err := rh.db.GetAll()
197 appmgr.Logger.Error("DB.session.GetAll failed: %v ", err.Error())
// One database round trip per key.
201 for _, key := range keys {
202 value, err := rh.db.Get([]string{key})
204 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
208 var item models.SubscriptionRequest
// NOTE(review): the single-value type assertion panics if value[key] is not a
// string (including when the key is missing and the value is nil) — confirm
// the database always returns strings here.
209 if err = json.Unmarshal([]byte(value[key].(string)), &item); err != nil {
210 appmgr.Logger.Error("json.Unmarshal failed: %v ", err.Error())
214 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: item.Data.EventType}
215 m.Set(key, SubscriptionInfo{key, item, resp})
// VerifyDBConnection probes the database with rh.db.GetAll and treats a nil
// error as "connection ready". On failure it logs an error and sleeps for five
// seconds; presumably this repeats until the probe succeeds — TODO confirm the
// surrounding loop and that there is no upper bound on the wait.
221 func (rh *Resthook) VerifyDBConnection() {
222 // Test DB connection, and wait until ready!
// The probe result itself is discarded; only the error is inspected.
224 if _, err := rh.db.GetAll(); err == nil {
227 appmgr.Logger.Error("Database connection not ready, waiting ...")
// 5 * time.Second is already a time.Duration; the conversion is a no-op.
228 time.Sleep(time.Duration(5 * time.Second))
// FlushSubscriptions discards all in-memory subscriptions by replacing the
// table with a new empty concurrent map.
// NOTE(review): confirm whether the persisted copies in the database are
// cleared as well; only the in-memory reset is documented here.
232 func (rh *Resthook) FlushSubscriptions() {
234 rh.subscriptions = cmap.New()