Bump base image to Redis 6.2.6 Alpine 3.15
[ric-plt/dbaas.git] / redismodule / src / exstrings.c
index ef3cf5d..11102d3 100755 (executable)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2018-2019 Nokia.
+ * Copyright (c) 2018-2020 Nokia.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
  *   limitations under the License.
  */
 
+/*
+ * This source code is part of the near-RT RIC (RAN Intelligent Controller)
+ * platform project (RICP).
+ */
+
 #include "redismodule.h"
-#include <stdio.h>
+#include <pthread.h>
+#include <stdbool.h>
 #include <stdlib.h>
-#include <ctype.h>
 #include <string.h>
-#include <stdbool.h>
-#include "../../redismodule/include/redismodule.h"
+#include <strings.h>
 
 #ifdef __UT__
 #include "exstringsStub.h"
+#include "commonStub.h"
 #endif
 
+
 /* make sure the response is not NULL or an error.
 sends the error to the client and exit the current function if its */
 #define  ASSERT_NOERROR(r) \
     if (r == NULL) { \
         return RedisModule_ReplyWithError(ctx,"ERR reply is NULL"); \
     } else if (RedisModule_CallReplyType(r) == REDISMODULE_REPLY_ERROR) { \
-        RedisModule_ReplyWithCallReply(ctx,r); \
-        RedisModule_FreeCallReply(r); \
-        return REDISMODULE_ERR; \
+        return RedisModule_ReplyWithCallReply(ctx,r); \
     }
 
 #define OBJ_OP_NO 0
@@ -43,6 +47,38 @@ sends the error to the client and exit the current function if its */
 #define OBJ_OP_IE (1<<4)     /* OP if equal old value */
 #define OBJ_OP_NE (1<<5)     /* OP if not equal old value */
 
