X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=cmd%2Fappmgr%2Fsubscriptions.go;fp=cmd%2Fappmgr%2Fsubscriptions.go;h=0000000000000000000000000000000000000000;hb=34e4383c438f87023bc411d9b0baa4a828a7e306;hp=53bc212dbdc73ca34db57caf70e5a7b095f5a880;hpb=4703b1a7457cf072640adbc0f5487a0675f5b6d3;p=ric-plt%2Fappmgr.git diff --git a/cmd/appmgr/subscriptions.go b/cmd/appmgr/subscriptions.go deleted file mode 100755 index 53bc212..0000000 --- a/cmd/appmgr/subscriptions.go +++ /dev/null @@ -1,162 +0,0 @@ -/* -================================================================================== - Copyright (c) 2019 AT&T Intellectual Property. - Copyright (c) 2019 Nokia - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -================================================================================== -*/ - -package main - -import ( - "bytes" - "encoding/json" - "github.com/segmentio/ksuid" - "net/http" - "time" -) - -func (sd *SubscriptionDispatcher) Initialize() { - sd.client = &http.Client{} - - sd.db = &DB{} - sd.db.Create() - sd.subscriptions = sd.db.RestoreSubscriptions() -} - -func (sd *SubscriptionDispatcher) Add(sr SubscriptionReq) (resp SubscriptionResp) { - // Skip duplicates - for v := range sd.subscriptions.IterBuffered() { - r := v.Val.(Subscription).req - if r.TargetUrl == sr.TargetUrl && r.EventType == sr.EventType { - Logger.Info("Similar subscription already exists!") - return - } - } - - key := ksuid.New().String() - resp = SubscriptionResp{key, 0, sr.EventType} - sr.Id = key - - sd.subscriptions.Set(key, Subscription{sr, resp}) - sd.db.StoreSubscriptions(sd.subscriptions) - - Logger.Info("Sub: New subscription added: key=%s value=%v", key, sr) - return -} - -func (sd *SubscriptionDispatcher) GetAll() (hooks []SubscriptionReq) { - hooks = []SubscriptionReq{} - for v := range sd.subscriptions.IterBuffered() { - hooks = append(hooks, v.Val.(Subscription).req) - } - - return hooks -} - -func (sd *SubscriptionDispatcher) Get(id string) (SubscriptionReq, bool) { - if v, found := sd.subscriptions.Get(id); found { - Logger.Info("Subscription id=%s found: %v", id, v.(Subscription).req) - - return v.(Subscription).req, found - } - return SubscriptionReq{}, false -} - -func (sd *SubscriptionDispatcher) Delete(id string) (SubscriptionReq, bool) { - if v, found := sd.subscriptions.Get(id); found { - Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(Subscription).req) - - sd.subscriptions.Remove(id) - sd.db.StoreSubscriptions(sd.subscriptions) - - return v.(Subscription).req, found - } - return SubscriptionReq{}, false -} - -func (sd *SubscriptionDispatcher) Update(id string, sr SubscriptionReq) (SubscriptionReq, bool) { - if s, found := sd.subscriptions.Get(id); found { - Logger.Info("Subscription id=%s found: %v ... updating", id, s.(Subscription).req) - - sr.Id = id - sd.subscriptions.Set(id, Subscription{sr, s.(Subscription).resp}) - sd.db.StoreSubscriptions(sd.subscriptions) - - return sr, found - } - return SubscriptionReq{}, false -} - -func (sd *SubscriptionDispatcher) Publish(x Xapp, et EventType) { - sd.notifyClients([]Xapp{x}, et) -} - -func (sd *SubscriptionDispatcher) notifyClients(xapps []Xapp, et EventType) { - if len(xapps) == 0 || len(sd.subscriptions) == 0 { - Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(sd.subscriptions)) - return - } - - sd.Seq = sd.Seq + 1 - for v := range sd.subscriptions.Iter() { - go sd.notify(xapps, et, v.Val.(Subscription), sd.Seq) - } -} - -func (sd *SubscriptionDispatcher) notify(xapps []Xapp, et EventType, s Subscription, seq int) error { - xappData, err := json.Marshal(xapps) - if err != nil { - Logger.Info("json.Marshal failed: %v", err) - return err - } - - notif := SubscriptionNotif{Id: s.req.Id, Version: seq, EventType: string(et), XApps: string(xappData)} - jsonData, err := json.Marshal(notif) - if err != nil { - Logger.Info("json.Marshal failed: %v", err) - return err - } - - // Execute the request with retry policy - return sd.retry(s, func() error { - Logger.Info("Posting notification to targetUrl=%s: %v", s.req.TargetUrl, notif) - resp, err := http.Post(s.req.TargetUrl, "application/json", bytes.NewBuffer(jsonData)) - if err != nil { - Logger.Info("Posting to subscription failed: %v", err) - return err - } - - if resp.StatusCode != http.StatusOK { - Logger.Info("Client returned error code: %d", resp.StatusCode) - return err - } - - Logger.Info("subscription to '%s' dispatched, response code: %d", s.req.TargetUrl, resp.StatusCode) - return nil - }) -} - -func (sd *SubscriptionDispatcher) retry(s Subscription, fn func() error) error { - if err := fn(); err != nil { - // Todo: use exponential backoff, or similar mechanism - if s.req.MaxRetries--; s.req.MaxRetries > 0 { - time.Sleep(time.Duration(s.req.RetryTimer) * time.Second) - return sd.retry(s, fn) - } - sd.subscriptions.Remove(s.req.Id) - return err - } - return nil -}