+func (db *DB) UnsubscribeChannelDB(channels ...string) {
+ for _, v := range channels {
+ db.ch.removeChannel <- v
+ delete(db.cbMap, v)
+ if len(db.cbMap) == 0 {
+ db.ch.exit <- true
+ }
+ }
+}
+
+func (db *DB) SubscribeChannelDB(cb ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string) {
+ if len(db.cbMap) == 0 {
+ for _, v := range channels {
+ db.cbMap[v] = cb
+ }
+
+ go func(cbMap *map[string]ChannelNotificationCb,
+ channelPrefix,
+ eventSeparator string,
+ ch intChannels,
+ channels ...string) {
+ sub := db.client.Subscribe(channels...)
+ rxChannel := sub.Channel()
+ for {
+ select {
+ case msg := <-rxChannel:
+ cb, ok := (*cbMap)[msg.Channel]
+ if ok {
+ cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
+ }
+ case channel := <-ch.addChannel:
+ sub.Subscribe(channel)
+ case channel := <-ch.removeChannel:
+ sub.Unsubscribe(channel)
+ case exit := <-ch.exit:
+ if exit {
+ if err := sub.Close(); err != nil {
+ fmt.Println(err)
+ }
+ return
+ }
+ }
+ }
+ }(&db.cbMap, channelPrefix, eventSeparator, db.ch, channels...)
+
+ } else {
+ for _, v := range channels {
+ db.cbMap[v] = cb
+ db.ch.addChannel <- v
+ }
+ }
+}
+