X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=internal%2Fsdlgoredis%2Fsdlgoredis.go;h=8cdae4638b3d9c6621bd90bbfcce4eb68fe8286f;hb=d14c9d8bc5ed846c4b9f05af739eb9fba1978737;hp=20ed0bf18e94853fcc60b6466bcd29646a8b91bc;hpb=1fb0e222e95d5af6df20d922501f585c1ef9fd0b;p=ric-plt%2Fsdlgo.git diff --git a/internal/sdlgoredis/sdlgoredis.go b/internal/sdlgoredis/sdlgoredis.go index 20ed0bf..8cdae46 100644 --- a/internal/sdlgoredis/sdlgoredis.go +++ b/internal/sdlgoredis/sdlgoredis.go @@ -324,26 +324,25 @@ func (db *DB) CloseDB() error { return db.client.Close() } -func (db *DB) UnsubscribeChannelDB(channels ...string) { +func (db *DB) UnsubscribeChannelDB(channels ...string) error { for _, v := range channels { db.sCbMap.Remove(v) db.ch.removeChannel <- v + errStr := <-db.ch.removeChannel + if errStr != "" { + return fmt.Errorf("SDL Unsubscribe of channel %s failed: %s", v, errStr) + } if db.sCbMap.Count() == 0 { db.ch.exit <- true } } + return nil } -func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) { +func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) error { if db.sCbMap.Count() == 0 { - for _, v := range channels { - db.sCbMap.Add(v, cb) - } - - go func(sCbMap *sharedCbMap, - ch intChannels, - channels ...string) { - sub := db.subscribe(db.ctx, db.client, channels...) + go func(sCbMap *sharedCbMap, ch intChannels) { + sub := db.subscribe(db.ctx, db.client, "") rxChannel := sub.Channel() lCbMap := sCbMap.GetMapCopy() for { @@ -356,10 +355,18 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) } case channel := <-ch.addChannel: lCbMap = sCbMap.GetMapCopy() - sub.Subscribe(db.ctx, channel) + if err := sub.Subscribe(db.ctx, channel); err != nil { + ch.addChannel <- err.Error() + } else { + ch.addChannel <- "" + } case channel := <-ch.removeChannel: lCbMap = sCbMap.GetMapCopy() - sub.Unsubscribe(db.ctx, channel) + if err := sub.Unsubscribe(db.ctx, channel); err != nil { + ch.removeChannel <- err.Error() + } else { + ch.removeChannel <- "" + } case exit := <-ch.exit: if exit { if err := sub.Close(); err != nil { @@ -369,14 +376,17 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) } } } - }(db.sCbMap, db.ch, channels...) - - } else { - for _, v := range channels { - db.sCbMap.Add(v, cb) - db.ch.addChannel <- v + }(db.sCbMap, db.ch) + } + for _, v := range channels { + db.sCbMap.Add(v, cb) + db.ch.addChannel <- v + errStr := <-db.ch.addChannel + if errStr != "" { + return fmt.Errorf("SDL Subscribe of channel %s failed: %s", v, errStr) } } + return nil } func (db *DB) MSet(pairs ...interface{}) error {