+#define DEF_COUNT     50
+#define ZERO          0
+#define MATCH_STR     "MATCH"
+#define COUNT_STR     "COUNT"
+#define SCANARGC      5
+
+RedisModuleString *def_count_str = NULL, *match_str = NULL, *count_str = NULL, *zero_str = NULL;
+
+typedef struct _NgetArgs {
+    RedisModuleString *key;
+    RedisModuleString *count;
+} NgetArgs;
+
+typedef struct RedisModuleBlockedClientArgs {
+    RedisModuleBlockedClient *bc;
+    NgetArgs nget_args;
+} RedisModuleBlockedClientArgs;
+
+void InitStaticVariable()
+{
+    if (def_count_str == NULL)
+        def_count_str = RedisModule_CreateStringFromLongLong(NULL, DEF_COUNT);
+    if (match_str == NULL)
+        match_str = RedisModule_CreateString(NULL, MATCH_STR, sizeof(MATCH_STR));
+    if (count_str == NULL)
+        count_str = RedisModule_CreateString(NULL, COUNT_STR, sizeof(COUNT_STR));
+    if (zero_str == NULL)
+        zero_str = RedisModule_CreateStringFromLongLong(NULL, ZERO);
+
+    return;
+}
+
 int getKeyType(RedisModuleCtx *ctx, RedisModuleString *key_str)
 {
     RedisModuleKey *key = RedisModule_OpenKey(ctx, key_str, REDISMODULE_READ);
@@ -71,6 +107,170 @@ typedef struct _PubParams {
     size_t length;
 } PubParams;
 
+typedef struct _DelParams {
+    RedisModuleString **keys;
+    size_t length;
+} DelParams;
+
+typedef enum _ExstringsStatus {
+    EXSTRINGS_STATUS_NO_ERRORS = 0,
+    EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT,
+    EXSTRINGS_STATUS_NOT_SET
+} ExstringsStatus;
+
+void readNgetArgs(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
+                  NgetArgs* nget_args, ExstringsStatus* status)
+{
+    size_t str_len;
+    long long number;
+
+    if(argc == 2) {
+        nget_args->key = argv[1];
+        nget_args->count = def_count_str;
+    } else if (argc == 4) {
+        if (strcasecmp(RedisModule_StringPtrLen(argv[2], &str_len), "count")) {
+            RedisModule_ReplyWithError(ctx,"-ERR syntax error");
+            *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
+            return;
+        }
+
+        int ret = RedisModule_StringToLongLong(argv[3], &number) != REDISMODULE_OK;
+        if (ret != REDISMODULE_OK || number < 1) {
+            RedisModule_ReplyWithError(ctx,"-ERR value is not an integer or out of range");
+            *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
+            return;
+        }
+
+        nget_args->key = argv[1];
+        nget_args->count = argv[3];
+    } else {
+        /* In redis there is a bug (or undocumented feature see link)
+         * where calling 'RedisModule_WrongArity'
+         * within a blocked client will crash redis.
+         *
+         * Therefore we need to call this function to validate args
+         * before putting the client into blocking mode.
+         *
+         * Link to issue:
+         * https://github.com/antirez/redis/issues/6382
+         * 'If any thread tries to access the command arguments from
+         *  within the ThreadSafeContext they will crash redis' */
+        RedisModule_WrongArity(ctx);
+        *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
+        return;
+    }
+
+    *status = EXSTRINGS_STATUS_NO_ERRORS;
+    return;
+}
+
+long long callReplyLongLong(RedisModuleCallReply* reply)
+{
+    const char* cursor_str_ptr = RedisModule_CallReplyStringPtr(reply, NULL);
+    return strtoll(cursor_str_ptr, NULL, 10);
+}
+
+void forwardIfError(RedisModuleCtx *ctx, RedisModuleCallReply *reply, ExstringsStatus* status)
+{
+    if (RedisModule_CallReplyType(reply) == REDISMODULE_REPLY_ERROR) {
+        RedisModule_ReplyWithCallReply(ctx, reply);
+        RedisModule_FreeCallReply(reply);
+        *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
+    }
+    *status = EXSTRINGS_STATUS_NO_ERRORS;
+}
+
+typedef struct _ScannedKeys {
+    RedisModuleString **keys;
+    size_t len;
+} ScannedKeys;
+
+ScannedKeys* allocScannedKeys(size_t len)
+{
+    ScannedKeys *sk = RedisModule_Alloc(sizeof(ScannedKeys));
+    if (sk) {
+        sk->len = len;
+        sk->keys = RedisModule_Alloc(sizeof(RedisModuleString *)*len);
+    }
+    return sk;
+}
+
+void freeScannedKeys(RedisModuleCtx *ctx, ScannedKeys* sk)
+{
+    if (sk) {
+        size_t j;
+        for (j = 0; j < sk->len; j++)
+            RedisModule_FreeString(ctx, sk->keys[j]);
+        RedisModule_Free(sk->keys);
+    }
+    RedisModule_Free(sk);
+}
+
+typedef struct _ScanSomeState {
+    RedisModuleString *key;
+    RedisModuleString *count;
+    long long cursor;
+} ScanSomeState;
+
+ScannedKeys *scanSome(RedisModuleCtx* ctx, ScanSomeState* state, ExstringsStatus* status)
+{
+    RedisModuleString *scanargv[SCANARGC] = {NULL};
+
+    scanargv[0] = RedisModule_CreateStringFromLongLong(ctx, state->cursor);
+    scanargv[1] = match_str;
+    scanargv[2] = state->key;
+    scanargv[3] = count_str;
+    scanargv[4] = state->count;
+
+    RedisModuleCallReply *reply;
+    reply = RedisModule_Call(ctx, "SCAN", "v", scanargv, SCANARGC);
+    RedisModule_FreeString(ctx, scanargv[0]);
+    forwardIfError(ctx, reply, status);
+    if (*status == EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT)
+        return NULL;
+
+    state->cursor = callReplyLongLong(RedisModule_CallReplyArrayElement(reply, 0));
+    RedisModuleCallReply *cr_keys =
+        RedisModule_CallReplyArrayElement(reply, 1);
+
+    size_t scanned_keys_len = RedisModule_CallReplyLength(cr_keys);
+    if (scanned_keys_len == 0) {
+        RedisModule_FreeCallReply(reply);
+        *status = EXSTRINGS_STATUS_NO_ERRORS;
+        return NULL;
+    }
+
+    ScannedKeys *scanned_keys = allocScannedKeys(scanned_keys_len);
+    if (scanned_keys == NULL) {
+        RedisModule_FreeCallReply(reply);
+        RedisModule_ReplyWithError(ctx,"-ERR Out of memory");
+        *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
+        return NULL;
+    }
+
+    scanned_keys->len = scanned_keys_len;
+    size_t j;
+    for (j = 0; j < scanned_keys_len; j++) {
+        RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(cr_keys,j));
+        scanned_keys->keys[j] = rms;
+    }
+    RedisModule_FreeCallReply(reply);
+    *status = EXSTRINGS_STATUS_NO_ERRORS;
+    return scanned_keys;
+}
+
+inline void unlockThreadsafeContext(RedisModuleCtx *ctx, bool using_threadsafe_context)
+{
+    if (using_threadsafe_context)
+        RedisModule_ThreadSafeContextUnlock(ctx);
+}
+
+inline void lockThreadsafeContext(RedisModuleCtx *ctx, bool using_threadsafe_context)
+{
+    if (using_threadsafe_context)
+        RedisModule_ThreadSafeContextLock(ctx);
+}
+
 void multiPubCommand(RedisModuleCtx *ctx, PubParams* pubParams)
 {
     RedisModuleCallReply *reply = NULL;
@@ -142,11 +342,13 @@ int setStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
 
 int SetIE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
+    RedisModule_AutoMemory(ctx);
     return setStringGenericCommand(ctx, argv, argc, OBJ_OP_IE);
 }
 
 int SetNE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
+    RedisModule_AutoMemory(ctx);
     return setStringGenericCommand(ctx, argv, argc, OBJ_OP_NE);
 }
 
@@ -205,136 +407,22 @@ int delStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
 
 int DelIE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
+    RedisModule_AutoMemory(ctx);
     return delStringGenericCommand(ctx, argv, argc, OBJ_OP_IE);
 }
 
 int DelNE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
+    RedisModule_AutoMemory(ctx);
     return delStringGenericCommand(ctx, argv, argc, OBJ_OP_NE);
 }
-
-int NGet_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
-{
-    RedisModuleCallReply *reply = NULL;
-
-    if (argc != 2)
-        return RedisModule_WrongArity(ctx);
-
-    /* Call the command to get keys with pattern. */
-    reply = RedisModule_Call(ctx, "KEYS", "s", argv[1]);
-    ASSERT_NOERROR(reply)
-
-    /* Prepare the arguments for the command. */
-    size_t items = RedisModule_CallReplyLength(reply);
-    if (items == 0) {
-        //RedisModule_ReplyWithArray(ctx, items);
-        RedisModule_ReplyWithCallReply(ctx, reply);
-        RedisModule_FreeCallReply(reply);
-    }
-    else {
-        RedisModuleString *cmdargv[items];
-        size_t i=0, j;
-        for (j = 0; j < items; j++) {
-           RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(reply, j));
-           cmdargv[i++] = rms;
-
-           /*Assume all keys via SDL is string type for sake of saving time*/
-#if 0
-           /*Check if key type is string*/
-           RedisModuleKey *key = RedisModule_OpenKey(ctx, rms ,REDISMODULE_READ);
-
-           if (key) {
-               int type = RedisModule_KeyType(key);
-               RedisModule_CloseKey(key);
-               if (type == REDISMODULE_KEYTYPE_STRING) {
-                   cmdargv[i++] = rms;
-               }
-           } else {
-               RedisModule_CloseKey(key);
-           }
-#endif
-        }
-        RedisModule_FreeCallReply(reply);
-
-        reply = RedisModule_Call(ctx, "MGET", "v", cmdargv, i);
-        ASSERT_NOERROR(reply)
-        items = RedisModule_CallReplyLength(reply);
-        RedisModule_ReplyWithArray(ctx, i*2);
-        for (j = 0; (j<items && j<i); j++) {
-           RedisModule_ReplyWithString(ctx, cmdargv[j]);
-           RedisModule_ReplyWithString(ctx, RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(reply, j)));
-        }
-
-        RedisModule_FreeCallReply(reply);
-    }
-
-    return REDISMODULE_OK;
-}
-
-int NDel_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
-{
-    RedisModuleCallReply *reply = NULL;
-
-    if (argc != 2)
-        return RedisModule_WrongArity(ctx);
-
-    /* Call the command to get keys with pattern. */
-    reply = RedisModule_Call(ctx, "KEYS", "s", argv[1]);
-    ASSERT_NOERROR(reply)
-
-    /* Prepare the arguments for the command. */
-    size_t items = RedisModule_CallReplyLength(reply);
-    if (items == 0) {
-        RedisModule_ReplyWithLongLong(ctx, 0);
-        RedisModule_FreeCallReply(reply);
-    }
-    else {
-        RedisModuleString *cmdargv[items];
-        size_t i=0, j;
-        for (j = 0; j < items; j++) {
-           RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(reply, j));
-           cmdargv[i++] = rms;
-
-           /*Assume all keys via SDL is string type for sake of saving time*/
-#if 0
-           //Check if key type is string
-           RedisModuleKey *key = RedisModule_OpenKey(ctx, rms ,REDISMODULE_READ);
-
-           if (key) {
-               int type = RedisModule_KeyType(key);
-               RedisModule_CloseKey(key);
-               if (type == REDISMODULE_KEYTYPE_STRING) {
-                   cmdargv[i++] = rms;
-               }
-           } else {
-               RedisModule_CloseKey(key);
-           }
-#endif
-        }
-        RedisModule_FreeCallReply(reply);
-
-        reply = RedisModule_Call(ctx, "UNLINK", "v!", cmdargv, i);
-        ASSERT_NOERROR(reply)
-        RedisModule_ReplyWithCallReply(ctx, reply);
-        RedisModule_FreeCallReply(reply);
-
-    }
-
-    return REDISMODULE_OK;
-}
-
 int setPubStringCommon(RedisModuleCtx *ctx, SetParams* setParamsPtr, PubParams* pubParamsPtr)
 {
     RedisModuleCallReply *setReply;
     setReply = RedisModule_Call(ctx, "MSET", "v!", setParamsPtr->key_val_pairs, setParamsPtr->length);
     ASSERT_NOERROR(setReply)
-    int replytype = RedisModule_CallReplyType(setReply);
-    if (replytype == REDISMODULE_REPLY_NULL) {
-        RedisModule_ReplyWithNull(ctx);
-    } else {
-        multiPubCommand(ctx, pubParamsPtr);
-        RedisModule_ReplyWithCallReply(ctx, setReply);
-    }
+    multiPubCommand(ctx, pubParamsPtr);
+    RedisModule_ReplyWithCallReply(ctx, setReply);
     RedisModule_FreeCallReply(setReply);
     return REDISMODULE_OK;
 }
