import (
"bytes"
"encoding/json"
- "fmt"
"github.com/segmentio/ksuid"
"net/http"
"time"
}
func (sd *SubscriptionDispatcher) Add(sr SubscriptionReq) (resp SubscriptionResp) {
+ // Skip duplicates
+ for v := range sd.subscriptions.IterBuffered() {
+ r := v.Val.(Subscription).req
+ if r.TargetUrl == sr.TargetUrl && r.EventType == sr.EventType {
+ Logger.Info("Similar subscription already exists!")
+ return
+ }
+ }
+
key := ksuid.New().String()
resp = SubscriptionResp{key, 0, sr.EventType}
sr.Id = key
sd.subscriptions.Set(key, Subscription{sr, resp})
sd.db.StoreSubscriptions(sd.subscriptions)
- mdclog(MdclogInfo, fmt.Sprintf("Sub: New subscription added: key=%s value=%v", key, sr))
+ Logger.Info("Sub: New subscription added: key=%s value=%v", key, sr)
return
}
func (sd *SubscriptionDispatcher) Get(id string) (SubscriptionReq, bool) {
if v, found := sd.subscriptions.Get(id); found {
- mdclog(MdclogInfo, fmt.Sprintf("Subscription id=%s found: %v", id, v.(Subscription).req))
+ Logger.Info("Subscription id=%s found: %v", id, v.(Subscription).req)
return v.(Subscription).req, found
}
func (sd *SubscriptionDispatcher) Delete(id string) (SubscriptionReq, bool) {
if v, found := sd.subscriptions.Get(id); found {
- mdclog(MdclogInfo, fmt.Sprintf("Subscription id=%s found: %v ... deleting", id, v.(Subscription).req))
+ Logger.Info("Subscription id=%s found: %v ... deleting", id, v.(Subscription).req)
sd.subscriptions.Remove(id)
sd.db.StoreSubscriptions(sd.subscriptions)
func (sd *SubscriptionDispatcher) Update(id string, sr SubscriptionReq) (SubscriptionReq, bool) {
if s, found := sd.subscriptions.Get(id); found {
- mdclog(MdclogInfo, fmt.Sprintf("Subscription id=%s found: %v ... updating", id, s.(Subscription).req))
+ Logger.Info("Subscription id=%s found: %v ... updating", id, s.(Subscription).req)
sr.Id = id
sd.subscriptions.Set(id, Subscription{sr, s.(Subscription).resp})
func (sd *SubscriptionDispatcher) notifyClients(xapps []Xapp, et EventType) {
if len(xapps) == 0 || len(sd.subscriptions) == 0 {
- mdclog(MdclogInfo, fmt.Sprintf("Nothing to publish [%d:%d]", len(xapps), len(sd.subscriptions)))
+ Logger.Info("Nothing to publish [%d:%d]", len(xapps), len(sd.subscriptions))
return
}
}
func (sd *SubscriptionDispatcher) notify(xapps []Xapp, et EventType, s Subscription, seq int) error {
- notif := []SubscriptionNotif{}
- notif = append(notif, SubscriptionNotif{Id: s.req.Id, Version: seq, EventType: string(et), XappData: xapps})
+ xappData, err := json.Marshal(xapps)
+ if err != nil {
+ Logger.Info("json.Marshal failed: %v", err)
+ return err
+ }
+ notif := SubscriptionNotif{Id: s.req.Id, Version: seq, EventType: string(et), XApps: string(xappData)}
jsonData, err := json.Marshal(notif)
if err != nil {
- mdclog(MdclogInfo, fmt.Sprintf("json.Marshal failed: %v", err))
+ Logger.Info("json.Marshal failed: %v", err)
return err
}
// Execute the request with retry policy
return sd.retry(s, func() error {
+ Logger.Info("Posting notification to targetUrl=%s: %v", s.req.TargetUrl, notif)
resp, err := http.Post(s.req.TargetUrl, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
- mdclog(MdclogInfo, fmt.Sprintf("Posting to subscription failed: %v", err))
+ Logger.Info("Posting to subscription failed: %v", err)
return err
}
if resp.StatusCode != http.StatusOK {
- mdclog(MdclogInfo, fmt.Sprintf("Client returned error code: %d", resp.StatusCode))
+ Logger.Info("Client returned error code: %d", resp.StatusCode)
return err
}
- mdclog(MdclogInfo, fmt.Sprintf("subscription to '%s' dispatched, response code: %d \n", s.req.TargetUrl, resp.StatusCode))
+ Logger.Info("subscription to '%s' dispatched, response code: %d", s.req.TargetUrl, resp.StatusCode)
return nil
})
}