Refactor delpub-commands, Allow multiple channels
[ric-plt/dbaas.git] / redismodule / src / exstrings.c
index 436ccae..6570889 100755 (executable)
@@ -71,6 +71,11 @@ typedef struct _PubParams {
     size_t length;
 } PubParams;
 
+typedef struct _DelParams {
+    RedisModuleString **keys;
+    size_t length;
+} DelParams;
+
 void multiPubCommand(RedisModuleCtx *ctx, PubParams* pubParams)
 {
     RedisModuleCallReply *reply = NULL;
@@ -356,6 +361,35 @@ int SetPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
     return setPubStringCommon(ctx, &setParams, &pubParams);
 }
 
+int SetMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+    if (argc < 7 || (argc % 2) == 0)
+        return RedisModule_WrongArity(ctx);
+
+    long long setPairsCount, pubPairsCount;
+    RedisModule_StringToLongLong(argv[1], &setPairsCount);
+    RedisModule_StringToLongLong(argv[2], &pubPairsCount);
+    if (setPairsCount < 1 || pubPairsCount < 1)
+        return RedisModule_ReplyWithError(ctx, "ERR SET_PAIR_COUNT and PUB_PAIR_COUNT must be greater than zero");
+
+    long long setLen, pubLen;
+    setLen = 2*setPairsCount;
+    pubLen = 2*pubPairsCount;
+    if (setLen + pubLen + 3 != argc)
+        return RedisModule_ReplyWithError(ctx, "ERR SET_PAIR_COUNT or PUB_PAIR_COUNT do not match the total pair count");
+
+    SetParams setParams = {
+                           .key_val_pairs = argv + 3,
+                           .length = setLen
+                          };
+    PubParams pubParams = {
+                           .channel_msg_pairs = argv + 3 + setParams.length,
+                           .length = pubLen
+                          };
+
+    return setPubStringCommon(ctx, &setParams, &pubParams);
+}
+
 int setIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
 {
     if (argc < 6 || (argc % 2) != 0)
@@ -438,106 +472,83 @@ int SetXXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int arg
     return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_XX);
 }
 
-int delPubStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
-                                       int argc, const int flag)
+int delPubStringCommon(RedisModuleCtx *ctx, DelParams *delParamsPtr, PubParams *pubParamsPtr)
 {
-    RedisModuleString *oldvalstr = NULL, *channel = NULL, *message = NULL;
-    RedisModuleCallReply *reply = NULL;
-
-    if (flag == OBJ_OP_NO) {
-        if (argc < 4)
-            return RedisModule_WrongArity(ctx);
-        else {
-            channel = argv[argc-2];
-            message = argv[argc-1];
-        }
-    } else {
-        if (argc != 5)
-            return RedisModule_WrongArity(ctx);
-        else {
-            oldvalstr = argv[2];
-            channel = argv[3];
-            message = argv[4];
-        }
-    }
-
-    if (flag != OBJ_OP_NO) {
-        /*Check if key type is string*/
-        RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
-            REDISMODULE_READ);
-        int type = RedisModule_KeyType(key);
-        RedisModule_CloseKey(key);
-
-        if (type == REDISMODULE_KEYTYPE_EMPTY) {
-            return RedisModule_ReplyWithLongLong(ctx, 0);
-        } else if (type != REDISMODULE_KEYTYPE_STRING) {
-            return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE);
-        }
-    }
-
-    if (flag == OBJ_OP_IE || flag == OBJ_OP_NE) {
-        /*Get the value*/
-        reply = RedisModule_Call(ctx, "GET", "s", argv[1]);
-        ASSERT_NOERROR(reply)
-        size_t curlen = 0, oldvallen = 0;
-        const char *oldval = RedisModule_StringPtrLen(oldvalstr, &oldvallen);
-        const char *curval = RedisModule_CallReplyStringPtr(reply, &curlen);
-        if (((flag == OBJ_OP_IE) &&
-            (!curval || (oldvallen != curlen) || strncmp(oldval, curval, curlen)))
-            ||
-            ((flag == OBJ_OP_NE) && curval && (oldvallen == curlen) &&
-              !strncmp(oldval, curval, curlen))) {
-            RedisModule_FreeCallReply(reply);
-            return RedisModule_ReplyWithLongLong(ctx, 0);
-        }
-        RedisModule_FreeCallReply(reply);
-    }
-
-
-    /* Prepare the arguments for the command. */
-    int i, j=0, cmdargc=argc-3;
-    RedisModuleString *cmdargv[cmdargc];
-    for (i = 1; i < argc-2; i++) {
-        if ((flag == OBJ_OP_IE || flag == OBJ_OP_NE) && (i == 2))
-            continue;
-        cmdargv[j++] = argv[i];
-    }
-
-    /* Call the command and pass back the reply. */
-    reply = RedisModule_Call(ctx, "UNLINK", "v!", cmdargv, j);
+    RedisModuleCallReply *reply = RedisModule_Call(ctx, "UNLINK", "v!", delParamsPtr->keys, delParamsPtr->length);
     ASSERT_NOERROR(reply)
     int replytype = RedisModule_CallReplyType(reply);
     if (replytype == REDISMODULE_REPLY_NULL) {
         RedisModule_ReplyWithNull(ctx);
-    }
-    else if (RedisModule_CallReplyInteger(reply) == 0) {
+    } else if (RedisModule_CallReplyInteger(reply) == 0) {
         RedisModule_ReplyWithCallReply(ctx, reply);
     } else {
-        cmdargc = 2;
-        cmdargv[0] = channel;
-        cmdargv[1] = message;
-        RedisModuleCallReply *pubreply = RedisModule_Call(ctx, "PUBLISH", "v", cmdargv, cmdargc);
-        RedisModule_FreeCallReply(pubreply);
         RedisModule_ReplyWithCallReply(ctx, reply);
+        multiPubCommand(ctx, pubParamsPtr);
     }