@@ -344,12 +432,13 @@ int SetPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
     if (argc < 5 || (argc % 2) == 0)
         return RedisModule_WrongArity(ctx);
 
+    RedisModule_AutoMemory(ctx);
     SetParams setParams = {
                            .key_val_pairs = argv + 1,
                            .length = argc - 3
                           };
     PubParams pubParams = {
-                           .channel_msg_pairs = argv + argc - 2,
+                           .channel_msg_pairs = argv + 1 + setParams.length,
                            .length = 2
                           };
 
@@ -361,6 +450,7 @@ int SetMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
     if (argc < 7 || (argc % 2) == 0)
         return RedisModule_WrongArity(ctx);
 
+    RedisModule_AutoMemory(ctx);
     long long setPairsCount, pubPairsCount;
     RedisModule_StringToLongLong(argv[1], &setPairsCount);
     RedisModule_StringToLongLong(argv[2], &pubPairsCount);
@@ -370,6 +460,7 @@ int SetMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
     long long setLen, pubLen;
     setLen = 2*setPairsCount;
     pubLen = 2*pubPairsCount;
+
     if (setLen + pubLen + 3 != argc)
         return RedisModule_ReplyWithError(ctx, "ERR SET_PAIR_COUNT or PUB_PAIR_COUNT do not match the total pair count");
 
@@ -387,9 +478,6 @@ int SetMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
 
 int setIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
 {
-    if (argc < 6 || (argc % 2) != 0)
-        return RedisModule_WrongArity(ctx);
-
     SetParams setParams = {
                            .key_val_pairs = argv + 1,
                            .length = 2
@@ -420,21 +508,35 @@ int setIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int ar
     return setPubStringCommon(ctx, &setParams, &pubParams);
 }
 
-int SetNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+int SetIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
-    return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE);
+    if (argc != 6)
+        return RedisModule_WrongArity(ctx);
+
+    RedisModule_AutoMemory(ctx);
+    return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
 }
 
