2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
25 sdl "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
26 cmap "github.com/orcaman/concurrent-map"
27 "github.com/segmentio/ksuid"
31 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr"
32 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/models"
// NewResthook builds a Resthook holding an http.Client and an SDL instance
// created with the name "appmgr". Its subscription table is either reloaded
// through RestoreSubscriptions or started empty with cmap.New(); presumably
// restoreData selects between the two — TODO confirm against the full body.
35 func NewResthook(restoreData bool) *Resthook {
// NOTE(review): the client is created without a Timeout.
37 client: &http.Client{},
38 db: sdl.NewSdlInstance("appmgr", sdl.NewDatabase()),
42 rh.subscriptions = rh.RestoreSubscriptions()
44 rh.subscriptions = cmap.New()
// AddSubscription registers the subscription request sr and yields the
// matching response record.
//
// The stored subscriptions are scanned first: an entry whose TargetURL and
// EventType both equal those of sr counts as a duplicate, and its stored
// response is picked up instead of creating a new entry (presumably it is
// returned to the caller — TODO confirm). Otherwise a new KSUID string becomes
// the key and response ID, the entry is placed in the in-memory map and
// StoreSubscriptions is called with the whole map.
//
// NOTE(review): TargetURL is dereferenced on both the stored and the incoming
// request without a nil check.
49 func (rh *Resthook) AddSubscription(sr models.SubscriptionRequest) *models.SubscriptionResponse {
50 for v := range rh.subscriptions.IterBuffered() {
51 r := v.Val.(SubscriptionInfo).req
52 if *r.Data.TargetURL == *sr.Data.TargetURL && r.Data.EventType == sr.Data.EventType {
53 appmgr.Logger.Info("Similar subscription already exists!")
54 resp := v.Val.(SubscriptionInfo).resp
// No duplicate found: create the entry under a freshly generated key.
59 key := ksuid.New().String()
60 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: sr.Data.EventType}
61 rh.subscriptions.Set(key, SubscriptionInfo{key, sr, resp})
62 rh.StoreSubscriptions(rh.subscriptions)
64 appmgr.Logger.Info("Sub: New subscription added: key=%s targetUl=%s eventType=%s", key, *sr.Data.TargetURL, sr.Data.EventType)
// DeleteSubscription removes the subscription stored under id.
//
// When id is known, the entry is dropped from the in-memory map,
// StoreSubscriptions is called with the remaining entries, and the response
// that was stored with the entry is picked up for the caller. An unknown id
// yields an empty response and false.
//
// NOTE(review): StoreSubscriptions only writes entries that are still in the
// map; nothing visible here deletes the removed key from the database, so it
// may come back through RestoreSubscriptions — confirm.
68 func (rh *Resthook) DeleteSubscription(id string) (*models.SubscriptionResponse, bool) {
69 if v, found := rh.subscriptions.Get(id); found {
70 appmgr.Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(SubscriptionInfo).req)
72 rh.subscriptions.Remove(id)
73 rh.StoreSubscriptions(rh.subscriptions)
74 resp := v.(SubscriptionInfo).resp
77 return &models.SubscriptionResponse{}, false
// ModifySubscription replaces the request stored under id with req.
//
// When id is known, a new response record (same ID, Version 0, event type
// taken from req) is built, the map entry is overwritten and
// StoreSubscriptions is called. An unknown id yields an empty response and
// false.
80 func (rh *Resthook) ModifySubscription(id string, req models.SubscriptionRequest) (*models.SubscriptionResponse, bool) {
81 if s, found := rh.subscriptions.Get(id); found {
82 appmgr.Logger.Info("Subscription id=%s found: %v ... updating", id, s.(SubscriptionInfo).req)
84 resp := models.SubscriptionResponse{ID: id, Version: 0, EventType: req.Data.EventType}
85 rh.subscriptions.Set(id, SubscriptionInfo{id, req, resp})
86 rh.StoreSubscriptions(rh.subscriptions)
90 return &models.SubscriptionResponse{}, false
// GetAllSubscriptions collects every stored subscription into hooks.
//
// hooks starts as an empty models.AllSubscriptions. For each map entry a
// models.Subscription is appended, built from the stored request's EventType,
// MaxRetries, RetryTimer and TargetURL together with the entry's Id.
93 func (rh *Resthook) GetAllSubscriptions() (hooks models.AllSubscriptions) {
94 hooks = models.AllSubscriptions{}
95 for v := range rh.subscriptions.IterBuffered() {
96 s := v.Val.(SubscriptionInfo)
97 r := v.Val.(SubscriptionInfo).req
// Positional literals: the field order must match the models definitions.
98 hooks = append(hooks, &models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, s.Id})
// GetSubscriptionById looks up the subscription stored under id.
//
// On a hit it returns a models.Subscription built from the stored request's
// EventType, MaxRetries, RetryTimer and TargetURL plus id, together with true.
// On a miss it returns a zero models.Subscription and false.
104 func (rh *Resthook) GetSubscriptionById(id string) (models.Subscription, bool) {
105 if v, found := rh.subscriptions.Get(id); found {
106 appmgr.Logger.Info("Subscription id=%s found: %v", id, v.(SubscriptionInfo).req)
107 r := v.(SubscriptionInfo).req
108 return models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, id}, found
110 return models.Subscription{}, false
// PublishSubscription announces a single xApp: x is wrapped in a one-element
// models.AllDeployedXapps and handed to NotifyClients with event type et.
113 func (rh *Resthook) PublishSubscription(x models.Xapp, et models.EventType) {
114 rh.NotifyClients(models.AllDeployedXapps{&x}, et)
117 func (rh *Resthook) NotifyClients(xapps models.AllDeployedXapps, et models.EventType) {
118 if len(xapps) == 0 || len(rh.subscriptions) == 0 {
119 appmgr.Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(rh.subscriptions))
124 for v := range rh.subscriptions.Iter() {
125 go rh.notify(xapps, et, v.Val.(SubscriptionInfo), rh.Seq)
// notify delivers one notification about xapps and event type et to the
// subscriber described by s.
//
// The xApp list is JSON-encoded and embedded as a string in a
// SubscriptionNotification that also carries the subscription id, seq as the
// Version and the event type. The JSON-encoded notification is then POSTed to
// the subscription's TargetURL from inside rh.retry, and the result of
// rh.retry is returned.
129 func (rh *Resthook) notify(xapps models.AllDeployedXapps, et models.EventType, s SubscriptionInfo, seq int64) error {
130 xappData, err := json.Marshal(xapps)
132 appmgr.Logger.Info("json.Marshal failed: %v", err)
136 // TODO: Use models.SubscriptionNotification instead of internal ...
137 notif := SubscriptionNotification{ID: s.Id, Version: seq, Event: string(et), XApps: string(xappData)}
138 jsonData, err := json.Marshal(notif)
140 appmgr.Logger.Info("json.Marshal failed: %v", err)
144 // Execute the request with retry policy
145 return rh.retry(s, func() error {
146 appmgr.Logger.Info("Posting notification to TargetURL=%s: %v", *s.req.Data.TargetURL, notif)
// NOTE(review): this uses the package-level http.Post rather than rh.client —
// confirm that is intended.
147 resp, err := http.Post(*s.req.Data.TargetURL, "application/json", bytes.NewBuffer(jsonData))
149 appmgr.Logger.Info("Posting to subscription failed: %v", err)
// NOTE(review): no resp.Body.Close() is visible in this closure; an unclosed
// body prevents the connection from being reused — confirm and close it.
153 if resp.StatusCode != http.StatusOK {
154 appmgr.Logger.Info("Client returned error code: %d", resp.StatusCode)
158 appmgr.Logger.Info("subscription to '%s' dispatched, response code: %d", *s.req.Data.TargetURL, resp.StatusCode)
163 func (rh *Resthook) retry(s SubscriptionInfo, fn func() error) error {
164 if err := fn(); err != nil {
165 // Todo: use exponential backoff, or similar mechanism
166 if *s.req.Data.MaxRetries--; *s.req.Data.MaxRetries > 0 {
167 time.Sleep(time.Duration(*s.req.Data.RetryTimer) * time.Second)
168 return rh.retry(s, fn)
170 rh.subscriptions.Remove(s.Id)
// StoreSubscriptions writes the entries of m to the database.
//
// For each entry the request is JSON-encoded and stored with rh.db.Set under
// the subscription id. Marshal and Set failures are logged; the function has
// no return value, so callers are not told about them.
//
// NOTE(review): only Set is used here, so keys that are no longer present in m
// are not deleted from the database by this function.
176 func (rh *Resthook) StoreSubscriptions(m cmap.ConcurrentMap) {
177 for v := range m.Iter() {
178 s := v.Val.(SubscriptionInfo)
180 data, err := json.Marshal(s.req)
182 appmgr.Logger.Error("json.marshal failed: %v ", err.Error())
186 if err := rh.db.Set(s.Id, data); err != nil {
187 appmgr.Logger.Error("DB.session.Set failed: %v ", err.Error())
// RestoreSubscriptions rebuilds the subscription map m from the database.
//
// It first calls VerifyDBConnection, then lists the keys with rh.db.GetAll.
// Each key is fetched with rh.db.Get, the stored JSON is decoded into a
// models.SubscriptionRequest and the result is inserted into m under that key
// with a new response record (Version 0). GetAll, Get and Unmarshal failures
// are logged.
192 func (rh *Resthook) RestoreSubscriptions() (m cmap.ConcurrentMap) {
193 rh.VerifyDBConnection()
196 keys, err := rh.db.GetAll()
198 appmgr.Logger.Error("DB.session.GetAll failed: %v ", err.Error())
202 for _, key := range keys {
203 value, err := rh.db.Get([]string{key})
205 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
209 var item models.SubscriptionRequest
// NOTE(review): unchecked type assertion — this panics if value[key] is
// missing or is not a string; confirm what rh.db.Get returns.
210 if err = json.Unmarshal([]byte(value[key].(string)), &item); err != nil {
211 appmgr.Logger.Error("json.Unmarshal failed: %v ", err.Error())
215 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: item.Data.EventType}
216 m.Set(key, SubscriptionInfo{key, item, resp})
// VerifyDBConnection probes the database with rh.db.GetAll. A nil error is
// treated as "ready"; otherwise an error is logged and the goroutine sleeps
// for five seconds. Presumably the probe repeats until it succeeds, as the
// comment below says — TODO confirm against the full body.
222 func (rh *Resthook) VerifyDBConnection() {
223 // Test DB connection, and wait until ready!
225 if _, err := rh.db.GetAll(); err == nil {
228 appmgr.Logger.Error("Database connection not ready, waiting ...")
229 time.Sleep(time.Duration(5 * time.Second))
// FlushSubscriptions replaces the in-memory subscription map with a new,
// empty one.
// NOTE(review): whether the persisted entries are cleared as well cannot be
// told from the lines shown here — confirm.
233 func (rh *Resthook) FlushSubscriptions() {
235 rh.subscriptions = cmap.New()