return db.client.Close()
}
-func (db *DB) UnsubscribeChannelDB(channels ...string) {
+func (db *DB) UnsubscribeChannelDB(channels ...string) error {
for _, v := range channels {
db.sCbMap.Remove(v)
db.ch.removeChannel <- v
+ errStr := <-db.ch.removeChannel
+ if errStr != "" {
+ return fmt.Errorf("SDL Unsubscribe of channel %s failed: %s", v, errStr)
+ }
if db.sCbMap.Count() == 0 {
db.ch.exit <- true
}
}
+ return nil
}
-func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) {
+func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) error {
if db.sCbMap.Count() == 0 {
- for _, v := range channels {
- db.sCbMap.Add(v, cb)
- }
-
- go func(sCbMap *sharedCbMap,
- ch intChannels,
- channels ...string) {
- sub := db.subscribe(db.ctx, db.client, channels...)
+ go func(sCbMap *sharedCbMap, ch intChannels) {
+ sub := db.subscribe(db.ctx, db.client, "")
rxChannel := sub.Channel()
lCbMap := sCbMap.GetMapCopy()
for {
}
case channel := <-ch.addChannel:
lCbMap = sCbMap.GetMapCopy()
- sub.Subscribe(db.ctx, channel)
+ if err := sub.Subscribe(db.ctx, channel); err != nil {
+ ch.addChannel <- err.Error()
+ } else {
+ ch.addChannel <- ""
+ }
case channel := <-ch.removeChannel:
lCbMap = sCbMap.GetMapCopy()
- sub.Unsubscribe(db.ctx, channel)
+ if err := sub.Unsubscribe(db.ctx, channel); err != nil {
+ ch.removeChannel <- err.Error()
+ } else {
+ ch.removeChannel <- ""
+ }
case exit := <-ch.exit:
if exit {
if err := sub.Close(); err != nil {
}
}
}
- }(db.sCbMap, db.ch, channels...)
-
- } else {
- for _, v := range channels {
- db.sCbMap.Add(v, cb)
- db.ch.addChannel <- v
+ }(db.sCbMap, db.ch)
+ }
+ for _, v := range channels {
+ db.sCbMap.Add(v, cb)
+ db.ch.addChannel <- v
+ errStr := <-db.ch.addChannel
+ if errStr != "" {
+ return fmt.Errorf("SDL Subscribe of channel %s failed: %s", v, errStr)
}
}
+ return nil
}
func (db *DB) MSet(pairs ...interface{}) error {
return &dbState, dbState.Err
}
-func createReplicaDbClient(host string) *DB {
- addr, port, _ := net.SplitHostPort(host)
- return createDbClient(addr, port, "", "", "", newRedisClient, subscribeNotifications, nil)
+func createReplicaDbClient(host string) (*DB, error) {
+ addr, port, err := net.SplitHostPort(host)
+ if err != nil {
+ return nil, err
+ }
+ return createDbClient(addr, port, "", "", "", newRedisClient, subscribeNotifications, nil), err
}
func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) {
+ var err error
dbStatisticsInfo := new(DbStatisticsInfo)
- dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host)
+ dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, err = net.SplitHostPort(host)
+ if err != nil {
+ return nil, err
+ }
info, err := db.Info()
if err != nil {
if dbState.ReplicasDbState != nil {
for _, r := range dbState.ReplicasDbState.States {
- replicaDb := createReplicaDbClient(r.GetAddress())
+ replicaDb, err := createReplicaDbClient(r.GetAddress())
+ if err != nil {
+ return nil, err
+ }
dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress())
- replicaDb.CloseDB()
+ if closeErr := replicaDb.CloseDB(); closeErr != nil {
+ return nil, closeErr
+ }
if err != nil {
return nil, err
}