-int SetIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+int SetIEMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
+    if (argc < 6 || (argc % 2) != 0)
+        return RedisModule_WrongArity(ctx);
+
+    RedisModule_AutoMemory(ctx);
     return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
 }
 
-int setXXNXPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
+int SetNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
-    if (argc < 5 || (argc % 2) == 0)
+    if (argc != 6)
         return RedisModule_WrongArity(ctx);
 
+    RedisModule_AutoMemory(ctx);
+    return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE);
+}
+
+int setXXNXPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
+{
     SetParams setParams = {
                            .key_val_pairs = argv + 1,
                            .length = 2
@@ -459,114 +561,336 @@ int setXXNXPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int ar
 
 int SetNXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
+    if (argc != 5)
+        return RedisModule_WrongArity(ctx);
+
+    RedisModule_AutoMemory(ctx);
     return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_NX);
 }
 
-int SetXXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+int SetNXMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
-    return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_XX);
+    if (argc < 5 || (argc % 2) == 0)
+        return RedisModule_WrongArity(ctx);
+
+    RedisModule_AutoMemory(ctx);
+    return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_NX);
 }
 
-int delPubStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
-                                       int argc, const int flag)
+int SetXXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
-    RedisModuleString *oldvalstr = NULL, *channel = NULL, *message = NULL;
-    RedisModuleCallReply *reply = NULL;
-
-    if (flag == OBJ_OP_NO) {
-        if (argc < 4)
-            return RedisModule_WrongArity(ctx);
-        else {
-            channel = argv[argc-2];
-            message = argv[argc-1];
-        }
-    } else {
-        if (argc != 5)
-            return RedisModule_WrongArity(ctx);
-        else {
-            oldvalstr = argv[2];
-            channel = argv[3];
-            message = argv[4];
-        }
-    }
-
-    if (flag != OBJ_OP_NO) {
-        /*Check if key type is string*/
-        RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
-            REDISMODULE_READ);
-        int type = RedisModule_KeyType(key);
-        RedisModule_CloseKey(key);
-
-        if (type == REDISMODULE_KEYTYPE_EMPTY) {
-            return RedisModule_ReplyWithLongLong(ctx, 0);
-        } else if (type != REDISMODULE_KEYTYPE_STRING) {
-            return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE);
-        }
-    }
-
-    if (flag == OBJ_OP_IE || flag == OBJ_OP_NE) {
-        /*Get the value*/
-        reply = RedisModule_Call(ctx, "GET", "s", argv[1]);
-        ASSERT_NOERROR(reply)
-        size_t curlen = 0, oldvallen = 0;
-        const char *oldval = RedisModule_StringPtrLen(oldvalstr, &oldvallen);
-        const char *curval = RedisModule_CallReplyStringPtr(reply, &curlen);
-        if (((flag == OBJ_OP_IE) &&
-            (!curval || (oldvallen != curlen) || strncmp(oldval, curval, curlen)))
-            ||
-            ((flag == OBJ_OP_NE) && curval && (oldvallen == curlen) &&
-              !strncmp(oldval, curval, curlen))) {
-            RedisModule_FreeCallReply(reply);
-            return RedisModule_ReplyWithLongLong(ctx, 0);
-        }
-        RedisModule_FreeCallReply(reply);
-    }
-
+    if (argc != 5)
+        return RedisModule_WrongArity(ctx);
 
-    /* Prepare the arguments for the command. */
-    int i, j=0, cmdargc=argc-3;
-    RedisModuleString *cmdargv[cmdargc];
-    for (i = 1; i < argc-2; i++) {
-        if ((flag == OBJ_OP_IE || flag == OBJ_OP_NE) && (i == 2))
-            continue;
-        cmdargv[j++] = argv[i];
-    }
+    RedisModule_AutoMemory(ctx);
+    return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_XX);
+}
 
