Refactor setpub-commands, Allow multiple channels
[ric-plt/dbaas.git] / redismodule / src / exstrings.c
index 66f31c2..436ccae 100755 (executable)
@@ -19,6 +19,7 @@
 #include <stdlib.h>
 #include <ctype.h>
 #include <string.h>
+#include <stdbool.h>
 #include "../../redismodule/include/redismodule.h"
 
 #ifdef __UT__
@@ -42,6 +43,42 @@ sends the error to the client and exit the current function if its */
 #define OBJ_OP_IE (1<<4)     /* OP if equal old value */
 #define OBJ_OP_NE (1<<5)     /* OP if not equal old value */
 
+int getKeyType(RedisModuleCtx *ctx, RedisModuleString *key_str)
+{
+    RedisModuleKey *key = RedisModule_OpenKey(ctx, key_str, REDISMODULE_READ);
+    int type = RedisModule_KeyType(key);
+    RedisModule_CloseKey(key);
+    return type;
+}
+
+bool replyContentsEqualString(RedisModuleCallReply *reply, RedisModuleString *expected_value)
+{
+    size_t replylen = 0, expectedlen = 0;
+    const char *expectedval = RedisModule_StringPtrLen(expected_value, &expectedlen);
+    const char *replyval = RedisModule_CallReplyStringPtr(reply, &replylen);
+    return replyval &&
+           expectedlen == replylen &&
+           !strncmp(expectedval, replyval, replylen);
+}
+
+typedef struct _SetParams {
+    RedisModuleString **key_val_pairs;
+    size_t length;
+} SetParams;
+
+typedef struct _PubParams {
+    RedisModuleString **channel_msg_pairs;
+    size_t length;
+} PubParams;
+
+void multiPubCommand(RedisModuleCtx *ctx, PubParams* pubParams)
+{
+    RedisModuleCallReply *reply = NULL;
+    for (unsigned int i = 0 ; i < pubParams->length ; i += 2) {
+        reply = RedisModule_Call(ctx, "PUBLISH", "v", pubParams->channel_msg_pairs + i, 2);
+        RedisModule_FreeCallReply(reply);
+    }
+}
 
 int setStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
                                        int argc, const int flag)
@@ -286,125 +323,119 @@ int NDel_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
     return REDISMODULE_OK;
 }
 
-int setPubStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
-                                       int argc, const int flag)
+int setPubStringCommon(RedisModuleCtx *ctx, SetParams* setParamsPtr, PubParams* pubParamsPtr)
 {
-    RedisModuleString *oldvalstr = NULL, *channel = NULL, *message = NULL;
-    RedisModuleCallReply *reply = NULL;
-
-    if (flag == OBJ_OP_NO) {
-        if (argc < 5 || (argc % 2) == 0)
-            return RedisModule_WrongArity(ctx);
-        else {
-            channel = argv[argc-2];
-            message = argv[argc-1];
-        }
-    } else if (flag == OBJ_OP_XX || flag == OBJ_OP_NX) {
-        if (argc != 5)
-            return RedisModule_WrongArity(ctx);
-        else {
-            channel = argv[3];
-            message = argv[4];
-        }
+    RedisModuleCallReply *setReply;
+    setReply = RedisModule_Call(ctx, "MSET", "v!", setParamsPtr->key_val_pairs, setParamsPtr->length);
+    ASSERT_NOERROR(setReply)
+    int replytype = RedisModule_CallReplyType(setReply);
+    if (replytype == REDISMODULE_REPLY_NULL) {
+        RedisModule_ReplyWithNull(ctx);
     } else {
-        if (argc != 6)
-            return RedisModule_WrongArity(ctx);
-        else {
-            oldvalstr = argv[3];
-            channel = argv[4];
-            message = argv[5];
-        }
+        multiPubCommand(ctx, pubParamsPtr);
+        RedisModule_ReplyWithCallReply(ctx, setReply);
     }
+    RedisModule_FreeCallReply(setReply);
+    return REDISMODULE_OK;
+}
 
-    /*Check if key type is string*/
-    RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
-        REDISMODULE_READ);
-    int type = RedisModule_KeyType(key);
-    RedisModule_CloseKey(key);
+int SetPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+    if (argc < 5 || (argc % 2) == 0)
+        return RedisModule_WrongArity(ctx);
 
-    if (flag != OBJ_OP_NO) {
-        if (type == REDISMODULE_KEYTYPE_EMPTY) {
-            if (flag == OBJ_OP_IE || flag == OBJ_OP_XX){
-                return RedisModule_ReplyWithNull(ctx);
-            }
-        } else if (flag == OBJ_OP_NX) {
-            return RedisModule_ReplyWithNull(ctx);
-        } else if (type != REDISMODULE_KEYTYPE_STRING) {
-            return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE);
-        }
-    }
+    SetParams setParams = {
+                           .key_val_pairs = argv + 1,
+                           .length = argc - 3
+                          };
+    PubParams pubParams = {
+                           .channel_msg_pairs = argv + argc - 2,
+                           .length = 2
+                          };
 