-
     RedisModule_FreeCallReply(reply);
     return REDISMODULE_OK;
 }
 
 int DelPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
-   return delPubStringGenericCommand(ctx, argv, argc, OBJ_OP_NO);
+    if (argc < 4)
+        return RedisModule_WrongArity(ctx);
+
+    DelParams delParams = {
+                           .keys = argv + 1,
+                           .length = argc - 3
+                          };
+    PubParams pubParams = {
+                           .channel_msg_pairs = argv + 1 + delParams.length,
+                           .length = 2
+                          };
+
+    return delPubStringCommon(ctx, &delParams, &pubParams);
+}
+
+int delIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
+{
+    if (argc < 5 || (argc % 2) == 0)
+        return RedisModule_WrongArity(ctx);
+
+    DelParams delParams = {
+                           .keys = argv + 1,
+                           .length = 1
+                          };
+    PubParams pubParams = {
+                           .channel_msg_pairs = argv + 3,
+                           .length = argc - 3
+                          };
+    RedisModuleString *key = argv[1];
+    RedisModuleString *oldvalstr = argv[2];
+
+    int type = getKeyType(ctx, key);
+    if (type == REDISMODULE_KEYTYPE_EMPTY) {
+        return RedisModule_ReplyWithLongLong(ctx, 0);
+    } else if (type != REDISMODULE_KEYTYPE_STRING) {
+        return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
+    }
+
+    RedisModuleCallReply *reply = RedisModule_Call(ctx, "GET", "s", key);
+    ASSERT_NOERROR(reply)
+    bool is_equal = replyContentsEqualString(reply, oldvalstr);
+    RedisModule_FreeCallReply(reply);
+    if ((flag == OBJ_OP_IE && !is_equal) ||
+        (flag == OBJ_OP_NE && is_equal)) {
+        return RedisModule_ReplyWithLongLong(ctx, 0);
+    }
+
+    return delPubStringCommon(ctx, &delParams, &pubParams);
 }
 
 int DelIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
-   return delPubStringGenericCommand(ctx, argv, argc, OBJ_OP_IE);
+   return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
 }
 
 int DelNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
-   return delPubStringGenericCommand(ctx, argv, argc, OBJ_OP_NE);
+   return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE);
 }
 
 /* This function must be present on each Redis module. It is used in order to
@@ -577,6 +588,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
         SetPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
         return REDISMODULE_ERR;
 
+    if (RedisModule_CreateCommand(ctx,"msetmpub",
+        SetMPub_RedisCommand,"write deny-oom pubsub",1,1,1) == REDISMODULE_ERR)
+        return REDISMODULE_ERR;
+
     if (RedisModule_CreateCommand(ctx,"setiepub",
         SetIEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
         return REDISMODULE_ERR;