2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
26 sdl "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
27 cmap "github.com/orcaman/concurrent-map"
28 "github.com/segmentio/ksuid"
33 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr"
34 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/models"
37 func NewResthook(restoreData bool) *Resthook {
38 return createResthook(restoreData, sdl.NewSdlInstance("appmgr", sdl.NewDatabase()),sdl.NewSdlInstance("appdb", sdl.NewDatabase()))
// createResthook assembles a Resthook around the supplied SDL handles. Its
// subscription map is either reloaded through RestoreSubscriptions or created
// empty with cmap.New.
// NOTE(review): the struct-literal opener, the remaining field initialisers
// (presumably wiring sdlInst and sdlInst2), the branch on restoreData and the
// final return are not visible in this excerpt — confirm against the full file.
41 func createResthook(restoreData bool, sdlInst iSdl, sdlInst2 iSdl) *Resthook {
// HTTP client kept on the hook; no timeout is configured here.
43 client: &http.Client{},
// Reload the subscriptions that were written to the store earlier.
49 rh.subscriptions = rh.RestoreSubscriptions()
// Start from an empty subscription map.
51 rh.subscriptions = cmap.New()
56 func (rh *Resthook) AddSubscription(sr models.SubscriptionRequest) *models.SubscriptionResponse {
57 for v := range rh.subscriptions.IterBuffered() {
58 r := v.Val.(SubscriptionInfo).req
59 if *r.Data.TargetURL == *sr.Data.TargetURL && r.Data.EventType == sr.Data.EventType {
60 appmgr.Logger.Info("Similar subscription already exists!")
61 resp := v.Val.(SubscriptionInfo).resp
66 key := ksuid.New().String()
67 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: sr.Data.EventType}
68 rh.subscriptions.Set(key, SubscriptionInfo{key, sr, resp})
69 rh.StoreSubscriptions(rh.subscriptions)
71 appmgr.Logger.Info("Sub: New subscription added: key=%s targetUl=%s eventType=%s", key, *sr.Data.TargetURL, sr.Data.EventType)
75 func (rh *Resthook) DeleteSubscription(id string) (*models.SubscriptionResponse, bool) {
76 if v, found := rh.subscriptions.Get(id); found {
77 appmgr.Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(SubscriptionInfo).req)
79 rh.subscriptions.Remove(id)
80 rh.StoreSubscriptions(rh.subscriptions)
81 resp := v.(SubscriptionInfo).resp
84 return &models.SubscriptionResponse{}, false
87 func (rh *Resthook) ModifySubscription(id string, req models.SubscriptionRequest) (*models.SubscriptionResponse, bool) {
88 if s, found := rh.subscriptions.Get(id); found {
89 appmgr.Logger.Info("Subscription id=%s found: %v ... updating", id, s.(SubscriptionInfo).req)
91 resp := models.SubscriptionResponse{ID: id, Version: 0, EventType: req.Data.EventType}
92 rh.subscriptions.Set(id, SubscriptionInfo{id, req, resp})
93 rh.StoreSubscriptions(rh.subscriptions)
97 return &models.SubscriptionResponse{}, false
100 func (rh *Resthook) GetAllSubscriptions() (hooks models.AllSubscriptions) {
101 hooks = models.AllSubscriptions{}
102 for v := range rh.subscriptions.IterBuffered() {
103 s := v.Val.(SubscriptionInfo)
104 r := v.Val.(SubscriptionInfo).req
105 hooks = append(hooks, &models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, s.Id})
111 func (rh *Resthook) GetSubscriptionById(id string) (models.Subscription, bool) {
112 if v, found := rh.subscriptions.Get(id); found {
113 appmgr.Logger.Info("Subscription id=%s found: %v", id, v.(SubscriptionInfo).req)
114 r := v.(SubscriptionInfo).req
115 return models.Subscription{&models.SubscriptionData{r.Data.EventType, r.Data.MaxRetries, r.Data.RetryTimer, r.Data.TargetURL}, id}, found
117 return models.Subscription{}, false
120 func (rh *Resthook) PublishSubscription(x models.Xapp, et models.EventType) {
121 rh.NotifyClients(models.AllDeployedXapps{&x}, et)
124 func (rh *Resthook) NotifyClients(xapps models.AllDeployedXapps, et models.EventType) {
125 if len(xapps) == 0 || len(rh.subscriptions) == 0 {
126 appmgr.Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(rh.subscriptions))
131 for v := range rh.subscriptions.Iter() {
132 go rh.notify(xapps, et, v.Val.(SubscriptionInfo), rh.Seq)
136 func (rh *Resthook) notify(xapps models.AllDeployedXapps, et models.EventType, s SubscriptionInfo, seq int64) error {
137 xappData, err := json.Marshal(xapps)
139 appmgr.Logger.Info("json.Marshal failed: %v", err)
143 // TODO: Use models.SubscriptionNotification instead of internal ...
144 notif := SubscriptionNotification{ID: s.Id, Version: seq, Event: string(et), XApps: string(xappData)}
145 jsonData, err := json.Marshal(notif)
147 appmgr.Logger.Info("json.Marshal failed: %v", err)
151 // Execute the request with retry policy
152 return rh.retry(s, func() error {
153 appmgr.Logger.Info("Posting notification to TargetURL=%s: %v", *s.req.Data.TargetURL, notif)
154 resp, err := http.Post(*s.req.Data.TargetURL, "application/json", bytes.NewBuffer(jsonData))
156 appmgr.Logger.Info("Posting to subscription failed: %v", err)
160 if resp.StatusCode != http.StatusOK {
161 appmgr.Logger.Info("Client returned error code: %d", resp.StatusCode)
165 appmgr.Logger.Info("subscription to '%s' dispatched, response code: %d", *s.req.Data.TargetURL, resp.StatusCode)
170 func (rh *Resthook) retry(s SubscriptionInfo, fn func() error) error {
171 if err := fn(); err != nil {
172 // Todo: use exponential backoff, or similar mechanism
173 if *s.req.Data.MaxRetries--; *s.req.Data.MaxRetries > 0 {
174 time.Sleep(time.Duration(*s.req.Data.RetryTimer) * time.Second)
175 return rh.retry(s, fn)
177 rh.subscriptions.Remove(s.Id)
// StoreSubscriptions writes the entries of m to the backing store rh.db: the
// key is the subscription ID and the value is the JSON-encoded request. Only
// the request is persisted, not the response. Marshal and Set failures are
// logged; nothing is returned to the caller.
// NOTE(review): the error check after json.Marshal and the statement that
// follows its log call (return vs. continue) are not visible in this excerpt.
183 func (rh *Resthook) StoreSubscriptions(m cmap.ConcurrentMap) {
184 for v := range m.Iter() {
185 s := v.Val.(SubscriptionInfo)
186 data, err := json.Marshal(s.req)
188 appmgr.Logger.Error("json.marshal failed: %v ", err.Error())
// Persist the encoded request under the subscription ID.
192 if err := rh.db.Set(s.Id, data); err != nil {
193 appmgr.Logger.Error("DB.session.Set failed: %v ", err.Error())
// RestoreSubscriptions rebuilds the subscription map from the backing store
// rh.db. It first calls VerifyDBConnection, then lists all keys with GetAll,
// fetches each key's value, decodes it as a models.SubscriptionRequest and
// stores it in m together with a response built from the key (version 0 and
// the request's event type). Failures are logged.
// NOTE(review): the initialisation of m, the error checks after GetAll/Get
// and the statements following each error log are not visible in this excerpt.
198 func (rh *Resthook) RestoreSubscriptions() (m cmap.ConcurrentMap) {
199 rh.VerifyDBConnection()
202 keys, err := rh.db.GetAll()
204 appmgr.Logger.Error("DB.session.GetAll failed: %v ", err.Error())
208 for _, key := range keys {
209 value, err := rh.db.Get([]string{key})
211 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
215 var item models.SubscriptionRequest
// The unchecked .(string) assertion panics if the stored value has another
// dynamic type.
216 if err = json.Unmarshal([]byte(value[key].(string)), &item); err != nil {
217 appmgr.Logger.Error("json.Unmarshal failed: %v ", err.Error())
// The store key doubles as the subscription ID.
221 resp := models.SubscriptionResponse{ID: key, Version: 0, EventType: item.Data.EventType}
222 m.Set(key, SubscriptionInfo{key, item, resp})
// VerifyDBConnection probes the backing store by calling rh.db.GetAll. When
// the call fails it logs that the database is not ready and sleeps for five
// seconds.
// NOTE(review): the enclosing loop and the statement on the success branch
// are not visible in this excerpt — presumably the probe repeats until GetAll
// succeeds, as the comment below states; confirm against the full file.
228 func (rh *Resthook) VerifyDBConnection() {
229 // Test DB connection, and wait until ready!
231 if _, err := rh.db.GetAll(); err == nil {
234 appmgr.Logger.Error("Database connection not ready, waiting ...")
235 time.Sleep(time.Duration(5 * time.Second))
// FlushSubscriptions discards all in-memory subscriptions by replacing the
// map with a new empty one.
// NOTE(review): a statement between the signature and the map reset is not
// visible in this excerpt (presumably it clears the backing store) — confirm.
239 func (rh *Resthook) FlushSubscriptions() {
241 rh.subscriptions = cmap.New()
// UpdateAppData maintains the "endpoints" value in rh.db2, which holds
// registered apps as a single space-separated string.
//
// The current value is read and split on spaces. Entries containing
// params.HTTPEndpoint are logged as removed; when the only entry matches, the
// whole "endpoints" key is removed. The remaining entries are re-joined and
// written back. A JSON encoding of params can also be appended to the list
// and written back.
// NOTE(review): the declaration of data, any flag deciding whether the JSON
// entry is appended, and the return/break/continue statements are not visible
// in this excerpt — confirm the exact control flow against the full file.
244 func (rh *Resthook) UpdateAppData(params models.RegisterRequest, updateflag bool) {
245 appmgr.Logger.Info("Endpoint to be added in SDL: %s", *params.HTTPEndpoint)
246 if updateflag == false {
250 value, err := rh.db2.Get([]string{"endpoints"})
252 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
256 appmgr.Logger.Info("List of Apps in SDL: %v", value["endpoints"])
257 var appsindb []string
261 if value["endpoints"] != nil {
// The stored value is formatted as a string and split on single spaces; each
// piece is treated as one registered app.
262 formstring := fmt.Sprintf("%s", value["endpoints"])
263 newstring := strings.Split(formstring, " ")
264 for i, _ := range newstring {
// Only one entry is stored and it matches: drop the whole key.
// strings.Contains is a substring test, so an endpoint that is a prefix of
// another entry's endpoint also matches.
265 if len(newstring) == 1 && strings.Contains(newstring[i], *params.HTTPEndpoint) {
266 appmgr.Logger.Info("Removing Key %s", *params.HTTPEndpoint)
267 rh.db2.Remove([]string{"endpoints"})
271 if strings.Contains(newstring[i], *params.HTTPEndpoint) {
272 appmgr.Logger.Info("Removing entry %s", *params.HTTPEndpoint)
// Keep this entry and rebuild the joined string.
276 appsindb = append(appsindb, newstring[i])
277 data = strings.Join(appsindb, " ")
// Write the filtered list back; the error returned by Set is not checked.
279 rh.db2.Set("endpoints", strings.TrimSpace(data))
// Append the JSON form of params as a new entry and write the list back.
283 xappData, err := json.Marshal(params)
285 appmgr.Logger.Info("json.Marshal failed: %v", err)
288 appsindb = append(appsindb, string(xappData))
289 data = strings.Join(appsindb, " ")
290 rh.db2.Set("endpoints", strings.TrimSpace(data))
// GetAppsInSDL reads the "endpoints" value from rh.db2, logs it, and formats
// it as a string when it is neither nil nor empty. A failed Get is logged.
// NOTE(review): the return statements and the end of the function are not
// visible in this excerpt — presumably nil is returned on error or when the
// value is empty, and a pointer to apps otherwise; confirm.
294 func (rh *Resthook) GetAppsInSDL() *string {
295 value, err := rh.db2.Get([]string{"endpoints"})
297 appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
300 appmgr.Logger.Info("List of Apps in SDL: %v", value["endpoints"])
301 if value["endpoints"] == nil || value["endpoints"] == "" {
304 apps := fmt.Sprintf("%s", value["endpoints"])