-    if (flag == OBJ_OP_IE || flag == OBJ_OP_NE) {
-        /*Get the value*/
-        reply = RedisModule_Call(ctx, "GET", "s", argv[1]);
-        ASSERT_NOERROR(reply)
-        size_t curlen = 0, oldvallen = 0;
-        const char *oldval = RedisModule_StringPtrLen(oldvalstr, &oldvallen);
-        const char *curval = RedisModule_CallReplyStringPtr(reply, &curlen);
-        if (((flag == OBJ_OP_IE) &&
-            (!curval || (oldvallen != curlen) || strncmp(oldval, curval, curlen)))
-            ||
-            ((flag == OBJ_OP_NE) && curval && (oldvallen == curlen) &&
-              !strncmp(oldval, curval, curlen))) {
-            RedisModule_FreeCallReply(reply);
-            return RedisModule_ReplyWithNull(ctx);
-        }
-        RedisModule_FreeCallReply(reply);
-    }
+    return setPubStringCommon(ctx, &setParams, &pubParams);
+}
 
+int setIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
+{
+    if (argc < 6 || (argc % 2) != 0)
+        return RedisModule_WrongArity(ctx);
 
-    /* Prepare the arguments for the command. */
-    int i, j=0, cmdargc=argc-3;
-    RedisModuleString *cmdargv[cmdargc];
-    for (i = 1; i < argc-2; i++) {
-        if ((flag == OBJ_OP_IE || flag == OBJ_OP_NE) && (i == 3))
-            continue;
-        cmdargv[j++] = argv[i];
+    SetParams setParams = {
+                           .key_val_pairs = argv + 1,
+                           .length = 2
+                          };
+    PubParams pubParams = {
+                           .channel_msg_pairs = argv + 4,
+                           .length = argc - 4
+                          };
+    RedisModuleString *key = setParams.key_val_pairs[0];
+    RedisModuleString *oldvalstr = argv[3];
+
+    int type = getKeyType(ctx, key);
+    if (flag == OBJ_OP_IE && type == REDISMODULE_KEYTYPE_EMPTY) {
+        return RedisModule_ReplyWithNull(ctx);
+    } else if (type != REDISMODULE_KEYTYPE_STRING && type != REDISMODULE_KEYTYPE_EMPTY) {
+        return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
     }
 
-    /* Call the command and pass back the reply. */
-    reply = RedisModule_Call(ctx, "MSET", "v!", cmdargv, j);
+    RedisModuleCallReply *reply = RedisModule_Call(ctx, "GET", "s", key);
     ASSERT_NOERROR(reply)
-    int replytype = RedisModule_CallReplyType(reply);
-    if (replytype == REDISMODULE_REPLY_NULL) {
-        RedisModule_ReplyWithNull(ctx);
-    }
-    else {
-        cmdargc = 2;
-        cmdargv[0] = channel;
-        cmdargv[1] = message;
-        RedisModuleCallReply *pubreply = RedisModule_Call(ctx, "PUBLISH", "v", cmdargv, cmdargc);
-        RedisModule_FreeCallReply(pubreply);
-        RedisModule_ReplyWithCallReply(ctx, reply);
+    bool is_equal = replyContentsEqualString(reply, oldvalstr);
+    RedisModule_FreeCallReply(reply);
+    if ((flag == OBJ_OP_IE && !is_equal) ||
+        (flag == OBJ_OP_NE && is_equal)) {
+        return RedisModule_ReplyWithNull(ctx);
     }
 
-    RedisModule_FreeCallReply(reply);
-    return REDISMODULE_OK;
+    return setPubStringCommon(ctx, &setParams, &pubParams);
 }
 
-int SetPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+int SetNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
-    return setPubStringGenericCommand(ctx, argv, argc, OBJ_OP_NO);
+    return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE);
 }
 
 int SetIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
-    return setPubStringGenericCommand(ctx, argv, argc, OBJ_OP_IE);
+    return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
 }
 
-int SetNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+int setXXNXPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
 {
-    return setPubStringGenericCommand(ctx, argv, argc, OBJ_OP_NE);
+    if (argc < 5 || (argc % 2) == 0)
+        return RedisModule_WrongArity(ctx);
+
+    SetParams setParams = {
+                           .key_val_pairs = argv + 1,
+                           .length = 2
+                          };
+    PubParams pubParams = {
+                           .channel_msg_pairs = argv + 3,
+                           .length = argc - 3
+                          };
+    RedisModuleString *key = setParams.key_val_pairs[0];
+
+    int type = getKeyType(ctx, key);
+    if ((flag == OBJ_OP_XX && type == REDISMODULE_KEYTYPE_EMPTY) ||
+        (flag == OBJ_OP_NX && type == REDISMODULE_KEYTYPE_STRING)) {
+        return RedisModule_ReplyWithNull(ctx);
+    } else if (type != REDISMODULE_KEYTYPE_STRING && type != REDISMODULE_KEYTYPE_EMPTY) {
+        RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
+        return REDISMODULE_OK;
+    }
+
+    return setPubStringCommon(ctx, &setParams, &pubParams);
 }
 
 int SetNXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
-    return setPubStringGenericCommand(ctx, argv, argc, OBJ_OP_NX);
+    return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_NX);
 }
 
 int SetXXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
-    return setPubStringGenericCommand(ctx, argv, argc, OBJ_OP_XX);
+    return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_XX);
 }
 
 int delPubStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv,