/*
- * Copyright (c) 2018-2019 Nokia.
+ * Copyright (c) 2018-2020 Nokia.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
*/
+/*
+ * This source code is part of the near-RT RIC (RAN Intelligent Controller)
+ * platform project (RICP).
+ */
+
#include "redismodule.h"
-#include <stdio.h>
+#include <pthread.h>
+#include <stdbool.h>
#include <stdlib.h>
-#include <ctype.h>
#include <string.h>
-#include <stdbool.h>
-#include "../../redismodule/include/redismodule.h"
+#include <strings.h>
#ifdef __UT__
#include "exstringsStub.h"
+#include "commonStub.h"
#endif
+
/* make sure the response is not NULL or an error.
sends the error to the client and exit the current function if its */
#define ASSERT_NOERROR(r) \
if (r == NULL) { \
return RedisModule_ReplyWithError(ctx,"ERR reply is NULL"); \
} else if (RedisModule_CallReplyType(r) == REDISMODULE_REPLY_ERROR) { \
- RedisModule_ReplyWithCallReply(ctx,r); \
- RedisModule_FreeCallReply(r); \
- return REDISMODULE_ERR; \
+ return RedisModule_ReplyWithCallReply(ctx,r); \
}
#define OBJ_OP_NO 0
#define OBJ_OP_IE (1<<4) /* OP if equal old value */
#define OBJ_OP_NE (1<<5) /* OP if not equal old value */
+#define DEF_COUNT 50
+#define ZERO 0
+#define MATCH_STR "MATCH"
+#define COUNT_STR "COUNT"
+#define SCANARGC 5
+
+RedisModuleString *def_count_str = NULL, *match_str = NULL, *count_str = NULL, *zero_str = NULL;
+
+typedef struct _NgetArgs {
+ RedisModuleString *key;
+ RedisModuleString *count;
+} NgetArgs;
+
+typedef struct RedisModuleBlockedClientArgs {
+ RedisModuleBlockedClient *bc;
+ NgetArgs nget_args;
+} RedisModuleBlockedClientArgs;
+
+void InitStaticVariable()
+{
+ if (def_count_str == NULL)
+ def_count_str = RedisModule_CreateStringFromLongLong(NULL, DEF_COUNT);
+ if (match_str == NULL)
+ match_str = RedisModule_CreateString(NULL, MATCH_STR, sizeof(MATCH_STR));
+ if (count_str == NULL)
+ count_str = RedisModule_CreateString(NULL, COUNT_STR, sizeof(COUNT_STR));
+ if (zero_str == NULL)
+ zero_str = RedisModule_CreateStringFromLongLong(NULL, ZERO);
+
+ return;
+}
+
int getKeyType(RedisModuleCtx *ctx, RedisModuleString *key_str)
{
RedisModuleKey *key = RedisModule_OpenKey(ctx, key_str, REDISMODULE_READ);
size_t length;
} DelParams;
+typedef enum _ExstringsStatus {
+ EXSTRINGS_STATUS_NO_ERRORS = 0,
+ EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT,
+ EXSTRINGS_STATUS_NOT_SET
+} ExstringsStatus;
+
+void readNgetArgs(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
+ NgetArgs* nget_args, ExstringsStatus* status)
+{
+ size_t str_len;
+ long long number;
+
+ if(argc == 2) {
+ nget_args->key = argv[1];
+ nget_args->count = def_count_str;
+ } else if (argc == 4) {
+ if (strcasecmp(RedisModule_StringPtrLen(argv[2], &str_len), "count")) {
+ RedisModule_ReplyWithError(ctx,"-ERR syntax error");
+ *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
+ return;
+ }
+
+ int ret = RedisModule_StringToLongLong(argv[3], &number) != REDISMODULE_OK;
+ if (ret != REDISMODULE_OK || number < 1) {
+ RedisModule_ReplyWithError(ctx,"-ERR value is not an integer or out of range");
+ *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
+ return;
+ }
+
+ nget_args->key = argv[1];
+ nget_args->count = argv[3];
+ } else {
+ /* In redis there is a bug (or undocumented feature see link)
+ * where calling 'RedisModule_WrongArity'
+ * within a blocked client will crash redis.
+ *
+ * Therefore we need to call this function to validate args
+ * before putting the client into blocking mode.
+ *
+ * Link to issue:
+ * https://github.com/antirez/redis/issues/6382
+ * 'If any thread tries to access the command arguments from
+ * within the ThreadSafeContext they will crash redis' */
+ RedisModule_WrongArity(ctx);
+ *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
+ return;
+ }
+
+ *status = EXSTRINGS_STATUS_NO_ERRORS;
+ return;
+}
+
+long long callReplyLongLong(RedisModuleCallReply* reply)
+{
+ const char* cursor_str_ptr = RedisModule_CallReplyStringPtr(reply, NULL);
+ return strtoll(cursor_str_ptr, NULL, 10);
+}
+
+void forwardIfError(RedisModuleCtx *ctx, RedisModuleCallReply *reply, ExstringsStatus* status)
+{
+ if (RedisModule_CallReplyType(reply) == REDISMODULE_REPLY_ERROR) {
+ RedisModule_ReplyWithCallReply(ctx, reply);
+ RedisModule_FreeCallReply(reply);
+ *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
+ }
+ *status = EXSTRINGS_STATUS_NO_ERRORS;
+}
+
+typedef struct _ScannedKeys {
+ RedisModuleString **keys;
+ size_t len;
+} ScannedKeys;
+
+ScannedKeys* allocScannedKeys(size_t len)
+{
+ ScannedKeys *sk = RedisModule_Alloc(sizeof(ScannedKeys));
+ if (sk) {
+ sk->len = len;
+ sk->keys = RedisModule_Alloc(sizeof(RedisModuleString *)*len);
+ }
+ return sk;
+}
+
+void freeScannedKeys(RedisModuleCtx *ctx, ScannedKeys* sk)
+{
+ if (sk) {
+ size_t j;
+ for (j = 0; j < sk->len; j++)
+ RedisModule_FreeString(ctx, sk->keys[j]);
+ RedisModule_Free(sk->keys);
+ }
+ RedisModule_Free(sk);
+}
+
+typedef struct _ScanSomeState {
+ RedisModuleString *key;
+ RedisModuleString *count;
+ long long cursor;
+} ScanSomeState;
+
+ScannedKeys *scanSome(RedisModuleCtx* ctx, ScanSomeState* state, ExstringsStatus* status)
+{
+ RedisModuleString *scanargv[SCANARGC] = {NULL};
+
+ scanargv[0] = RedisModule_CreateStringFromLongLong(ctx, state->cursor);
+ scanargv[1] = match_str;
+ scanargv[2] = state->key;
+ scanargv[3] = count_str;
+ scanargv[4] = state->count;
+
+ RedisModuleCallReply *reply;
+ reply = RedisModule_Call(ctx, "SCAN", "v", scanargv, SCANARGC);
+ RedisModule_FreeString(ctx, scanargv[0]);
+ forwardIfError(ctx, reply, status);
+ if (*status == EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT)
+ return NULL;
+
+ state->cursor = callReplyLongLong(RedisModule_CallReplyArrayElement(reply, 0));
+ RedisModuleCallReply *cr_keys =
+ RedisModule_CallReplyArrayElement(reply, 1);
+
+ size_t scanned_keys_len = RedisModule_CallReplyLength(cr_keys);
+ if (scanned_keys_len == 0) {
+ RedisModule_FreeCallReply(reply);
+ *status = EXSTRINGS_STATUS_NO_ERRORS;
+ return NULL;
+ }
+
+ ScannedKeys *scanned_keys = allocScannedKeys(scanned_keys_len);
+ if (scanned_keys == NULL) {
+ RedisModule_FreeCallReply(reply);
+ RedisModule_ReplyWithError(ctx,"-ERR Out of memory");
+ *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
+ return NULL;
+ }
+
+ scanned_keys->len = scanned_keys_len;
+ size_t j;
+ for (j = 0; j < scanned_keys_len; j++) {
+ RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(cr_keys,j));
+ scanned_keys->keys[j] = rms;
+ }
+ RedisModule_FreeCallReply(reply);
+ *status = EXSTRINGS_STATUS_NO_ERRORS;
+ return scanned_keys;
+}
+
+inline void unlockThreadsafeContext(RedisModuleCtx *ctx, bool using_threadsafe_context)
+{
+ if (using_threadsafe_context)
+ RedisModule_ThreadSafeContextUnlock(ctx);
+}
+
+inline void lockThreadsafeContext(RedisModuleCtx *ctx, bool using_threadsafe_context)
+{
+ if (using_threadsafe_context)
+ RedisModule_ThreadSafeContextLock(ctx);
+}
+
void multiPubCommand(RedisModuleCtx *ctx, PubParams* pubParams)
{
RedisModuleCallReply *reply = NULL;
int SetIE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
+ RedisModule_AutoMemory(ctx);
return setStringGenericCommand(ctx, argv, argc, OBJ_OP_IE);
}
int SetNE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
+ RedisModule_AutoMemory(ctx);
return setStringGenericCommand(ctx, argv, argc, OBJ_OP_NE);
}
int DelIE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
+ RedisModule_AutoMemory(ctx);
return delStringGenericCommand(ctx, argv, argc, OBJ_OP_IE);
}
int DelNE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
+ RedisModule_AutoMemory(ctx);
return delStringGenericCommand(ctx, argv, argc, OBJ_OP_NE);
}
-
-int NGet_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
-{
- RedisModuleCallReply *reply = NULL;
-
- if (argc != 2)
- return RedisModule_WrongArity(ctx);
-
- /* Call the command to get keys with pattern. */
- reply = RedisModule_Call(ctx, "KEYS", "s", argv[1]);
- ASSERT_NOERROR(reply)
-
- /* Prepare the arguments for the command. */
- size_t items = RedisModule_CallReplyLength(reply);
- if (items == 0) {
- //RedisModule_ReplyWithArray(ctx, items);
- RedisModule_ReplyWithCallReply(ctx, reply);
- RedisModule_FreeCallReply(reply);
- }
- else {
- RedisModuleString *cmdargv[items];
- size_t i=0, j;
- for (j = 0; j < items; j++) {
- RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(reply, j));
- cmdargv[i++] = rms;
-
- /*Assume all keys via SDL is string type for sake of saving time*/
-#if 0
- /*Check if key type is string*/
- RedisModuleKey *key = RedisModule_OpenKey(ctx, rms ,REDISMODULE_READ);
-
- if (key) {
- int type = RedisModule_KeyType(key);
- RedisModule_CloseKey(key);
- if (type == REDISMODULE_KEYTYPE_STRING) {
- cmdargv[i++] = rms;
- }
- } else {
- RedisModule_CloseKey(key);
- }
-#endif
- }
- RedisModule_FreeCallReply(reply);
-
- reply = RedisModule_Call(ctx, "MGET", "v", cmdargv, i);
- ASSERT_NOERROR(reply)
- items = RedisModule_CallReplyLength(reply);
- RedisModule_ReplyWithArray(ctx, i*2);
- for (j = 0; (j<items && j<i); j++) {
- RedisModule_ReplyWithString(ctx, cmdargv[j]);
- RedisModule_ReplyWithString(ctx, RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(reply, j)));
- }
-
- RedisModule_FreeCallReply(reply);
- }
-
- return REDISMODULE_OK;
-}
-
-int NDel_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
-{
- RedisModuleCallReply *reply = NULL;
-
- if (argc != 2)
- return RedisModule_WrongArity(ctx);
-
- /* Call the command to get keys with pattern. */
- reply = RedisModule_Call(ctx, "KEYS", "s", argv[1]);
- ASSERT_NOERROR(reply)
-
- /* Prepare the arguments for the command. */
- size_t items = RedisModule_CallReplyLength(reply);
- if (items == 0) {
- RedisModule_ReplyWithLongLong(ctx, 0);
- RedisModule_FreeCallReply(reply);
- }
- else {
- RedisModuleString *cmdargv[items];
- size_t i=0, j;
- for (j = 0; j < items; j++) {
- RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(reply, j));
- cmdargv[i++] = rms;
-
- /*Assume all keys via SDL is string type for sake of saving time*/
-#if 0
- //Check if key type is string
- RedisModuleKey *key = RedisModule_OpenKey(ctx, rms ,REDISMODULE_READ);
-
- if (key) {
- int type = RedisModule_KeyType(key);
- RedisModule_CloseKey(key);
- if (type == REDISMODULE_KEYTYPE_STRING) {
- cmdargv[i++] = rms;
- }
- } else {
- RedisModule_CloseKey(key);
- }
-#endif
- }
- RedisModule_FreeCallReply(reply);
-
- reply = RedisModule_Call(ctx, "UNLINK", "v!", cmdargv, i);
- ASSERT_NOERROR(reply)
- RedisModule_ReplyWithCallReply(ctx, reply);
- RedisModule_FreeCallReply(reply);
-
- }
-
- return REDISMODULE_OK;
-}
-
int setPubStringCommon(RedisModuleCtx *ctx, SetParams* setParamsPtr, PubParams* pubParamsPtr)
{
RedisModuleCallReply *setReply;
setReply = RedisModule_Call(ctx, "MSET", "v!", setParamsPtr->key_val_pairs, setParamsPtr->length);
ASSERT_NOERROR(setReply)
- int replytype = RedisModule_CallReplyType(setReply);
- if (replytype == REDISMODULE_REPLY_NULL) {
- RedisModule_ReplyWithNull(ctx);
- } else {
- multiPubCommand(ctx, pubParamsPtr);
- RedisModule_ReplyWithCallReply(ctx, setReply);
- }
+ multiPubCommand(ctx, pubParamsPtr);
+ RedisModule_ReplyWithCallReply(ctx, setReply);
RedisModule_FreeCallReply(setReply);
return REDISMODULE_OK;
}
if (argc < 5 || (argc % 2) == 0)
return RedisModule_WrongArity(ctx);
+ RedisModule_AutoMemory(ctx);
SetParams setParams = {
.key_val_pairs = argv + 1,
.length = argc - 3
};
PubParams pubParams = {
- .channel_msg_pairs = argv + argc - 2,
+ .channel_msg_pairs = argv + 1 + setParams.length,
.length = 2
};
if (argc < 7 || (argc % 2) == 0)
return RedisModule_WrongArity(ctx);
+ RedisModule_AutoMemory(ctx);
long long setPairsCount, pubPairsCount;
RedisModule_StringToLongLong(argv[1], &setPairsCount);
RedisModule_StringToLongLong(argv[2], &pubPairsCount);
long long setLen, pubLen;
setLen = 2*setPairsCount;
pubLen = 2*pubPairsCount;
+
if (setLen + pubLen + 3 != argc)
return RedisModule_ReplyWithError(ctx, "ERR SET_PAIR_COUNT or PUB_PAIR_COUNT do not match the total pair count");
int setIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
{
- if (argc < 6 || (argc % 2) != 0)
- return RedisModule_WrongArity(ctx);
-
SetParams setParams = {
.key_val_pairs = argv + 1,
.length = 2
return setPubStringCommon(ctx, &setParams, &pubParams);
}
-int SetNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+int SetIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
- return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE);
+ if (argc != 6)
+ return RedisModule_WrongArity(ctx);
+
+ RedisModule_AutoMemory(ctx);
+ return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
}
-int SetIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+int SetIEMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
+ if (argc < 6 || (argc % 2) != 0)
+ return RedisModule_WrongArity(ctx);
+
+ RedisModule_AutoMemory(ctx);
return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
}
-int setXXNXPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
+int SetNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
- if (argc < 5 || (argc % 2) == 0)
+ if (argc != 6)
return RedisModule_WrongArity(ctx);
+ RedisModule_AutoMemory(ctx);
+ return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE);
+}
+
+int setXXNXPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
+{
SetParams setParams = {
.key_val_pairs = argv + 1,
.length = 2
int SetNXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
+ if (argc != 5)
+ return RedisModule_WrongArity(ctx);
+
+ RedisModule_AutoMemory(ctx);
+ return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_NX);
+}
+
+int SetNXMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ if (argc < 5 || (argc % 2) == 0)
+ return RedisModule_WrongArity(ctx);
+
+ RedisModule_AutoMemory(ctx);
return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_NX);
}
int SetXXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
+ if (argc != 5)
+ return RedisModule_WrongArity(ctx);
+
+ RedisModule_AutoMemory(ctx);
return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_XX);
}
if (argc < 4)
return RedisModule_WrongArity(ctx);
+ RedisModule_AutoMemory(ctx);
DelParams delParams = {
.keys = argv + 1,
.length = argc - 3
if (argc < 6)
return RedisModule_WrongArity(ctx);
+ RedisModule_AutoMemory(ctx);
long long delCount, pubPairsCount;
RedisModule_StringToLongLong(argv[1], &delCount);
RedisModule_StringToLongLong(argv[2], &pubPairsCount);
int delIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
{
- if (argc < 5 || (argc % 2) == 0)
- return RedisModule_WrongArity(ctx);
-
DelParams delParams = {
.keys = argv + 1,
.length = 1
int DelIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
- return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
+ if (argc != 5)
+ return RedisModule_WrongArity(ctx);
+
+ RedisModule_AutoMemory(ctx);
+ return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
+}
+
+int DelIEMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ if (argc < 5 || (argc % 2) == 0)
+ return RedisModule_WrongArity(ctx);
+
+ RedisModule_AutoMemory(ctx);
+ return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
}
int DelNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
- return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE);
+ if (argc != 5)
+ return RedisModule_WrongArity(ctx);
+
+ RedisModule_AutoMemory(ctx);
+ return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE);
+}
+
+int Nget_RedisCommand(RedisModuleCtx *ctx, NgetArgs* nget_args, bool using_threadsafe_context)
+{
+ int ret = REDISMODULE_OK;
+ size_t replylen = 0;
+ RedisModuleCallReply *reply = NULL;
+ ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
+ ScanSomeState scan_state;
+ ScannedKeys *scanned_keys;
+
+ scan_state.key = nget_args->key;
+ scan_state.count = nget_args->count;
+ scan_state.cursor = 0;
+
+ RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
+ do {
+ lockThreadsafeContext(ctx, using_threadsafe_context);
+
+ status = EXSTRINGS_STATUS_NOT_SET;
+ scanned_keys = scanSome(ctx, &scan_state, &status);
+
+ if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+ unlockThreadsafeContext(ctx, using_threadsafe_context);
+ ret = REDISMODULE_ERR;
+ break;
+ } else if (scanned_keys == NULL) {
+ unlockThreadsafeContext(ctx, using_threadsafe_context);
+ continue;
+ }
+
+ reply = RedisModule_Call(ctx, "MGET", "v", scanned_keys->keys, scanned_keys->len);
+
+ unlockThreadsafeContext(ctx, using_threadsafe_context);
+
+ status = EXSTRINGS_STATUS_NOT_SET;
+ forwardIfError(ctx, reply, &status);
+ if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+ freeScannedKeys(ctx, scanned_keys);
+ ret = REDISMODULE_ERR;
+ break;
+ }
+
+ size_t i;
+ for (i = 0; i < scanned_keys->len; i++) {
+ RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(reply, i));
+ if (rms) {
+ RedisModule_ReplyWithString(ctx, scanned_keys->keys[i]);
+ RedisModule_ReplyWithString(ctx, rms);
+ RedisModule_FreeString(ctx, rms);
+ replylen += 2;
+ }
+ }
+ RedisModule_FreeCallReply(reply);
+ freeScannedKeys(ctx, scanned_keys);
+ } while (scan_state.cursor != 0);
+
+ RedisModule_ReplySetArrayLength(ctx,replylen);
+ return ret;
+}
+
+/* The thread entry point that actually executes the blocking part
+ * of the command nget.noatomic
+ */
+void *NGet_NoAtomic_ThreadMain(void *arg)
+{
+ pthread_detach(pthread_self());
+
+ RedisModuleBlockedClientArgs *bca = arg;
+ RedisModuleBlockedClient *bc = bca->bc;
+ RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
+
+ Nget_RedisCommand(ctx, &bca->nget_args, true);
+ RedisModule_FreeThreadSafeContext(ctx);
+ RedisModule_UnblockClient(bc, NULL);
+ RedisModule_Free(bca);
+ return NULL;
+}
+
+int NGet_NoAtomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ RedisModule_AutoMemory(ctx);
+ pthread_t tid;
+
+ InitStaticVariable();
+
+ RedisModuleBlockedClientArgs *bca = RedisModule_Alloc(sizeof(RedisModuleBlockedClientArgs));
+ if (bca == NULL) {
+ RedisModule_ReplyWithError(ctx,"-ERR Out of memory");
+ return REDISMODULE_ERR;
+ }
+
+ ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
+ readNgetArgs(ctx, argv, argc, &bca->nget_args, &status);
+ if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+ RedisModule_Free(bca);
+ return REDISMODULE_ERR;
+ }
+
+ /* Note that when blocking the client we do not set any callback: no
+ * timeout is possible since we passed '0', nor we need a reply callback
+ * because we'll use the thread safe context to accumulate a reply. */
+ RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);
+
+ bca->bc = bc;
+
+ /* Now that we setup a blocking client, we need to pass the control
+ * to the thread. However we need to pass arguments to the thread:
+ * the reference to the blocked client handle. */
+ if (pthread_create(&tid,NULL,NGet_NoAtomic_ThreadMain,bca) != 0) {
+ RedisModule_AbortBlock(bc);
+ RedisModule_Free(bca);
+ return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
+ }
+
+ return REDISMODULE_OK;
+}
+
+int NGet_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ RedisModule_AutoMemory(ctx);
+ NgetArgs nget_args;
+ ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
+
+ InitStaticVariable();
+
+ readNgetArgs(ctx, argv, argc, &nget_args, &status);
+ if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+ return REDISMODULE_ERR;
+ }
+
+ return Nget_RedisCommand(ctx, &nget_args, false);
+}
+
+int NDel_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ RedisModule_AutoMemory(ctx);
+ int ret = REDISMODULE_OK;
+ long long replylen = 0;
+ RedisModuleCallReply *reply = NULL;
+ ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
+ ScanSomeState scan_state;
+ ScannedKeys *scanned_keys = NULL;
+
+ InitStaticVariable();
+ if (argc != 2)
+ return RedisModule_WrongArity(ctx);
+
+ scan_state.key = argv[1];
+ scan_state.count = def_count_str;
+ scan_state.cursor = 0;
+
+ do {
+ status = EXSTRINGS_STATUS_NOT_SET;
+ scanned_keys = scanSome(ctx, &scan_state, &status);
+
+ if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+ ret = REDISMODULE_ERR;
+ break;
+ } else if (scanned_keys == NULL) {
+ continue;
+ }
+
+ reply = RedisModule_Call(ctx, "UNLINK", "v!", scanned_keys->keys, scanned_keys->len);
+
+ status = EXSTRINGS_STATUS_NOT_SET;
+ forwardIfError(ctx, reply, &status);
+ if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+ freeScannedKeys(ctx, scanned_keys);
+ ret = REDISMODULE_ERR;
+ break;
+ }
+
+ replylen += RedisModule_CallReplyInteger(reply);
+ RedisModule_FreeCallReply(reply);
+ freeScannedKeys(ctx, scanned_keys);
+ } while (scan_state.cursor != 0);
+
+ if (ret == REDISMODULE_OK) {
+ RedisModule_ReplyWithLongLong(ctx, replylen);
+ }
+
+ return ret;
}
/* This function must be present on each Redis module. It is used in order to
DelNE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx,"nget",
- NGet_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
+ if (RedisModule_CreateCommand(ctx,"nget.atomic",
+ NGet_Atomic_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"nget.noatomic",
+ NGet_NoAtomic_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx,"ndel",
- NDel_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
+ if (RedisModule_CreateCommand(ctx,"ndel.atomic",
+ NDel_Atomic_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"msetpub",
SetIEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"setiempub",
+ SetIEMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
if (RedisModule_CreateCommand(ctx,"setnepub",
SetNEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
SetNXPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"setnxmpub",
+ SetNXMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
if (RedisModule_CreateCommand(ctx,"delpub",
DelPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
DelIEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"deliempub",
+ DelIEMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
if (RedisModule_CreateCommand(ctx,"delnepub",
DelNEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;