Handle Subscribe() and Unsubscribe() error
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis.go
index 20ed0bf..8cdae46 100644 (file)
@@ -324,26 +324,25 @@ func (db *DB) CloseDB() error {
        return db.client.Close()
 }
 
-func (db *DB) UnsubscribeChannelDB(channels ...string) {
+func (db *DB) UnsubscribeChannelDB(channels ...string) error {
        for _, v := range channels {
                db.sCbMap.Remove(v)
                db.ch.removeChannel <- v
+               errStr := <-db.ch.removeChannel
+               if errStr != "" {
+                       return fmt.Errorf("SDL Unsubscribe of channel %s failed: %s", v, errStr)
+               }
                if db.sCbMap.Count() == 0 {
                        db.ch.exit <- true
                }
        }
+       return nil
 }
 
-func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) {
+func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) error {
        if db.sCbMap.Count() == 0 {
-               for _, v := range channels {
-                       db.sCbMap.Add(v, cb)
-               }
-
-               go func(sCbMap *sharedCbMap,
-                       ch intChannels,
-                       channels ...string) {
-                       sub := db.subscribe(db.ctx, db.client, channels...)
+               go func(sCbMap *sharedCbMap, ch intChannels) {
+                       sub := db.subscribe(db.ctx, db.client, "")
                        rxChannel := sub.Channel()
                        lCbMap := sCbMap.GetMapCopy()
                        for {
@@ -356,10 +355,18 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string)
                                        }
                                case channel := <-ch.addChannel:
                                        lCbMap = sCbMap.GetMapCopy()
-                                       sub.Subscribe(db.ctx, channel)
+                                       if err := sub.Subscribe(db.ctx, channel); err != nil {
+                                               ch.addChannel <- err.Error()
+                                       } else {
+                                               ch.addChannel <- ""
+                                       }
                                case channel := <-ch.removeChannel:
                                        lCbMap = sCbMap.GetMapCopy()
-                                       sub.Unsubscribe(db.ctx, channel)
+                                       if err := sub.Unsubscribe(db.ctx, channel); err != nil {
+                                               ch.removeChannel <- err.Error()
+                                       } else {
+                                               ch.removeChannel <- ""
+                                       }
                                case exit := <-ch.exit:
                                        if exit {
                                                if err := sub.Close(); err != nil {
@@ -369,14 +376,17 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string)
                                        }
                                }
                        }
-               }(db.sCbMap, db.ch, channels...)
-
-       } else {
-               for _, v := range channels {
-                       db.sCbMap.Add(v, cb)
-                       db.ch.addChannel <- v
+               }(db.sCbMap, db.ch)
+       }
+       for _, v := range channels {
+               db.sCbMap.Add(v, cb)
+               db.ch.addChannel <- v
+               errStr := <-db.ch.addChannel
+               if errStr != "" {
+                       return fmt.Errorf("SDL Subscribe of channel %s failed: %s", v, errStr)
                }
        }
+       return nil
 }
 
 func (db *DB) MSet(pairs ...interface{}) error {