-    /* Call the command and pass back the reply. */
-    reply = RedisModule_Call(ctx, "UNLINK", "v!", cmdargv, j);
+int delPubStringCommon(RedisModuleCtx *ctx, DelParams *delParamsPtr, PubParams *pubParamsPtr)
+{
+    RedisModuleCallReply *reply = RedisModule_Call(ctx, "UNLINK", "v!", delParamsPtr->keys, delParamsPtr->length);
     ASSERT_NOERROR(reply)
     int replytype = RedisModule_CallReplyType(reply);
     if (replytype == REDISMODULE_REPLY_NULL) {
         RedisModule_ReplyWithNull(ctx);
-    }
-    else if (RedisModule_CallReplyInteger(reply) == 0) {
+    } else if (RedisModule_CallReplyInteger(reply) == 0) {
         RedisModule_ReplyWithCallReply(ctx, reply);
     } else {
-        cmdargc = 2;
-        cmdargv[0] = channel;
-        cmdargv[1] = message;
-        RedisModuleCallReply *pubreply = RedisModule_Call(ctx, "PUBLISH", "v", cmdargv, cmdargc);
-        RedisModule_FreeCallReply(pubreply);
         RedisModule_ReplyWithCallReply(ctx, reply);
+        multiPubCommand(ctx, pubParamsPtr);
     }
-
     RedisModule_FreeCallReply(reply);
     return REDISMODULE_OK;
 }
 
 int DelPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
-   return delPubStringGenericCommand(ctx, argv, argc, OBJ_OP_NO);
+    if (argc < 4)
+        return RedisModule_WrongArity(ctx);
+
+    RedisModule_AutoMemory(ctx);
+    DelParams delParams = {
+                           .keys = argv + 1,
+                           .length = argc - 3
+                          };
+    PubParams pubParams = {
+                           .channel_msg_pairs = argv + 1 + delParams.length,
+                           .length = 2
+                          };
+
+    return delPubStringCommon(ctx, &delParams, &pubParams);
+}
+
+int DelMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+    if (argc < 6)
+        return RedisModule_WrongArity(ctx);
+
+    RedisModule_AutoMemory(ctx);
+    long long delCount, pubPairsCount;
+    RedisModule_StringToLongLong(argv[1], &delCount);
+    RedisModule_StringToLongLong(argv[2], &pubPairsCount);
+    if (delCount < 1 || pubPairsCount < 1)
+        return RedisModule_ReplyWithError(ctx, "ERR DEL_COUNT and PUB_PAIR_COUNT must be greater than zero");
+
+    long long delLen, pubLen;
+    delLen = delCount;
+    pubLen = 2*pubPairsCount;
+    if (delLen + pubLen + 3 != argc)
+        return RedisModule_ReplyWithError(ctx, "ERR DEL_COUNT or PUB_PAIR_COUNT do not match the total pair count");
+
+    DelParams delParams = {
+                           .keys = argv + 3,
+                           .length = delLen
+                          };
+    PubParams pubParams = {
+                           .channel_msg_pairs = argv + 3 + delParams.length,
+                           .length = pubLen
+                          };
+
+    return delPubStringCommon(ctx, &delParams, &pubParams);
+}
+
+int delIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
+{
+    DelParams delParams = {
+                           .keys = argv + 1,
+                           .length = 1
+                          };
+    PubParams pubParams = {
+                           .channel_msg_pairs = argv + 3,
+                           .length = argc - 3
+                          };
+    RedisModuleString *key = argv[1];
+    RedisModuleString *oldvalstr = argv[2];
+
+    int type = getKeyType(ctx, key);
+    if (type == REDISMODULE_KEYTYPE_EMPTY) {
+        return RedisModule_ReplyWithLongLong(ctx, 0);
+    } else if (type != REDISMODULE_KEYTYPE_STRING) {
+        return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
+    }
+
+    RedisModuleCallReply *reply = RedisModule_Call(ctx, "GET", "s", key);
+    ASSERT_NOERROR(reply)
+    bool is_equal = replyContentsEqualString(reply, oldvalstr);
+    RedisModule_FreeCallReply(reply);
+    if ((flag == OBJ_OP_IE && !is_equal) ||
+        (flag == OBJ_OP_NE && is_equal)) {
+        return RedisModule_ReplyWithLongLong(ctx, 0);
+    }
+
+    return delPubStringCommon(ctx, &delParams, &pubParams);
 }
 
 int DelIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
-   return delPubStringGenericCommand(ctx, argv, argc, OBJ_OP_IE);
+    if (argc != 5)
+        return RedisModule_WrongArity(ctx);
+
+    RedisModule_AutoMemory(ctx);
+    return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
+}
+
+int DelIEMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+    if (argc < 5 || (argc % 2) == 0)
+        return RedisModule_WrongArity(ctx);
+
+    RedisModule_AutoMemory(ctx);
+    return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
 }
 
 int DelNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
