Swagger-baser server REST API interface
[ric-plt/appmgr.git] / cmd / appmgr / subscriptions.go
diff --git a/cmd/appmgr/subscriptions.go b/cmd/appmgr/subscriptions.go
deleted file mode 100755 (executable)
index 53bc212..0000000
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
-==================================================================================
-  Copyright (c) 2019 AT&T Intellectual Property.
-  Copyright (c) 2019 Nokia
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-==================================================================================
-*/
-
-package main
-
-import (
-       "bytes"
-       "encoding/json"
-       "github.com/segmentio/ksuid"
-       "net/http"
-       "time"
-)
-
-func (sd *SubscriptionDispatcher) Initialize() {
-       sd.client = &http.Client{}
-
-       sd.db = &DB{}
-       sd.db.Create()
-       sd.subscriptions = sd.db.RestoreSubscriptions()
-}
-
-func (sd *SubscriptionDispatcher) Add(sr SubscriptionReq) (resp SubscriptionResp) {
-       // Skip duplicates
-       for v := range sd.subscriptions.IterBuffered() {
-               r := v.Val.(Subscription).req
-               if r.TargetUrl == sr.TargetUrl && r.EventType == sr.EventType {
-                       Logger.Info("Similar subscription already exists!")
-                       return
-               }
-       }
-
-       key := ksuid.New().String()
-       resp = SubscriptionResp{key, 0, sr.EventType}
-       sr.Id = key
-
-       sd.subscriptions.Set(key, Subscription{sr, resp})
-       sd.db.StoreSubscriptions(sd.subscriptions)
-
-       Logger.Info("Sub: New subscription added: key=%s value=%v", key, sr)
-       return
-}
-
-func (sd *SubscriptionDispatcher) GetAll() (hooks []SubscriptionReq) {
-       hooks = []SubscriptionReq{}
-       for v := range sd.subscriptions.IterBuffered() {
-               hooks = append(hooks, v.Val.(Subscription).req)
-       }
-
-       return hooks
-}
-
-func (sd *SubscriptionDispatcher) Get(id string) (SubscriptionReq, bool) {
-       if v, found := sd.subscriptions.Get(id); found {
-               Logger.Info("Subscription id=%s found: %v", id, v.(Subscription).req)
-
-               return v.(Subscription).req, found
-       }
-       return SubscriptionReq{}, false
-}
-
-func (sd *SubscriptionDispatcher) Delete(id string) (SubscriptionReq, bool) {
-       if v, found := sd.subscriptions.Get(id); found {
-               Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(Subscription).req)
-
-               sd.subscriptions.Remove(id)
-               sd.db.StoreSubscriptions(sd.subscriptions)
-
-               return v.(Subscription).req, found
-       }
-       return SubscriptionReq{}, false
-}
-
-func (sd *SubscriptionDispatcher) Update(id string, sr SubscriptionReq) (SubscriptionReq, bool) {
-       if s, found := sd.subscriptions.Get(id); found {
-               Logger.Info("Subscription id=%s found: %v ... updating", id, s.(Subscription).req)
-
-               sr.Id = id
-               sd.subscriptions.Set(id, Subscription{sr, s.(Subscription).resp})
-               sd.db.StoreSubscriptions(sd.subscriptions)
-
-               return sr, found
-       }
-       return SubscriptionReq{}, false
-}
-
-func (sd *SubscriptionDispatcher) Publish(x Xapp, et EventType) {
-       sd.notifyClients([]Xapp{x}, et)
-}
-
-func (sd *SubscriptionDispatcher) notifyClients(xapps []Xapp, et EventType) {
-       if len(xapps) == 0 || len(sd.subscriptions) == 0 {
-               Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(sd.subscriptions))
-               return
-       }
-
-       sd.Seq = sd.Seq + 1
-       for v := range sd.subscriptions.Iter() {
-               go sd.notify(xapps, et, v.Val.(Subscription), sd.Seq)
-       }
-}
-
-func (sd *SubscriptionDispatcher) notify(xapps []Xapp, et EventType, s Subscription, seq int) error {
-       xappData, err := json.Marshal(xapps)
-       if err != nil {
-               Logger.Info("json.Marshal failed: %v", err)
-               return err
-       }
-
-       notif := SubscriptionNotif{Id: s.req.Id, Version: seq, EventType: string(et), XApps: string(xappData)}
-       jsonData, err := json.Marshal(notif)
-       if err != nil {
-               Logger.Info("json.Marshal failed: %v", err)
-               return err
-       }
-
-       // Execute the request with retry policy
-       return sd.retry(s, func() error {
-               Logger.Info("Posting notification to targetUrl=%s: %v", s.req.TargetUrl, notif)
-               resp, err := http.Post(s.req.TargetUrl, "application/json", bytes.NewBuffer(jsonData))
-               if err != nil {
-                       Logger.Info("Posting to subscription failed: %v", err)
-                       return err
-               }
-
-               if resp.StatusCode != http.StatusOK {
-                       Logger.Info("Client returned error code: %d", resp.StatusCode)
-                       return err
-               }
-
-               Logger.Info("subscription to '%s' dispatched, response code: %d", s.req.TargetUrl, resp.StatusCode)
-               return nil
-       })
-}
-
-func (sd *SubscriptionDispatcher) retry(s Subscription, fn func() error) error {
-       if err := fn(); err != nil {
-               // Todo: use exponential backoff, or similar mechanism
-               if s.req.MaxRetries--; s.req.MaxRetries > 0 {
-                       time.Sleep(time.Duration(s.req.RetryTimer) * time.Second)
-                       return sd.retry(s, fn)
-               }
-               sd.subscriptions.Remove(s.req.Id)
-               return err
-       }
-       return nil
-}