X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=redismodule%2Fsrc%2Fexstrings.c;h=11102d3ce2b298488adcd0b1012b70b96a80d17b;hb=refs%2Fchanges%2F89%2F7589%2F3;hp=ef3cf5de07d151faf4d6dfeef61fcc1e871c7b84;hpb=49c7ba8f2a3efd63e795c961d5310c7224a1be2f;p=ric-plt%2Fdbaas.git diff --git a/redismodule/src/exstrings.c b/redismodule/src/exstrings.c index ef3cf5d..11102d3 100755 --- a/redismodule/src/exstrings.c +++ b/redismodule/src/exstrings.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2019 Nokia. + * Copyright (c) 2018-2020 Nokia. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,27 +14,31 @@ * limitations under the License. */ +/* + * This source code is part of the near-RT RIC (RAN Intelligent Controller) + * platform project (RICP). + */ + #include "redismodule.h" -#include +#include +#include #include -#include #include -#include -#include "../../redismodule/include/redismodule.h" +#include #ifdef __UT__ #include "exstringsStub.h" +#include "commonStub.h" #endif + /* make sure the response is not NULL or an error. sends the error to the client and exit the current function if its */ #define ASSERT_NOERROR(r) \ if (r == NULL) { \ return RedisModule_ReplyWithError(ctx,"ERR reply is NULL"); \ } else if (RedisModule_CallReplyType(r) == REDISMODULE_REPLY_ERROR) { \ - RedisModule_ReplyWithCallReply(ctx,r); \ - RedisModule_FreeCallReply(r); \ - return REDISMODULE_ERR; \ + return RedisModule_ReplyWithCallReply(ctx,r); \ } #define OBJ_OP_NO 0 @@ -43,6 +47,38 @@ sends the error to the client and exit the current function if its */ #define OBJ_OP_IE (1<<4) /* OP if equal old value */ #define OBJ_OP_NE (1<<5) /* OP if not equal old value */ +#define DEF_COUNT 50 +#define ZERO 0 +#define MATCH_STR "MATCH" +#define COUNT_STR "COUNT" +#define SCANARGC 5 + +RedisModuleString *def_count_str = NULL, *match_str = NULL, *count_str = NULL, *zero_str = NULL; + +typedef struct _NgetArgs { + RedisModuleString *key; + RedisModuleString *count; +} NgetArgs; + +typedef struct RedisModuleBlockedClientArgs { + RedisModuleBlockedClient *bc; + NgetArgs nget_args; +} RedisModuleBlockedClientArgs; + +void InitStaticVariable() +{ + if (def_count_str == NULL) + def_count_str = RedisModule_CreateStringFromLongLong(NULL, DEF_COUNT); + if (match_str == NULL) + match_str = RedisModule_CreateString(NULL, MATCH_STR, sizeof(MATCH_STR)); + if (count_str == NULL) + count_str = RedisModule_CreateString(NULL, COUNT_STR, sizeof(COUNT_STR)); + if (zero_str == NULL) + zero_str = RedisModule_CreateStringFromLongLong(NULL, ZERO); + + return; +} + int getKeyType(RedisModuleCtx *ctx, RedisModuleString *key_str) { RedisModuleKey *key = RedisModule_OpenKey(ctx, key_str, REDISMODULE_READ); @@ -71,6 +107,170 @@ typedef struct _PubParams { size_t length; } PubParams; +typedef struct _DelParams { + RedisModuleString **keys; + size_t length; +} DelParams; + +typedef enum _ExstringsStatus { + EXSTRINGS_STATUS_NO_ERRORS = 0, + EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT, + EXSTRINGS_STATUS_NOT_SET +} ExstringsStatus; + +void readNgetArgs(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, + NgetArgs* nget_args, ExstringsStatus* status) +{ + size_t str_len; + long long number; + + if(argc == 2) { + nget_args->key = argv[1]; + nget_args->count = def_count_str; + } else if (argc == 4) { + if (strcasecmp(RedisModule_StringPtrLen(argv[2], &str_len), "count")) { + RedisModule_ReplyWithError(ctx,"-ERR syntax error"); + *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT; + return; + } + + int ret = RedisModule_StringToLongLong(argv[3], &number) != REDISMODULE_OK; + if (ret != REDISMODULE_OK || number < 1) { + RedisModule_ReplyWithError(ctx,"-ERR value is not an integer or out of range"); + *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT; + return; + } + + nget_args->key = argv[1]; + nget_args->count = argv[3]; + } else { + /* In redis there is a bug (or undocumented feature see link) + * where calling 'RedisModule_WrongArity' + * within a blocked client will crash redis. + * + * Therefore we need to call this function to validate args + * before putting the client into blocking mode. + * + * Link to issue: + * https://github.com/antirez/redis/issues/6382 + * 'If any thread tries to access the command arguments from + * within the ThreadSafeContext they will crash redis' */ + RedisModule_WrongArity(ctx); + *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT; + return; + } + + *status = EXSTRINGS_STATUS_NO_ERRORS; + return; +} + +long long callReplyLongLong(RedisModuleCallReply* reply) +{ + const char* cursor_str_ptr = RedisModule_CallReplyStringPtr(reply, NULL); + return strtoll(cursor_str_ptr, NULL, 10); +} + +void forwardIfError(RedisModuleCtx *ctx, RedisModuleCallReply *reply, ExstringsStatus* status) +{ + if (RedisModule_CallReplyType(reply) == REDISMODULE_REPLY_ERROR) { + RedisModule_ReplyWithCallReply(ctx, reply); + RedisModule_FreeCallReply(reply); + *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT; + } + *status = EXSTRINGS_STATUS_NO_ERRORS; +} + +typedef struct _ScannedKeys { + RedisModuleString **keys; + size_t len; +} ScannedKeys; + +ScannedKeys* allocScannedKeys(size_t len) +{ + ScannedKeys *sk = RedisModule_Alloc(sizeof(ScannedKeys)); + if (sk) { + sk->len = len; + sk->keys = RedisModule_Alloc(sizeof(RedisModuleString *)*len); + } + return sk; +} + +void freeScannedKeys(RedisModuleCtx *ctx, ScannedKeys* sk) +{ + if (sk) { + size_t j; + for (j = 0; j < sk->len; j++) + RedisModule_FreeString(ctx, sk->keys[j]); + RedisModule_Free(sk->keys); + } + RedisModule_Free(sk); +} + +typedef struct _ScanSomeState { + RedisModuleString *key; + RedisModuleString *count; + long long cursor; +} ScanSomeState; + +ScannedKeys *scanSome(RedisModuleCtx* ctx, ScanSomeState* state, ExstringsStatus* status) +{ + RedisModuleString *scanargv[SCANARGC] = {NULL}; + + scanargv[0] = RedisModule_CreateStringFromLongLong(ctx, state->cursor); + scanargv[1] = match_str; + scanargv[2] = state->key; + scanargv[3] = count_str; + scanargv[4] = state->count; + + RedisModuleCallReply *reply; + reply = RedisModule_Call(ctx, "SCAN", "v", scanargv, SCANARGC); + RedisModule_FreeString(ctx, scanargv[0]); + forwardIfError(ctx, reply, status); + if (*status == EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT) + return NULL; + + state->cursor = callReplyLongLong(RedisModule_CallReplyArrayElement(reply, 0)); + RedisModuleCallReply *cr_keys = + RedisModule_CallReplyArrayElement(reply, 1); + + size_t scanned_keys_len = RedisModule_CallReplyLength(cr_keys); + if (scanned_keys_len == 0) { + RedisModule_FreeCallReply(reply); + *status = EXSTRINGS_STATUS_NO_ERRORS; + return NULL; + } + + ScannedKeys *scanned_keys = allocScannedKeys(scanned_keys_len); + if (scanned_keys == NULL) { + RedisModule_FreeCallReply(reply); + RedisModule_ReplyWithError(ctx,"-ERR Out of memory"); + *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT; + return NULL; + } + + scanned_keys->len = scanned_keys_len; + size_t j; + for (j = 0; j < scanned_keys_len; j++) { + RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(cr_keys,j)); + scanned_keys->keys[j] = rms; + } + RedisModule_FreeCallReply(reply); + *status = EXSTRINGS_STATUS_NO_ERRORS; + return scanned_keys; +} + +inline void unlockThreadsafeContext(RedisModuleCtx *ctx, bool using_threadsafe_context) +{ + if (using_threadsafe_context) + RedisModule_ThreadSafeContextUnlock(ctx); +} + +inline void lockThreadsafeContext(RedisModuleCtx *ctx, bool using_threadsafe_context) +{ + if (using_threadsafe_context) + RedisModule_ThreadSafeContextLock(ctx); +} + void multiPubCommand(RedisModuleCtx *ctx, PubParams* pubParams) { RedisModuleCallReply *reply = NULL; @@ -142,11 +342,13 @@ int setStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int SetIE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + RedisModule_AutoMemory(ctx); return setStringGenericCommand(ctx, argv, argc, OBJ_OP_IE); } int SetNE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + RedisModule_AutoMemory(ctx); return setStringGenericCommand(ctx, argv, argc, OBJ_OP_NE); } @@ -205,136 +407,22 @@ int delStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int DelIE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + RedisModule_AutoMemory(ctx); return delStringGenericCommand(ctx, argv, argc, OBJ_OP_IE); } int DelNE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + RedisModule_AutoMemory(ctx); return delStringGenericCommand(ctx, argv, argc, OBJ_OP_NE); } - -int NGet_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) -{ - RedisModuleCallReply *reply = NULL; - - if (argc != 2) - return RedisModule_WrongArity(ctx); - - /* Call the command to get keys with pattern. */ - reply = RedisModule_Call(ctx, "KEYS", "s", argv[1]); - ASSERT_NOERROR(reply) - - /* Prepare the arguments for the command. */ - size_t items = RedisModule_CallReplyLength(reply); - if (items == 0) { - //RedisModule_ReplyWithArray(ctx, items); - RedisModule_ReplyWithCallReply(ctx, reply); - RedisModule_FreeCallReply(reply); - } - else { - RedisModuleString *cmdargv[items]; - size_t i=0, j; - for (j = 0; j < items; j++) { - RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(reply, j)); - cmdargv[i++] = rms; - - /*Assume all keys via SDL is string type for sake of saving time*/ -#if 0 - /*Check if key type is string*/ - RedisModuleKey *key = RedisModule_OpenKey(ctx, rms ,REDISMODULE_READ); - - if (key) { - int type = RedisModule_KeyType(key); - RedisModule_CloseKey(key); - if (type == REDISMODULE_KEYTYPE_STRING) { - cmdargv[i++] = rms; - } - } else { - RedisModule_CloseKey(key); - } -#endif - } - RedisModule_FreeCallReply(reply); - - reply = RedisModule_Call(ctx, "MGET", "v", cmdargv, i); - ASSERT_NOERROR(reply) - items = RedisModule_CallReplyLength(reply); - RedisModule_ReplyWithArray(ctx, i*2); - for (j = 0; (jkey_val_pairs, setParamsPtr->length); ASSERT_NOERROR(setReply) - int replytype = RedisModule_CallReplyType(setReply); - if (replytype == REDISMODULE_REPLY_NULL) { - RedisModule_ReplyWithNull(ctx); - } else { - multiPubCommand(ctx, pubParamsPtr); - RedisModule_ReplyWithCallReply(ctx, setReply); - } + multiPubCommand(ctx, pubParamsPtr); + RedisModule_ReplyWithCallReply(ctx, setReply); RedisModule_FreeCallReply(setReply); return REDISMODULE_OK; } @@ -344,12 +432,13 @@ int SetPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (argc < 5 || (argc % 2) == 0) return RedisModule_WrongArity(ctx); + RedisModule_AutoMemory(ctx); SetParams setParams = { .key_val_pairs = argv + 1, .length = argc - 3 }; PubParams pubParams = { - .channel_msg_pairs = argv + argc - 2, + .channel_msg_pairs = argv + 1 + setParams.length, .length = 2 }; @@ -361,6 +450,7 @@ int SetMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc if (argc < 7 || (argc % 2) == 0) return RedisModule_WrongArity(ctx); + RedisModule_AutoMemory(ctx); long long setPairsCount, pubPairsCount; RedisModule_StringToLongLong(argv[1], &setPairsCount); RedisModule_StringToLongLong(argv[2], &pubPairsCount); @@ -370,6 +460,7 @@ int SetMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc long long setLen, pubLen; setLen = 2*setPairsCount; pubLen = 2*pubPairsCount; + if (setLen + pubLen + 3 != argc) return RedisModule_ReplyWithError(ctx, "ERR SET_PAIR_COUNT or PUB_PAIR_COUNT do not match the total pair count"); @@ -387,9 +478,6 @@ int SetMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc int setIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag) { - if (argc < 6 || (argc % 2) != 0) - return RedisModule_WrongArity(ctx); - SetParams setParams = { .key_val_pairs = argv + 1, .length = 2 @@ -420,21 +508,35 @@ int setIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int ar return setPubStringCommon(ctx, &setParams, &pubParams); } -int SetNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +int SetIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE); + if (argc != 6) + return RedisModule_WrongArity(ctx); + + RedisModule_AutoMemory(ctx); + return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE); } -int SetIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +int SetIEMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc < 6 || (argc % 2) != 0) + return RedisModule_WrongArity(ctx); + + RedisModule_AutoMemory(ctx); return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE); } -int setXXNXPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag) +int SetNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - if (argc < 5 || (argc % 2) == 0) + if (argc != 6) return RedisModule_WrongArity(ctx); + RedisModule_AutoMemory(ctx); + return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE); +} + +int setXXNXPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag) +{ SetParams setParams = { .key_val_pairs = argv + 1, .length = 2 @@ -459,114 +561,336 @@ int setXXNXPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int ar int SetNXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 5) + return RedisModule_WrongArity(ctx); + + RedisModule_AutoMemory(ctx); return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_NX); } -int SetXXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +int SetNXMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_XX); + if (argc < 5 || (argc % 2) == 0) + return RedisModule_WrongArity(ctx); + + RedisModule_AutoMemory(ctx); + return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_NX); } -int delPubStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv, - int argc, const int flag) +int SetXXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - RedisModuleString *oldvalstr = NULL, *channel = NULL, *message = NULL; - RedisModuleCallReply *reply = NULL; - - if (flag == OBJ_OP_NO) { - if (argc < 4) - return RedisModule_WrongArity(ctx); - else { - channel = argv[argc-2]; - message = argv[argc-1]; - } - } else { - if (argc != 5) - return RedisModule_WrongArity(ctx); - else { - oldvalstr = argv[2]; - channel = argv[3]; - message = argv[4]; - } - } - - if (flag != OBJ_OP_NO) { - /*Check if key type is string*/ - RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1], - REDISMODULE_READ); - int type = RedisModule_KeyType(key); - RedisModule_CloseKey(key); - - if (type == REDISMODULE_KEYTYPE_EMPTY) { - return RedisModule_ReplyWithLongLong(ctx, 0); - } else if (type != REDISMODULE_KEYTYPE_STRING) { - return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE); - } - } - - if (flag == OBJ_OP_IE || flag == OBJ_OP_NE) { - /*Get the value*/ - reply = RedisModule_Call(ctx, "GET", "s", argv[1]); - ASSERT_NOERROR(reply) - size_t curlen = 0, oldvallen = 0; - const char *oldval = RedisModule_StringPtrLen(oldvalstr, &oldvallen); - const char *curval = RedisModule_CallReplyStringPtr(reply, &curlen); - if (((flag == OBJ_OP_IE) && - (!curval || (oldvallen != curlen) || strncmp(oldval, curval, curlen))) - || - ((flag == OBJ_OP_NE) && curval && (oldvallen == curlen) && - !strncmp(oldval, curval, curlen))) { - RedisModule_FreeCallReply(reply); - return RedisModule_ReplyWithLongLong(ctx, 0); - } - RedisModule_FreeCallReply(reply); - } - + if (argc != 5) + return RedisModule_WrongArity(ctx); - /* Prepare the arguments for the command. */ - int i, j=0, cmdargc=argc-3; - RedisModuleString *cmdargv[cmdargc]; - for (i = 1; i < argc-2; i++) { - if ((flag == OBJ_OP_IE || flag == OBJ_OP_NE) && (i == 2)) - continue; - cmdargv[j++] = argv[i]; - } + RedisModule_AutoMemory(ctx); + return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_XX); +} - /* Call the command and pass back the reply. */ - reply = RedisModule_Call(ctx, "UNLINK", "v!", cmdargv, j); +int delPubStringCommon(RedisModuleCtx *ctx, DelParams *delParamsPtr, PubParams *pubParamsPtr) +{ + RedisModuleCallReply *reply = RedisModule_Call(ctx, "UNLINK", "v!", delParamsPtr->keys, delParamsPtr->length); ASSERT_NOERROR(reply) int replytype = RedisModule_CallReplyType(reply); if (replytype == REDISMODULE_REPLY_NULL) { RedisModule_ReplyWithNull(ctx); - } - else if (RedisModule_CallReplyInteger(reply) == 0) { + } else if (RedisModule_CallReplyInteger(reply) == 0) { RedisModule_ReplyWithCallReply(ctx, reply); } else { - cmdargc = 2; - cmdargv[0] = channel; - cmdargv[1] = message; - RedisModuleCallReply *pubreply = RedisModule_Call(ctx, "PUBLISH", "v", cmdargv, cmdargc); - RedisModule_FreeCallReply(pubreply); RedisModule_ReplyWithCallReply(ctx, reply); + multiPubCommand(ctx, pubParamsPtr); } - RedisModule_FreeCallReply(reply); return REDISMODULE_OK; } int DelPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - return delPubStringGenericCommand(ctx, argv, argc, OBJ_OP_NO); + if (argc < 4) + return RedisModule_WrongArity(ctx); + + RedisModule_AutoMemory(ctx); + DelParams delParams = { + .keys = argv + 1, + .length = argc - 3 + }; + PubParams pubParams = { + .channel_msg_pairs = argv + 1 + delParams.length, + .length = 2 + }; + + return delPubStringCommon(ctx, &delParams, &pubParams); +} + +int DelMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc < 6) + return RedisModule_WrongArity(ctx); + + RedisModule_AutoMemory(ctx); + long long delCount, pubPairsCount; + RedisModule_StringToLongLong(argv[1], &delCount); + RedisModule_StringToLongLong(argv[2], &pubPairsCount); + if (delCount < 1 || pubPairsCount < 1) + return RedisModule_ReplyWithError(ctx, "ERR DEL_COUNT and PUB_PAIR_COUNT must be greater than zero"); + + long long delLen, pubLen; + delLen = delCount; + pubLen = 2*pubPairsCount; + if (delLen + pubLen + 3 != argc) + return RedisModule_ReplyWithError(ctx, "ERR DEL_COUNT or PUB_PAIR_COUNT do not match the total pair count"); + + DelParams delParams = { + .keys = argv + 3, + .length = delLen + }; + PubParams pubParams = { + .channel_msg_pairs = argv + 3 + delParams.length, + .length = pubLen + }; + + return delPubStringCommon(ctx, &delParams, &pubParams); +} + +int delIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag) +{ + DelParams delParams = { + .keys = argv + 1, + .length = 1 + }; + PubParams pubParams = { + .channel_msg_pairs = argv + 3, + .length = argc - 3 + }; + RedisModuleString *key = argv[1]; + RedisModuleString *oldvalstr = argv[2]; + + int type = getKeyType(ctx, key); + if (type == REDISMODULE_KEYTYPE_EMPTY) { + return RedisModule_ReplyWithLongLong(ctx, 0); + } else if (type != REDISMODULE_KEYTYPE_STRING) { + return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); + } + + RedisModuleCallReply *reply = RedisModule_Call(ctx, "GET", "s", key); + ASSERT_NOERROR(reply) + bool is_equal = replyContentsEqualString(reply, oldvalstr); + RedisModule_FreeCallReply(reply); + if ((flag == OBJ_OP_IE && !is_equal) || + (flag == OBJ_OP_NE && is_equal)) { + return RedisModule_ReplyWithLongLong(ctx, 0); + } + + return delPubStringCommon(ctx, &delParams, &pubParams); } int DelIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - return delPubStringGenericCommand(ctx, argv, argc, OBJ_OP_IE); + if (argc != 5) + return RedisModule_WrongArity(ctx); + + RedisModule_AutoMemory(ctx); + return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE); +} + +int DelIEMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc < 5 || (argc % 2) == 0) + return RedisModule_WrongArity(ctx); + + RedisModule_AutoMemory(ctx); + return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE); } int DelNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - return delPubStringGenericCommand(ctx, argv, argc, OBJ_OP_NE); + if (argc != 5) + return RedisModule_WrongArity(ctx); + + RedisModule_AutoMemory(ctx); + return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE); +} + +int Nget_RedisCommand(RedisModuleCtx *ctx, NgetArgs* nget_args, bool using_threadsafe_context) +{ + int ret = REDISMODULE_OK; + size_t replylen = 0; + RedisModuleCallReply *reply = NULL; + ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET; + ScanSomeState scan_state; + ScannedKeys *scanned_keys; + + scan_state.key = nget_args->key; + scan_state.count = nget_args->count; + scan_state.cursor = 0; + + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + do { + lockThreadsafeContext(ctx, using_threadsafe_context); + + status = EXSTRINGS_STATUS_NOT_SET; + scanned_keys = scanSome(ctx, &scan_state, &status); + + if (status != EXSTRINGS_STATUS_NO_ERRORS) { + unlockThreadsafeContext(ctx, using_threadsafe_context); + ret = REDISMODULE_ERR; + break; + } else if (scanned_keys == NULL) { + unlockThreadsafeContext(ctx, using_threadsafe_context); + continue; + } + + reply = RedisModule_Call(ctx, "MGET", "v", scanned_keys->keys, scanned_keys->len); + + unlockThreadsafeContext(ctx, using_threadsafe_context); + + status = EXSTRINGS_STATUS_NOT_SET; + forwardIfError(ctx, reply, &status); + if (status != EXSTRINGS_STATUS_NO_ERRORS) { + freeScannedKeys(ctx, scanned_keys); + ret = REDISMODULE_ERR; + break; + } + + size_t i; + for (i = 0; i < scanned_keys->len; i++) { + RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(reply, i)); + if (rms) { + RedisModule_ReplyWithString(ctx, scanned_keys->keys[i]); + RedisModule_ReplyWithString(ctx, rms); + RedisModule_FreeString(ctx, rms); + replylen += 2; + } + } + RedisModule_FreeCallReply(reply); + freeScannedKeys(ctx, scanned_keys); + } while (scan_state.cursor != 0); + + RedisModule_ReplySetArrayLength(ctx,replylen); + return ret; +} + +/* The thread entry point that actually executes the blocking part + * of the command nget.noatomic + */ +void *NGet_NoAtomic_ThreadMain(void *arg) +{ + pthread_detach(pthread_self()); + + RedisModuleBlockedClientArgs *bca = arg; + RedisModuleBlockedClient *bc = bca->bc; + RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc); + + Nget_RedisCommand(ctx, &bca->nget_args, true); + RedisModule_FreeThreadSafeContext(ctx); + RedisModule_UnblockClient(bc, NULL); + RedisModule_Free(bca); + return NULL; +} + +int NGet_NoAtomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + RedisModule_AutoMemory(ctx); + pthread_t tid; + + InitStaticVariable(); + + RedisModuleBlockedClientArgs *bca = RedisModule_Alloc(sizeof(RedisModuleBlockedClientArgs)); + if (bca == NULL) { + RedisModule_ReplyWithError(ctx,"-ERR Out of memory"); + return REDISMODULE_ERR; + } + + ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET; + readNgetArgs(ctx, argv, argc, &bca->nget_args, &status); + if (status != EXSTRINGS_STATUS_NO_ERRORS) { + RedisModule_Free(bca); + return REDISMODULE_ERR; + } + + /* Note that when blocking the client we do not set any callback: no + * timeout is possible since we passed '0', nor we need a reply callback + * because we'll use the thread safe context to accumulate a reply. */ + RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0); + + bca->bc = bc; + + /* Now that we setup a blocking client, we need to pass the control + * to the thread. However we need to pass arguments to the thread: + * the reference to the blocked client handle. */ + if (pthread_create(&tid,NULL,NGet_NoAtomic_ThreadMain,bca) != 0) { + RedisModule_AbortBlock(bc); + RedisModule_Free(bca); + return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread"); + } + + return REDISMODULE_OK; +} + +int NGet_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + RedisModule_AutoMemory(ctx); + NgetArgs nget_args; + ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET; + + InitStaticVariable(); + + readNgetArgs(ctx, argv, argc, &nget_args, &status); + if (status != EXSTRINGS_STATUS_NO_ERRORS) { + return REDISMODULE_ERR; + } + + return Nget_RedisCommand(ctx, &nget_args, false); +} + +int NDel_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + RedisModule_AutoMemory(ctx); + int ret = REDISMODULE_OK; + long long replylen = 0; + RedisModuleCallReply *reply = NULL; + ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET; + ScanSomeState scan_state; + ScannedKeys *scanned_keys = NULL; + + InitStaticVariable(); + if (argc != 2) + return RedisModule_WrongArity(ctx); + + scan_state.key = argv[1]; + scan_state.count = def_count_str; + scan_state.cursor = 0; + + do { + status = EXSTRINGS_STATUS_NOT_SET; + scanned_keys = scanSome(ctx, &scan_state, &status); + + if (status != EXSTRINGS_STATUS_NO_ERRORS) { + ret = REDISMODULE_ERR; + break; + } else if (scanned_keys == NULL) { + continue; + } + + reply = RedisModule_Call(ctx, "UNLINK", "v!", scanned_keys->keys, scanned_keys->len); + + status = EXSTRINGS_STATUS_NOT_SET; + forwardIfError(ctx, reply, &status); + if (status != EXSTRINGS_STATUS_NO_ERRORS) { + freeScannedKeys(ctx, scanned_keys); + ret = REDISMODULE_ERR; + break; + } + + replylen += RedisModule_CallReplyInteger(reply); + RedisModule_FreeCallReply(reply); + freeScannedKeys(ctx, scanned_keys); + } while (scan_state.cursor != 0); + + if (ret == REDISMODULE_OK) { + RedisModule_ReplyWithLongLong(ctx, replylen); + } + + return ret; } /* This function must be present on each Redis module. It is used in order to @@ -594,12 +918,16 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) DelNE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; - if (RedisModule_CreateCommand(ctx,"nget", - NGet_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR) + if (RedisModule_CreateCommand(ctx,"nget.atomic", + NGet_Atomic_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; - if (RedisModule_CreateCommand(ctx,"ndel", - NDel_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + if (RedisModule_CreateCommand(ctx,"nget.noatomic", + NGet_NoAtomic_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"ndel.atomic", + NDel_Atomic_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"msetpub", @@ -614,6 +942,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) SetIEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"setiempub", + SetIEMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"setnepub", SetNEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; @@ -626,14 +958,26 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) SetNXPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"setnxmpub", + SetNXMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"delpub", DelPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"delmpub", + DelMPub_RedisCommand,"write deny-oom pubsub",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"deliepub", DelIEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"deliempub", + DelIEMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"delnepub", DelNEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR;