2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
25 sdl "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
26 cmap "github.com/orcaman/concurrent-map"
27 "github.com/segmentio/ksuid"
31 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr"
32 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/models"
// NewResthook returns a Resthook produced by createResthook, handing it an SDL
// instance created with the name "appmgr" (presumably the SDL namespace —
// confirm) on top of a freshly created SDL database handle.
//
// restoreData is forwarded to createResthook unchanged.
35 func NewResthook(restoreData bool) *Resthook {
36 return createResthook(restoreData, sdl.NewSdlInstance("appmgr", sdl.NewDatabase()))
// createResthook returns a *Resthook built from restoreData and sdlInst.
// sdlInst is typed as the iSdl interface rather than a concrete SDL type.
//
// NOTE(review): only part of this function is visible in this view. The two
// assignments to rh.subscriptions below are presumably the two arms of a
// branch on restoreData (restore from the database vs. start empty) — confirm
// against the full file.
39 func createResthook(restoreData bool, sdlInst iSdl) *Resthook {
// HTTP client stored on the Resthook being built.
41 client: &http.Client{},
// Populate the subscription map from the database.
46 rh.subscriptions = rh.RestoreSubscriptions()
// Start with a new, empty concurrent map.
48 rh.subscriptions = cmap.New()
// AddSubscription registers the subscription request sr and returns a pointer
// to a SubscriptionResponse.
//
// The existing subscriptions are scanned first; an entry whose target URL and
// event type both equal those of sr is treated as a duplicate and its stored
// response is picked up (the return statements are outside this view).
// Otherwise a new KSUID string is generated as the key, the request is stored
// in the map together with a response carrying that key, Version 0 and the
// request's event type, and the whole map is persisted via StoreSubscriptions.
//
// NOTE(review): TargetURL is dereferenced on both the stored request and sr
// without a nil check.
53 func (rh *Resthook) AddSubscription(sr models.SubscriptionRequest) *models.SubscriptionResponse {
54 for v := range rh.subscriptions.IterBuffered() {
55 r := v.Val.(SubscriptionInfo).req
// Compare the pointed-to URL strings, not the pointers themselves.
56 if *r.Data.TargetURL == *sr.Data.TargetURL && r.Data.EventType == sr.Data.EventType {
57 appmgr.Logger.Info("Similar subscription already exists!")
58 resp := v.Val.(SubscriptionInfo).resp
// No duplicate found: create the subscription under a freshly generated key.
63 key := ksuid.New().String()
64 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: sr.Data.EventType}
65 rh.subscriptions.Set(key, SubscriptionInfo{key, sr, resp})
// Persist every subscription currently in the map, not just the new one.
66 rh.StoreSubscriptions(rh.subscriptions)
68 appmgr.Logger.Info("Sub: New subscription added: key=%s targetUl=%s eventType=%s", key, *sr.Data.TargetURL, sr.Data.EventType)
// DeleteSubscription removes the subscription stored under id from the
// in-memory map and then calls StoreSubscriptions on the remaining entries.
// The removed entry's stored response is read at the end of the found branch
// (the return statement itself is outside this view).
//
// When id is not present it returns a pointer to an empty
// SubscriptionResponse and false.
//
// NOTE(review): StoreSubscriptions only writes the entries that are still in
// the map, so as far as this view shows the deleted key is not removed from
// the database and could be restored by RestoreSubscriptions — confirm.
72 func (rh *Resthook) DeleteSubscription(id string) (*models.SubscriptionResponse, bool) {
73 if v, found := rh.subscriptions.Get(id); found {
74 appmgr.Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(SubscriptionInfo).req)
76 rh.subscriptions.Remove(id)
77 rh.StoreSubscriptions(rh.subscriptions)
// v still holds the entry fetched before the removal.
78 resp := v.(SubscriptionInfo).resp
81 return &models.SubscriptionResponse{}, false
// ModifySubscription replaces the subscription stored under id with req.
//
// When id exists, the entry is overwritten with req and a newly built response
// (same id, Version 0, req's event type), and the whole map is persisted via
// StoreSubscriptions. The return statement of that branch is outside this
// view. When id is not present it returns a pointer to an empty
// SubscriptionResponse and false.
84 func (rh *Resthook) ModifySubscription(id string, req models.SubscriptionRequest) (*models.SubscriptionResponse, bool) {
85 if s, found := rh.subscriptions.Get(id); found {
// Logs the request being replaced, not the new one.
86 appmgr.Logger.Info("Subscription id=%s found: %v ... updating", id, s.(SubscriptionInfo).req)
// The response is rebuilt from scratch; Version is always set to 0 here.
88 resp := models.SubscriptionResponse{ID: id, Version: 0, EventType: req.Data.EventType}
89 rh.subscriptions.Set(id, SubscriptionInfo{id, req, resp})
90 rh.StoreSubscriptions(rh.subscriptions)
94 return &models.SubscriptionResponse{}, false
// GetAllSubscriptions collects every stored subscription into the named result
// hooks, which starts as an empty (non-nil) models.AllSubscriptions.
//
// For each entry a new models.Subscription is appended, built from the stored
// request's EventType, MaxRetries, RetryTimer and TargetURL plus the entry's
// Id. MaxRetries, RetryTimer and TargetURL are dereferenced elsewhere in this
// file, so the copies appear to share those pointers with the stored request.
97 func (rh *Resthook) GetAllSubscriptions() (hooks models.AllSubscriptions) {
98 hooks = models.AllSubscriptions{}
99 for v := range rh.subscriptions.IterBuffered() {
100 s := v.Val.(SubscriptionInfo)
101 r := v.Val.(SubscriptionInfo).req
// Unkeyed literals: field order must match the models struct definitions.
102 hooks = append(hooks, &models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, s.Id})
// GetSubscriptionById looks up the subscription stored under id.
//
// On a hit it returns a models.Subscription built from the stored request's
// EventType, MaxRetries, RetryTimer and TargetURL together with id, and true.
// On a miss it returns a zero-value models.Subscription and false.
108 func (rh *Resthook) GetSubscriptionById(id string) (models.Subscription, bool) {
109 if v, found := rh.subscriptions.Get(id); found {
110 appmgr.Logger.Info("Subscription id=%s found: %v", id, v.(SubscriptionInfo).req)
111 r := v.(SubscriptionInfo).req
// Unkeyed literals: field order must match the models struct definitions.
112 return models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, id}, found
114 return models.Subscription{}, false
// PublishSubscription notifies subscribers about a single xApp: it wraps a
// pointer to x in a one-element models.AllDeployedXapps and forwards it, with
// the event type et, to NotifyClients.
117 func (rh *Resthook) PublishSubscription(x models.Xapp, et models.EventType) {
118 rh.NotifyClients(models.AllDeployedXapps{&x}, et)
// NotifyClients sends the xApp list xapps with event type et to every stored
// subscription. Each subscription is handled by its own goroutine running
// notify, which receives the current value of rh.Seq as the sequence number.
//
// When the guard below holds, "Nothing to publish" is logged (what follows in
// that branch is outside this view).
//
// NOTE(review): len(rh.subscriptions) measures the underlying container of
// cmap.ConcurrentMap; in concurrent-map v1 that is presumably the shard slice
// rather than the number of entries, in which case the emptiness check never
// fires and Count() was likely intended — confirm against the library version.
//
// NOTE(review): the error returned by notify is discarded by the go statement,
// and nothing visible here waits for the goroutines to finish.
121 func (rh *Resthook) NotifyClients(xapps models.AllDeployedXapps, et models.EventType) {
122 if len(xapps) == 0 || len(rh.subscriptions) == 0 {
123 appmgr.Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(rh.subscriptions))
128 for v := range rh.subscriptions.Iter() {
// The SubscriptionInfo value is copied into the goroutine's arguments.
129 go rh.notify(xapps, et, v.Val.(SubscriptionInfo), rh.Seq)
// notify delivers one notification to the subscription s.
//
// xapps is marshalled to JSON and embedded, as a string, in a
// SubscriptionNotification together with the subscription Id, seq as the
// Version and et as the Event. That notification is marshalled again and
// POSTed as "application/json" to the subscription's TargetURL. The POST is
// wrapped in rh.retry, whose result is returned.
//
// Transport failures and any status other than http.StatusOK are logged; the
// statements that follow each log call are outside this view.
//
// NOTE(review): the request goes through the package-level http.Post rather
// than the client stored on the Resthook, and no resp.Body.Close() is visible
// in this view — confirm whether the response body is released.
133 func (rh *Resthook) notify(xapps models.AllDeployedXapps, et models.EventType, s SubscriptionInfo, seq int64) error {
134 xappData, err := json.Marshal(xapps)
136 appmgr.Logger.Info("json.Marshal failed: %v", err)
140 // TODO: Use models.SubscriptionNotification instead of internal ...
// XApps carries the already-encoded xApp list as a JSON string, so it is
// encoded a second time when notif itself is marshalled below.
141 notif := SubscriptionNotification{ID: s.Id, Version: seq, Event: string(et), XApps: string(xappData)}
142 jsonData, err := json.Marshal(notif)
144 appmgr.Logger.Info("json.Marshal failed: %v", err)
148 // Execute the request with retry policy
149 return rh.retry(s, func() error {
150 appmgr.Logger.Info("Posting notification to TargetURL=%s: %v", *s.req.Data.TargetURL, notif)
// A new buffer is created on every attempt, so retries resend the full payload.
151 resp, err := http.Post(*s.req.Data.TargetURL, "application/json", bytes.NewBuffer(jsonData))
153 appmgr.Logger.Info("Posting to subscription failed: %v", err)
// Only 200 counts as success; other 2xx codes take the error-logging path.
157 if resp.StatusCode != http.StatusOK {
158 appmgr.Logger.Info("Client returned error code: %d", resp.StatusCode)
162 appmgr.Logger.Info("subscription to '%s' dispatched, response code: %d", *s.req.Data.TargetURL, resp.StatusCode)
// retry runs fn and, while it keeps failing, runs it again according to the
// retry settings of subscription s.
//
// After each failure MaxRetries is decremented; while it remains above zero
// the call sleeps for RetryTimer seconds and recurses. Once the budget is used
// up the subscription is removed from the in-memory map. The final return
// statements are outside this view.
//
// NOTE(review): the decrement writes through the MaxRetries pointer held in
// the request, so it is visible to every other holder of that pointer
// (presumably including the entry stored in rh.subscriptions) and is not
// reset between notifications — confirm this is intended.
//
// NOTE(review): the removal below is not followed by any database update in
// the lines visible here.
167 func (rh *Resthook) retry(s SubscriptionInfo, fn func() error) error {
168 if err := fn(); err != nil {
169 // Todo: use exponential backoff, or similar mechanism
170 if *s.req.Data.MaxRetries--; *s.req.Data.MaxRetries > 0 {
// Fixed delay between attempts, blocking the calling goroutine.
171 time.Sleep(time.Duration(*s.req.Data.RetryTimer) * time.Second)
172 return rh.retry(s, fn)
// Retry budget exhausted: drop the subscription from the in-memory map.
174 rh.subscriptions.Remove(s.Id)
// StoreSubscriptions writes every entry of m to the database.
//
// For each entry the request part (s.req) is marshalled to JSON and stored via
// rh.db.Set with the subscription Id as the key. The response part is not
// persisted. Marshal and Set failures are logged at Error level; what happens
// after each log call is outside this view.
//
// Keys that are no longer in m are not touched by this function.
180 func (rh *Resthook) StoreSubscriptions(m cmap.ConcurrentMap) {
181 for v := range m.Iter() {
182 s := v.Val.(SubscriptionInfo)
183 data, err := json.Marshal(s.req)
185 appmgr.Logger.Error("json.marshal failed: %v ", err.Error())
// Key is the subscription Id; value is the JSON-encoded request.
189 if err := rh.db.Set(s.Id, data); err != nil {
190 appmgr.Logger.Error("DB.session.Set failed: %v ", err.Error())
// RestoreSubscriptions rebuilds the subscription map from the database and
// returns it as the named result m.
//
// It first calls VerifyDBConnection, then fetches all keys with rh.db.GetAll.
// Each key is read individually with rh.db.Get, its value is unmarshalled from
// JSON into a models.SubscriptionRequest, and the result is stored in m with a
// rebuilt response (key as ID, Version 0, the request's event type).
// GetAll, Get and Unmarshal failures are logged at Error level; what happens
// after each log call is outside this view.
//
// NOTE(review): the initialisation of m is not visible in this view — confirm
// it is assigned before m.Set is reached.
//
// NOTE(review): value[key].(string) is an unchecked type assertion and panics
// if the stored value is missing or not a string.
195 func (rh *Resthook) RestoreSubscriptions() (m cmap.ConcurrentMap) {
196 rh.VerifyDBConnection()
199 keys, err := rh.db.GetAll()
201 appmgr.Logger.Error("DB.session.GetAll failed: %v ", err.Error())
205 for _, key := range keys {
// One Get call per key, each with a single-element key slice.
206 value, err := rh.db.Get([]string{key})
208 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
212 var item models.SubscriptionRequest
213 if err = json.Unmarshal([]byte(value[key].(string)), &item); err != nil {
214 appmgr.Logger.Error("json.Unmarshal failed: %v ", err.Error())
// Only the request is persisted, so the response is reconstructed here.
218 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: item.Data.EventType}
219 m.Set(key, SubscriptionInfo{key, item, resp})
// VerifyDBConnection probes the database by calling rh.db.GetAll and treats a
// nil error as "ready". When the probe fails it logs an error and sleeps for
// five seconds.
//
// NOTE(review): the surrounding loop and the success-path statement are not
// visible in this view; presumably the probe repeats until it succeeds, with
// no upper bound on the number of attempts — confirm.
225 func (rh *Resthook) VerifyDBConnection() {
226 // Test DB connection, and wait until ready!
// The returned keys are discarded; only the error is inspected.
228 if _, err := rh.db.GetAll(); err == nil {
231 appmgr.Logger.Error("Database connection not ready, waiting ...")
// 5 * time.Second is already a time.Duration; the conversion is a no-op.
232 time.Sleep(time.Duration(5 * time.Second))
// FlushSubscriptions discards all in-memory subscriptions by replacing the map
// with a new, empty one.
//
// NOTE(review): other statements of this function may be outside this view;
// the visible line does not touch the database.
236 func (rh *Resthook) FlushSubscriptions() {
238 rh.subscriptions = cmap.New()