-   return delPubStringGenericCommand(ctx, argv, argc, OBJ_OP_NE);
+    if (argc != 5)
+        return RedisModule_WrongArity(ctx);
+
+    RedisModule_AutoMemory(ctx);
+    return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE);
+}
+
+int Nget_RedisCommand(RedisModuleCtx *ctx, NgetArgs* nget_args, bool using_threadsafe_context)
+{
+    int ret = REDISMODULE_OK;
+    size_t replylen = 0;
+    RedisModuleCallReply *reply = NULL;
+    ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
+    ScanSomeState scan_state;
+    ScannedKeys *scanned_keys;
+
+    scan_state.key = nget_args->key;
+    scan_state.count = nget_args->count;
+    scan_state.cursor = 0;
+
+    RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
+    do {
+        lockThreadsafeContext(ctx, using_threadsafe_context);
+
+        status = EXSTRINGS_STATUS_NOT_SET;
+        scanned_keys = scanSome(ctx, &scan_state, &status);
+
+        if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+            unlockThreadsafeContext(ctx, using_threadsafe_context);
+            ret = REDISMODULE_ERR;
+            break;
+        } else if (scanned_keys == NULL) {
+            unlockThreadsafeContext(ctx, using_threadsafe_context);
+            continue;
+        }
+
+        reply = RedisModule_Call(ctx, "MGET", "v", scanned_keys->keys, scanned_keys->len);
+
+        unlockThreadsafeContext(ctx, using_threadsafe_context);
+
+        status = EXSTRINGS_STATUS_NOT_SET;
+        forwardIfError(ctx, reply, &status);
+        if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+            freeScannedKeys(ctx, scanned_keys);
+            ret = REDISMODULE_ERR;
+            break;
+        }
+
+        size_t i;
+        for (i = 0; i < scanned_keys->len; i++) {
+            RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(reply, i));
+            if (rms) {
+                RedisModule_ReplyWithString(ctx, scanned_keys->keys[i]);
+                RedisModule_ReplyWithString(ctx, rms);
+                RedisModule_FreeString(ctx, rms);
+                replylen += 2;
+            }
+        }
+        RedisModule_FreeCallReply(reply);
+        freeScannedKeys(ctx, scanned_keys);
+    } while (scan_state.cursor != 0);
+
+    RedisModule_ReplySetArrayLength(ctx,replylen);
+    return ret;
+}
+
+/* The thread entry point that actually executes the blocking part
+ * of the command nget.noatomic
+ */
+void *NGet_NoAtomic_ThreadMain(void *arg)
+{
+    pthread_detach(pthread_self());
+
+    RedisModuleBlockedClientArgs *bca = arg;
+    RedisModuleBlockedClient *bc = bca->bc;
+    RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
+
+    Nget_RedisCommand(ctx, &bca->nget_args, true);
+    RedisModule_FreeThreadSafeContext(ctx);
+    RedisModule_UnblockClient(bc, NULL);
+    RedisModule_Free(bca);
+    return NULL;
+}
+
+int NGet_NoAtomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+    RedisModule_AutoMemory(ctx);
+    pthread_t tid;
+
+    InitStaticVariable();
+
+    RedisModuleBlockedClientArgs *bca = RedisModule_Alloc(sizeof(RedisModuleBlockedClientArgs));
+    if (bca == NULL) {
+        RedisModule_ReplyWithError(ctx,"-ERR Out of memory");
+        return REDISMODULE_ERR;
+    }
+
+    ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
+    readNgetArgs(ctx, argv, argc, &bca->nget_args, &status);
+    if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+        RedisModule_Free(bca);
+        return REDISMODULE_ERR;
+    }
+
+    /* Note that when blocking the client we do not set any callback: no
+     * timeout is possible since we passed '0', nor we need a reply callback
+     * because we'll use the thread safe context to accumulate a reply. */
+    RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);
+
+    bca->bc = bc;
+
+    /* Now that we setup a blocking client, we need to pass the control
+     * to the thread. However we need to pass arguments to the thread:
+     * the reference to the blocked client handle. */
+    if (pthread_create(&tid,NULL,NGet_NoAtomic_ThreadMain,bca) != 0) {
+        RedisModule_AbortBlock(bc);
+        RedisModule_Free(bca);
+        return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
+    }
+
+    return REDISMODULE_OK;
+}
+
+int NGet_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+    RedisModule_AutoMemory(ctx);
+    NgetArgs nget_args;
+    ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
+
+    InitStaticVariable();
+
+    readNgetArgs(ctx, argv, argc, &nget_args, &status);
+    if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+        return REDISMODULE_ERR;
+    }
+
+    return Nget_RedisCommand(ctx, &nget_args, false);
+}
+
+int NDel_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+    RedisModule_AutoMemory(ctx);
+    int ret = REDISMODULE_OK;
+    long long replylen = 0;
+    RedisModuleCallReply *reply = NULL;
+    ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
+    ScanSomeState scan_state;
+    ScannedKeys *scanned_keys = NULL;
+
+    InitStaticVariable();
+    if (argc != 2)
+        return RedisModule_WrongArity(ctx);
+
+    scan_state.key = argv[1];
+    scan_state.count = def_count_str;
+    scan_state.cursor = 0;
+
+    do {
+        status = EXSTRINGS_STATUS_NOT_SET;
+        scanned_keys = scanSome(ctx, &scan_state, &status);
+
+        if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+            ret = REDISMODULE_ERR;
+            break;
+        } else if (scanned_keys == NULL) {
+            continue;
+        }
+
+        reply = RedisModule_Call(ctx, "UNLINK", "v!", scanned_keys->keys, scanned_keys->len);
+
+        status = EXSTRINGS_STATUS_NOT_SET;
+        forwardIfError(ctx, reply, &status);
+        if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+            freeScannedKeys(ctx, scanned_keys);
+            ret = REDISMODULE_ERR;
+            break;
+        }
+
+        replylen += RedisModule_CallReplyInteger(reply);
+        RedisModule_FreeCallReply(reply);
+        freeScannedKeys(ctx, scanned_keys);
+    } while (scan_state.cursor != 0);
+
+    if (ret == REDISMODULE_OK) {
+        RedisModule_ReplyWithLongLong(ctx, replylen);
+    }
+
+    return ret;
 }
 
 /* This function must be present on each Redis module. It is used in order to
@@ -594,12 +918,16 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
         DelNE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
         return REDISMODULE_ERR;
 
-    if (RedisModule_CreateCommand(ctx,"nget",
-        NGet_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
+    if (RedisModule_CreateCommand(ctx,"nget.atomic",
+        NGet_Atomic_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
         return REDISMODULE_ERR;
 
-    if (RedisModule_CreateCommand(ctx,"ndel",
-        NDel_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
+    if (RedisModule_CreateCommand(ctx,"nget.noatomic",
+        NGet_NoAtomic_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
+        return REDISMODULE_ERR;
+
+    if (RedisModule_CreateCommand(ctx,"ndel.atomic",
+        NDel_Atomic_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
         return REDISMODULE_ERR;
 
     if (RedisModule_CreateCommand(ctx,"msetpub",
@@ -614,6 +942,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
         SetIEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
         return REDISMODULE_ERR;
 
+    if (RedisModule_CreateCommand(ctx,"setiempub",
+        SetIEMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
+        return REDISMODULE_ERR;
+
     if (RedisModule_CreateCommand(ctx,"setnepub",
         SetNEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
         return REDISMODULE_ERR;
@@ -626,14 +958,26 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
         SetNXPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
         return REDISMODULE_ERR;
 
+    if (RedisModule_CreateCommand(ctx,"setnxmpub",
+        SetNXMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
+        return REDISMODULE_ERR;
+
     if (RedisModule_CreateCommand(ctx,"delpub",
         DelPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
         return REDISMODULE_ERR;
 
+    if (RedisModule_CreateCommand(ctx,"delmpub",
+        DelMPub_RedisCommand,"write deny-oom pubsub",1,1,1) == REDISMODULE_ERR)
+        return REDISMODULE_ERR;
+
     if (RedisModule_CreateCommand(ctx,"deliepub",
         DelIEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
         return REDISMODULE_ERR;
 
+    if (RedisModule_CreateCommand(ctx,"deliempub",
+        DelIEMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
+        return REDISMODULE_ERR;
+
     if (RedisModule_CreateCommand(ctx,"delnepub",
         DelNEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
         return REDISMODULE_ERR;