2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
26 sdl "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
27 cmap "github.com/orcaman/concurrent-map"
28 "github.com/segmentio/ksuid"
33 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr"
34 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/models"
37 // To encapsulate xApp Manager's keys under their own namespace in a DB.
// appmgrSdlNs is the namespace argument given to the rh.db calls in this file
// that store, list, read and remove subscription entries.
39 appmgrSdlNs = "appmgr"
// NewResthook builds a Resthook by delegating to createResthook with the
// given restoreData flag and a storage instance obtained from
// sdl.NewSyncStorage().
43 func NewResthook(restoreData bool) *Resthook {
44 return createResthook(restoreData, sdl.NewSyncStorage())
// createResthook assembles the *Resthook handed back to the caller. Taking
// the storage as a parameter (sdlInst) lets callers supply their own iSdl
// implementation instead of the one NewResthook creates.
//
// NOTE(review): several lines of this function are not visible in this
// listing (the rest of the struct literal, the branch on restoreData and the
// return). From what is shown, rh.subscriptions is either loaded through
// rh.RestoreSubscriptions() or created empty with cmap.New(); presumably
// restoreData selects between the two and sdlInst becomes the DB handle —
// confirm against the full file.
47 func createResthook(restoreData bool, sdlInst iSdl) *Resthook {
// A dedicated http.Client is stored on the hook.
49 client: &http.Client{},
// Populate the subscription map from the DB ...
54 rh.subscriptions = rh.RestoreSubscriptions()
// ... or start with an empty concurrent map.
56 rh.subscriptions = cmap.New()
// AddSubscription registers the subscription request sr.
//
// The existing subscriptions are scanned first; an entry whose TargetURL and
// EventType both equal those of sr is treated as a duplicate and its stored
// response is looked up. Otherwise a new key is generated with ksuid, a
// response with that key, Version 0 and sr's EventType is built, the entry is
// placed in the in-memory map and StoreSubscriptions is called with the map.
//
// NOTE(review): the return statements are not visible in this listing;
// presumably a pointer to the relevant response is returned in both paths —
// confirm.
// NOTE(review): both TargetURL pointers are dereferenced without a nil check,
// so a request (or stored entry) lacking a TargetURL would panic here.
61 func (rh *Resthook) AddSubscription(sr models.SubscriptionRequest) *models.SubscriptionResponse {
// Walk the current subscriptions looking for an equivalent one.
62 for v := range rh.subscriptions.IterBuffered() {
63 r := v.Val.(SubscriptionInfo).req
64 if *r.Data.TargetURL == *sr.Data.TargetURL && r.Data.EventType == sr.Data.EventType {
65 appmgr.Logger.Info("Similar subscription already exists!")
66 resp := v.Val.(SubscriptionInfo).resp
// Create a new entry keyed by a freshly generated KSUID string.
71 key := ksuid.New().String()
72 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: sr.Data.EventType}
73 rh.subscriptions.Set(key, SubscriptionInfo{key, sr, resp})
// Hand the whole map to StoreSubscriptions so the new entry reaches the DB.
74 rh.StoreSubscriptions(rh.subscriptions)
76 appmgr.Logger.Info("Sub: New subscription added: key=%s targetUl=%s eventType=%s", key, *sr.Data.TargetURL, sr.Data.EventType)
// DeleteSubscription removes the subscription stored under id from the
// in-memory map and then calls StoreSubscriptions with the remaining entries.
// The last return yields an empty SubscriptionResponse and false.
//
// NOTE(review): the return for the "found" branch is not visible in this
// listing; presumably it returns the removed subscription's response together
// with true — confirm.
// NOTE(review): StoreSubscriptions, as visible in this file, only writes the
// entries that are still in the map, and no call deleting id's key from the
// DB is visible here. If none exists, a deleted subscription would come back
// after RestoreSubscriptions — confirm.
80 func (rh *Resthook) DeleteSubscription(id string) (*models.SubscriptionResponse, bool) {
81 if v, found := rh.subscriptions.Get(id); found {
82 appmgr.Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(SubscriptionInfo).req)
84 rh.subscriptions.Remove(id)
85 rh.StoreSubscriptions(rh.subscriptions)
// v still holds the value fetched before the Remove call.
86 resp := v.(SubscriptionInfo).resp
89 return &models.SubscriptionResponse{}, false
// ModifySubscription replaces the subscription stored under id with req.
// For a known id it builds a new response (same id, Version 0, req's
// EventType), overwrites the map entry and calls StoreSubscriptions with the
// map. The last return yields an empty SubscriptionResponse and false.
//
// NOTE(review): the return for the "found" branch is not visible in this
// listing; presumably it returns the new response together with true —
// confirm.
92 func (rh *Resthook) ModifySubscription(id string, req models.SubscriptionRequest) (*models.SubscriptionResponse, bool) {
93 if s, found := rh.subscriptions.Get(id); found {
94 appmgr.Logger.Info("Subscription id=%s found: %v ... updating", id, s.(SubscriptionInfo).req)
// The previous entry is only used for the log line above; the stored value is
// rebuilt entirely from req.
96 resp := models.SubscriptionResponse{ID: id, Version: 0, EventType: req.Data.EventType}
97 rh.subscriptions.Set(id, SubscriptionInfo{id, req, resp})
98 rh.StoreSubscriptions(rh.subscriptions)
102 return &models.SubscriptionResponse{}, false
// GetAllSubscriptions collects every subscription currently held in memory
// into the named result hooks. Each element carries the request's EventType,
// MaxRetries, RetryTimer and TargetURL together with the subscription's Id.
//
// NOTE(review): the closing return is not visible in this listing.
105 func (rh *Resthook) GetAllSubscriptions() (hooks models.AllSubscriptions) {
106 hooks = models.AllSubscriptions{}
107 for v := range rh.subscriptions.IterBuffered() {
// s and r come from the same map value; r is just its request part.
108 s := v.Val.(SubscriptionInfo)
109 r := v.Val.(SubscriptionInfo).req
// Unkeyed composite literals: the values depend on the field order of
// models.SubscriptionData and models.Subscription.
110 hooks = append(hooks, &models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, s.Id})
// GetSubscriptionById looks up the subscription stored under id. When it
// exists, a models.Subscription is built from the stored request's EventType,
// MaxRetries, RetryTimer and TargetURL plus id, and returned with the lookup
// flag. The last return yields a zero models.Subscription and false.
116 func (rh *Resthook) GetSubscriptionById(id string) (models.Subscription, bool) {
117 if v, found := rh.subscriptions.Get(id); found {
118 appmgr.Logger.Info("Subscription id=%s found: %v", id, v.(SubscriptionInfo).req)
119 r := v.(SubscriptionInfo).req
// Unkeyed composite literals: the values depend on the field order of
// models.SubscriptionData and models.Subscription.
120 return models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, id}, found
122 return models.Subscription{}, false
// PublishSubscription announces an event for a single xApp: x is wrapped in a
// one-element models.AllDeployedXapps and passed, with event type et, to
// NotifyClients.
125 func (rh *Resthook) PublishSubscription(x models.Xapp, et models.EventType) {
126 rh.NotifyClients(models.AllDeployedXapps{&x}, et)
// NotifyClients fans an event out to the subscribers: for each entry in the
// subscription map it starts a goroutine running notify with xapps, et, that
// subscription and the current value of rh.Seq.
//
// NOTE(review): lines between the "Nothing to publish" log call and the loop
// are not visible in this listing; presumably there is an early return and an
// update of rh.Seq — confirm.
// NOTE(review): len() is applied to a cmap.ConcurrentMap. If that type is a
// slice of shards (as in older releases of the library), len reports the
// shard count rather than the number of stored subscriptions, so the
// emptiness check would never be true; Count() may be what is intended —
// confirm against the vendored cmap version.
// NOTE(review): the goroutines are fire-and-forget: notify's error result is
// discarded and nothing waits for them to finish.
129 func (rh *Resthook) NotifyClients(xapps models.AllDeployedXapps, et models.EventType) {
130 if len(xapps) == 0 || len(rh.subscriptions) == 0 {
131 appmgr.Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(rh.subscriptions))
136 for v := range rh.subscriptions.Iter() {
// The subscription is passed by value (SubscriptionInfo), rh.Seq as a copy.
137 go rh.notify(xapps, et, v.Val.(SubscriptionInfo), rh.Seq)
// notify delivers one notification to the subscriber described by s.
//
// It JSON-encodes xapps, places that text together with s.Id, seq and the
// event name in a SubscriptionNotification, JSON-encodes the notification and
// POSTs it as "application/json" to the subscription's TargetURL. The POST is
// wrapped in a closure that is executed through rh.retry, whose result is
// returned.
//
// NOTE(review): the error-check and return lines are not visible in this
// listing; presumably each failure logged below is followed by returning the
// error — confirm.
// NOTE(review): the request is sent with the package-level http.Post, not
// with the http.Client configured in createResthook, and no timeout is set in
// the visible code, so an unresponsive subscriber may block this goroutine.
// NOTE(review): no resp.Body.Close() is visible; confirm the response body is
// closed, otherwise connections are leaked.
// NOTE(review): in the non-200 branch the only err in scope is the one from
// http.Post, which is nil at that point. If that branch returns err, retry
// sees success and never retries — confirm.
141 func (rh *Resthook) notify(xapps models.AllDeployedXapps, et models.EventType, s SubscriptionInfo, seq int64) error {
142 xappData, err := json.Marshal(xapps)
144 appmgr.Logger.Info("json.Marshal failed: %v", err)
148 // TODO: Use models.SubscriptionNotification instead of internal ...
// The xApp list travels as a JSON string inside the outer JSON document.
149 notif := SubscriptionNotification{ID: s.Id, Version: seq, Event: string(et), XApps: string(xappData)}
150 jsonData, err := json.Marshal(notif)
152 appmgr.Logger.Info("json.Marshal failed: %v", err)
156 // Execute the request with retry policy
157 return rh.retry(s, func() error {
158 appmgr.Logger.Info("Posting notification to TargetURL=%s: %v", *s.req.Data.TargetURL, notif)
// A new buffer is created per attempt, so each retry sends the full payload.
159 resp, err := http.Post(*s.req.Data.TargetURL, "application/json", bytes.NewBuffer(jsonData))
161 appmgr.Logger.Info("Posting to subscription failed: %v", err)
// Only http.StatusOK skips the error log below.
165 if resp.StatusCode != http.StatusOK {
166 appmgr.Logger.Info("Client returned error code: %d", resp.StatusCode)
170 appmgr.Logger.Info("subscription to '%s' dispatched, response code: %d", *s.req.Data.TargetURL, resp.StatusCode)
// retry runs fn and, when it fails, decrements the subscription's MaxRetries;
// while the decremented value is still positive it sleeps RetryTimer seconds
// and calls itself again. The subscription is removed from the in-memory map
// by the Remove call below.
//
// NOTE(review): the return statements are not visible in this listing;
// presumably the Remove call is reached only once the retries are used up,
// and the last error (or nil on success) is returned — confirm.
// NOTE(review): MaxRetries is decremented through a pointer. s itself is a
// copy, but if the map entry holds the same pointer the stored subscription's
// retry budget shrinks permanently, and concurrent notify goroutines for the
// same subscription would race on it — confirm.
// NOTE(review): Remove only touches the in-memory map; no DB update is
// visible here.
175 func (rh *Resthook) retry(s SubscriptionInfo, fn func() error) error {
176 if err := fn(); err != nil {
177 // Todo: use exponential backoff, or similar mechanism
178 if *s.req.Data.MaxRetries--; *s.req.Data.MaxRetries > 0 {
// Fixed delay between attempts, taken from the subscription request.
179 time.Sleep(time.Duration(*s.req.Data.RetryTimer) * time.Second)
180 return rh.retry(s, fn)
182 rh.subscriptions.Remove(s.Id)
// StoreSubscriptions writes every entry of m to the DB: each subscription's
// request is JSON-encoded and stored under the subscription's Id in the
// appmgrSdlNs namespace. Encoding and DB failures are logged with
// appmgr.Logger.Error; the function itself returns nothing.
//
// NOTE(review): the lines following each error log are not visible in this
// listing, so whether the loop continues or stops after a failure cannot be
// confirmed from here.
// NOTE(review): only the request part is persisted; the response is not
// written.
188 func (rh *Resthook) StoreSubscriptions(m cmap.ConcurrentMap) {
189 for v := range m.Iter() {
190 s := v.Val.(SubscriptionInfo)
191 data, err := json.Marshal(s.req)
193 appmgr.Logger.Error("json.marshal failed: %v ", err.Error())
197 if err := rh.db.Set(appmgrSdlNs, s.Id, data); err != nil {
198 appmgr.Logger.Error("DB.session.Set failed: %v ", err.Error())
// RestoreSubscriptions rebuilds the subscription map from the DB. It calls
// VerifyDBConnection, lists the keys in the appmgrSdlNs namespace, reads each
// key, decodes the stored text as a models.SubscriptionRequest and inserts a
// SubscriptionInfo under the same key with a response carrying that key,
// Version 0 and the request's EventType.
//
// NOTE(review): the initialisation of the named result m and the lines
// following each error log are not visible in this listing — confirm that m
// is created before m.Set is reached and how failures are handled.
// NOTE(review): value[key].(string) is an unchecked type assertion; it panics
// if the DB returns the value as anything other than a string (including a
// missing key) — confirm the type the storage layer returns.
203 func (rh *Resthook) RestoreSubscriptions() (m cmap.ConcurrentMap) {
204 rh.VerifyDBConnection()
207 keys, err := rh.db.GetAll(appmgrSdlNs)
209 appmgr.Logger.Error("DB.session.GetAll failed: %v ", err.Error())
213 for _, key := range keys {
// One DB read per key.
214 value, err := rh.db.Get(appmgrSdlNs, []string{key})
216 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
220 var item models.SubscriptionRequest
221 if err = json.Unmarshal([]byte(value[key].(string)), &item); err != nil {
222 appmgr.Logger.Error("json.Unmarshal failed: %v ", err.Error())
// The response is not persisted, so it is rebuilt here.
226 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: item.Data.EventType}
227 m.Set(key, SubscriptionInfo{key, item, resp})
// VerifyDBConnection probes the DB by calling GetAll on the appmgrSdlNs
// namespace. A nil error counts as a working connection; otherwise an error
// is logged and the function sleeps for five seconds.
//
// NOTE(review): the loop construct and the success-path statement are not
// visible in this listing. Going by the comment below ("wait until ready"),
// the probe is presumably repeated until it succeeds, in which case there is
// no upper bound or cancellation for the wait — confirm.
233 func (rh *Resthook) VerifyDBConnection() {
234 // Test DB connection, and wait until ready!
236 if _, err := rh.db.GetAll(appmgrSdlNs); err == nil {
239 appmgr.Logger.Error("Database connection not ready, waiting ...")
// 5 * time.Second is already a time.Duration; the conversion is redundant.
240 time.Sleep(time.Duration(5 * time.Second))
// FlushSubscriptions discards all subscriptions: it calls RemoveAll on the
// appmgrSdlNs namespace of the DB and replaces the in-memory map with a new
// empty one.
//
// NOTE(review): anything returned by rh.db.RemoveAll is ignored, so a failed
// DB cleanup would go unnoticed while the in-memory map is still reset.
244 func (rh *Resthook) FlushSubscriptions() {
245 rh.db.RemoveAll(appmgrSdlNs)
246 rh.subscriptions = cmap.New()
// UpdateAppData maintains the list of registered xApp entries kept as one
// space-separated string under the "endpoints" key of the appDbSdlNs
// namespace.
//
// Visible behaviour: the endpoint is logged, the current "endpoints" value is
// read and split on " ", entries containing params.HTTPEndpoint are logged as
// removed while the other entries are collected and written back, and the
// JSON encoding of params is appended to the list and written.
//
// NOTE(review): many short lines of this function (returns, loop control,
// flag handling, variable declarations such as data) are not visible in this
// listing, so the exact control flow — in particular when the final append
// happens and what updateflag == false leads to — must be confirmed against
// the full file.
// NOTE(review): matching uses strings.Contains, i.e. a substring test; an
// endpoint that is a prefix of another (host:80 vs host:8080) would match
// both entries.
// NOTE(review): entries are JSON documents joined by single spaces; an entry
// whose JSON contains a space would be split into several pieces on the next
// read — confirm the stored fields can never contain spaces.
// NOTE(review): params.HTTPEndpoint is dereferenced without a nil check, and
// anything returned by rh.db.Remove / rh.db.Set is ignored.
249 func (rh *Resthook) UpdateAppData(params models.RegisterRequest, updateflag bool) {
250 appmgr.Logger.Info("Endpoint to be added in SDL: %s", *params.HTTPEndpoint)
// Equivalent to !updateflag.
251 if updateflag == false {
255 // Ensure config is an empty string, as we don't want to store config in the DB
256 if params.Config != "" {
260 value, err := rh.db.Get(appDbSdlNs, []string{"endpoints"})
262 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
266 appmgr.Logger.Info("List of Apps in SDL: %v", value["endpoints"])
267 var appsindb []string
271 if value["endpoints"] != nil {
// The stored value is rendered with %s and split into individual entries.
272 formstring := fmt.Sprintf("%s", value["endpoints"])
273 newstring := strings.Split(formstring, " ")
// The blank identifier in the range clause is redundant.
274 for i, _ := range newstring {
// Single stored entry that matches: the whole key is removed from the DB.
275 if len(newstring) == 1 && strings.Contains(newstring[i], *params.HTTPEndpoint) {
276 appmgr.Logger.Info("Removing Key %s", *params.HTTPEndpoint)
277 rh.db.Remove(appDbSdlNs, []string{"endpoints"})
// Matching entry among several.
281 if strings.Contains(newstring[i], *params.HTTPEndpoint) {
282 appmgr.Logger.Info("Removing entry %s", *params.HTTPEndpoint)
// Collect this entry and rebuild the joined string.
286 appsindb = append(appsindb, newstring[i])
287 data = strings.Join(appsindb, " ")
289 rh.db.Set(appDbSdlNs, "endpoints", strings.TrimSpace(data))
// Serialise the registration request and add it to the list.
293 xappData, err := json.Marshal(params)
295 appmgr.Logger.Info("json.Marshal failed: %v", err)
298 appsindb = append(appsindb, string(xappData))
299 data = strings.Join(appsindb, " ")
300 rh.db.Set(appDbSdlNs, "endpoints", strings.TrimSpace(data))
// GetAppsInSDL reads the "endpoints" value from the appDbSdlNs namespace,
// logs it, and renders it as a string with fmt.Sprintf("%s", ...).
//
// NOTE(review): the return statements and the end of this function are not
// visible in this listing. Presumably nil is returned when the DB read fails
// or the value is nil/empty, and a pointer to apps otherwise — confirm.
304 func (rh *Resthook) GetAppsInSDL() *string {
305 value, err := rh.db.Get(appDbSdlNs, []string{"endpoints"})
307 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
310 appmgr.Logger.Info("List of Apps in SDL: %v", value["endpoints"])
// Both an absent value and an empty string match this condition.
311 if value["endpoints"] == nil || value["endpoints"] == "" {
314 apps := fmt.Sprintf("%s", value["endpoints"])