X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=redismodule%2Fsrc%2Fexstrings.c;h=11102d3ce2b298488adcd0b1012b70b96a80d17b;hb=refs%2Fchanges%2F25%2F3425%2F1;hp=c1fe8d10b3d5d36557fbc7b0be48b6db13c2f048;hpb=a9dd09828c123da4387067c38dedcb54fd348dff;p=ric-plt%2Fdbaas.git diff --git a/redismodule/src/exstrings.c b/redismodule/src/exstrings.c index c1fe8d1..11102d3 100755 --- a/redismodule/src/exstrings.c +++ b/redismodule/src/exstrings.c @@ -20,9 +20,11 @@ */ #include "redismodule.h" -#include #include #include +#include +#include +#include #ifdef __UT__ #include "exstringsStub.h" @@ -53,6 +55,16 @@ sends the error to the client and exit the current function if its */ RedisModuleString *def_count_str = NULL, *match_str = NULL, *count_str = NULL, *zero_str = NULL; +typedef struct _NgetArgs { + RedisModuleString *key; + RedisModuleString *count; +} NgetArgs; + +typedef struct RedisModuleBlockedClientArgs { + RedisModuleBlockedClient *bc; + NgetArgs nget_args; +} RedisModuleBlockedClientArgs; + void InitStaticVariable() { if (def_count_str == NULL) @@ -100,6 +112,165 @@ typedef struct _DelParams { size_t length; } DelParams; +typedef enum _ExstringsStatus { + EXSTRINGS_STATUS_NO_ERRORS = 0, + EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT, + EXSTRINGS_STATUS_NOT_SET +} ExstringsStatus; + +void readNgetArgs(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, + NgetArgs* nget_args, ExstringsStatus* status) +{ + size_t str_len; + long long number; + + if(argc == 2) { + nget_args->key = argv[1]; + nget_args->count = def_count_str; + } else if (argc == 4) { + if (strcasecmp(RedisModule_StringPtrLen(argv[2], &str_len), "count")) { + RedisModule_ReplyWithError(ctx,"-ERR syntax error"); + *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT; + return; + } + + int ret = RedisModule_StringToLongLong(argv[3], &number) != REDISMODULE_OK; + if (ret != REDISMODULE_OK || number < 1) { + RedisModule_ReplyWithError(ctx,"-ERR value is not an integer or out of range"); + *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT; + return; + } + + nget_args->key = argv[1]; + nget_args->count = argv[3]; + } else { + /* In redis there is a bug (or undocumented feature see link) + * where calling 'RedisModule_WrongArity' + * within a blocked client will crash redis. + * + * Therefore we need to call this function to validate args + * before putting the client into blocking mode. + * + * Link to issue: + * https://github.com/antirez/redis/issues/6382 + * 'If any thread tries to access the command arguments from + * within the ThreadSafeContext they will crash redis' */ + RedisModule_WrongArity(ctx); + *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT; + return; + } + + *status = EXSTRINGS_STATUS_NO_ERRORS; + return; +} + +long long callReplyLongLong(RedisModuleCallReply* reply) +{ + const char* cursor_str_ptr = RedisModule_CallReplyStringPtr(reply, NULL); + return strtoll(cursor_str_ptr, NULL, 10); +} + +void forwardIfError(RedisModuleCtx *ctx, RedisModuleCallReply *reply, ExstringsStatus* status) +{ + if (RedisModule_CallReplyType(reply) == REDISMODULE_REPLY_ERROR) { + RedisModule_ReplyWithCallReply(ctx, reply); + RedisModule_FreeCallReply(reply); + *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT; + } + *status = EXSTRINGS_STATUS_NO_ERRORS; +} + +typedef struct _ScannedKeys { + RedisModuleString **keys; + size_t len; +} ScannedKeys; + +ScannedKeys* allocScannedKeys(size_t len) +{ + ScannedKeys *sk = RedisModule_Alloc(sizeof(ScannedKeys)); + if (sk) { + sk->len = len; + sk->keys = RedisModule_Alloc(sizeof(RedisModuleString *)*len); + } + return sk; +} + +void freeScannedKeys(RedisModuleCtx *ctx, ScannedKeys* sk) +{ + if (sk) { + size_t j; + for (j = 0; j < sk->len; j++) + RedisModule_FreeString(ctx, sk->keys[j]); + RedisModule_Free(sk->keys); + } + RedisModule_Free(sk); +} + +typedef struct _ScanSomeState { + RedisModuleString *key; + RedisModuleString *count; + long long cursor; +} ScanSomeState; + +ScannedKeys *scanSome(RedisModuleCtx* ctx, ScanSomeState* state, ExstringsStatus* status) +{ + RedisModuleString *scanargv[SCANARGC] = {NULL}; + + scanargv[0] = RedisModule_CreateStringFromLongLong(ctx, state->cursor); + scanargv[1] = match_str; + scanargv[2] = state->key; + scanargv[3] = count_str; + scanargv[4] = state->count; + + RedisModuleCallReply *reply; + reply = RedisModule_Call(ctx, "SCAN", "v", scanargv, SCANARGC); + RedisModule_FreeString(ctx, scanargv[0]); + forwardIfError(ctx, reply, status); + if (*status == EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT) + return NULL; + + state->cursor = callReplyLongLong(RedisModule_CallReplyArrayElement(reply, 0)); + RedisModuleCallReply *cr_keys = + RedisModule_CallReplyArrayElement(reply, 1); + + size_t scanned_keys_len = RedisModule_CallReplyLength(cr_keys); + if (scanned_keys_len == 0) { + RedisModule_FreeCallReply(reply); + *status = EXSTRINGS_STATUS_NO_ERRORS; + return NULL; + } + + ScannedKeys *scanned_keys = allocScannedKeys(scanned_keys_len); + if (scanned_keys == NULL) { + RedisModule_FreeCallReply(reply); + RedisModule_ReplyWithError(ctx,"-ERR Out of memory"); + *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT; + return NULL; + } + + scanned_keys->len = scanned_keys_len; + size_t j; + for (j = 0; j < scanned_keys_len; j++) { + RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(cr_keys,j)); + scanned_keys->keys[j] = rms; + } + RedisModule_FreeCallReply(reply); + *status = EXSTRINGS_STATUS_NO_ERRORS; + return scanned_keys; +} + +inline void unlockThreadsafeContext(RedisModuleCtx *ctx, bool using_threadsafe_context) +{ + if (using_threadsafe_context) + RedisModule_ThreadSafeContextUnlock(ctx); +} + +inline void lockThreadsafeContext(RedisModuleCtx *ctx, bool using_threadsafe_context) +{ + if (using_threadsafe_context) + RedisModule_ThreadSafeContextLock(ctx); +} + void multiPubCommand(RedisModuleCtx *ctx, PubParams* pubParams) { RedisModuleCallReply *reply = NULL; @@ -539,6 +710,189 @@ int DelNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int arg return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE); } +int Nget_RedisCommand(RedisModuleCtx *ctx, NgetArgs* nget_args, bool using_threadsafe_context) +{ + int ret = REDISMODULE_OK; + size_t replylen = 0; + RedisModuleCallReply *reply = NULL; + ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET; + ScanSomeState scan_state; + ScannedKeys *scanned_keys; + + scan_state.key = nget_args->key; + scan_state.count = nget_args->count; + scan_state.cursor = 0; + + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + do { + lockThreadsafeContext(ctx, using_threadsafe_context); + + status = EXSTRINGS_STATUS_NOT_SET; + scanned_keys = scanSome(ctx, &scan_state, &status); + + if (status != EXSTRINGS_STATUS_NO_ERRORS) { + unlockThreadsafeContext(ctx, using_threadsafe_context); + ret = REDISMODULE_ERR; + break; + } else if (scanned_keys == NULL) { + unlockThreadsafeContext(ctx, using_threadsafe_context); + continue; + } + + reply = RedisModule_Call(ctx, "MGET", "v", scanned_keys->keys, scanned_keys->len); + + unlockThreadsafeContext(ctx, using_threadsafe_context); + + status = EXSTRINGS_STATUS_NOT_SET; + forwardIfError(ctx, reply, &status); + if (status != EXSTRINGS_STATUS_NO_ERRORS) { + freeScannedKeys(ctx, scanned_keys); + ret = REDISMODULE_ERR; + break; + } + + size_t i; + for (i = 0; i < scanned_keys->len; i++) { + RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(reply, i)); + if (rms) { + RedisModule_ReplyWithString(ctx, scanned_keys->keys[i]); + RedisModule_ReplyWithString(ctx, rms); + RedisModule_FreeString(ctx, rms); + replylen += 2; + } + } + RedisModule_FreeCallReply(reply); + freeScannedKeys(ctx, scanned_keys); + } while (scan_state.cursor != 0); + + RedisModule_ReplySetArrayLength(ctx,replylen); + return ret; +} + +/* The thread entry point that actually executes the blocking part + * of the command nget.noatomic + */ +void *NGet_NoAtomic_ThreadMain(void *arg) +{ + pthread_detach(pthread_self()); + + RedisModuleBlockedClientArgs *bca = arg; + RedisModuleBlockedClient *bc = bca->bc; + RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc); + + Nget_RedisCommand(ctx, &bca->nget_args, true); + RedisModule_FreeThreadSafeContext(ctx); + RedisModule_UnblockClient(bc, NULL); + RedisModule_Free(bca); + return NULL; +} + +int NGet_NoAtomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + RedisModule_AutoMemory(ctx); + pthread_t tid; + + InitStaticVariable(); + + RedisModuleBlockedClientArgs *bca = RedisModule_Alloc(sizeof(RedisModuleBlockedClientArgs)); + if (bca == NULL) { + RedisModule_ReplyWithError(ctx,"-ERR Out of memory"); + return REDISMODULE_ERR; + } + + ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET; + readNgetArgs(ctx, argv, argc, &bca->nget_args, &status); + if (status != EXSTRINGS_STATUS_NO_ERRORS) { + RedisModule_Free(bca); + return REDISMODULE_ERR; + } + + /* Note that when blocking the client we do not set any callback: no + * timeout is possible since we passed '0', nor we need a reply callback + * because we'll use the thread safe context to accumulate a reply. */ + RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0); + + bca->bc = bc; + + /* Now that we setup a blocking client, we need to pass the control + * to the thread. However we need to pass arguments to the thread: + * the reference to the blocked client handle. */ + if (pthread_create(&tid,NULL,NGet_NoAtomic_ThreadMain,bca) != 0) { + RedisModule_AbortBlock(bc); + RedisModule_Free(bca); + return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread"); + } + + return REDISMODULE_OK; +} + +int NGet_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + RedisModule_AutoMemory(ctx); + NgetArgs nget_args; + ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET; + + InitStaticVariable(); + + readNgetArgs(ctx, argv, argc, &nget_args, &status); + if (status != EXSTRINGS_STATUS_NO_ERRORS) { + return REDISMODULE_ERR; + } + + return Nget_RedisCommand(ctx, &nget_args, false); +} + +int NDel_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + RedisModule_AutoMemory(ctx); + int ret = REDISMODULE_OK; + long long replylen = 0; + RedisModuleCallReply *reply = NULL; + ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET; + ScanSomeState scan_state; + ScannedKeys *scanned_keys = NULL; + + InitStaticVariable(); + if (argc != 2) + return RedisModule_WrongArity(ctx); + + scan_state.key = argv[1]; + scan_state.count = def_count_str; + scan_state.cursor = 0; + + do { + status = EXSTRINGS_STATUS_NOT_SET; + scanned_keys = scanSome(ctx, &scan_state, &status); + + if (status != EXSTRINGS_STATUS_NO_ERRORS) { + ret = REDISMODULE_ERR; + break; + } else if (scanned_keys == NULL) { + continue; + } + + reply = RedisModule_Call(ctx, "UNLINK", "v!", scanned_keys->keys, scanned_keys->len); + + status = EXSTRINGS_STATUS_NOT_SET; + forwardIfError(ctx, reply, &status); + if (status != EXSTRINGS_STATUS_NO_ERRORS) { + freeScannedKeys(ctx, scanned_keys); + ret = REDISMODULE_ERR; + break; + } + + replylen += RedisModule_CallReplyInteger(reply); + RedisModule_FreeCallReply(reply); + freeScannedKeys(ctx, scanned_keys); + } while (scan_state.cursor != 0); + + if (ret == REDISMODULE_OK) { + RedisModule_ReplyWithLongLong(ctx, replylen); + } + + return ret; +} + /* This function must be present on each Redis module. It is used in order to * register the commands into the Redis server. */ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { @@ -564,6 +918,18 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) DelNE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"nget.atomic", + NGet_Atomic_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"nget.noatomic", + NGet_NoAtomic_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"ndel.atomic", + NDel_Atomic_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"msetpub", SetPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR;