/* * Copyright (c) 2018-2020 Nokia. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /* * This source code is part of the near-RT RIC (RAN Intelligent Controller) * platform project (RICP). */ #include "redismodule.h" #include #include #include #include #include #ifdef __UT__ #include "exstringsStub.h" #include "commonStub.h" #endif /* make sure the response is not NULL or an error. sends the error to the client and exit the current function if its */ #define ASSERT_NOERROR(r) \ if (r == NULL) { \ return RedisModule_ReplyWithError(ctx,"ERR reply is NULL"); \ } else if (RedisModule_CallReplyType(r) == REDISMODULE_REPLY_ERROR) { \ return RedisModule_ReplyWithCallReply(ctx,r); \ } #define OBJ_OP_NO 0 #define OBJ_OP_XX (1<<1) /* OP if key exist */ #define OBJ_OP_NX (1<<2) /* OP if key not exist */ #define OBJ_OP_IE (1<<4) /* OP if equal old value */ #define OBJ_OP_NE (1<<5) /* OP if not equal old value */ #define DEF_COUNT 50 #define ZERO 0 #define MATCH_STR "MATCH" #define COUNT_STR "COUNT" #define SCANARGC 5 RedisModuleString *def_count_str = NULL, *match_str = NULL, *count_str = NULL, *zero_str = NULL; typedef struct _NgetArgs { RedisModuleString *key; RedisModuleString *count; } NgetArgs; typedef struct RedisModuleBlockedClientArgs { RedisModuleBlockedClient *bc; NgetArgs nget_args; } RedisModuleBlockedClientArgs; void InitStaticVariable() { if (def_count_str == NULL) def_count_str = RedisModule_CreateStringFromLongLong(NULL, DEF_COUNT); if (match_str == NULL) match_str = RedisModule_CreateString(NULL, MATCH_STR, sizeof(MATCH_STR)); if (count_str == NULL) count_str = RedisModule_CreateString(NULL, COUNT_STR, sizeof(COUNT_STR)); if (zero_str == NULL) zero_str = RedisModule_CreateStringFromLongLong(NULL, ZERO); return; } int getKeyType(RedisModuleCtx *ctx, RedisModuleString *key_str) { RedisModuleKey *key = RedisModule_OpenKey(ctx, key_str, REDISMODULE_READ); int type = RedisModule_KeyType(key); RedisModule_CloseKey(key); return type; } bool replyContentsEqualString(RedisModuleCallReply *reply, RedisModuleString *expected_value) { size_t replylen = 0, expectedlen = 0; const char *expectedval = RedisModule_StringPtrLen(expected_value, &expectedlen); const char *replyval = RedisModule_CallReplyStringPtr(reply, &replylen); return replyval && expectedlen == replylen && !strncmp(expectedval, replyval, replylen); } typedef struct _SetParams { RedisModuleString **key_val_pairs; size_t length; } SetParams; typedef struct _PubParams { RedisModuleString **channel_msg_pairs; size_t length; } PubParams; typedef struct _DelParams { RedisModuleString **keys; size_t length; } DelParams; typedef enum _ExstringsStatus { EXSTRINGS_STATUS_NO_ERRORS = 0, EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT, EXSTRINGS_STATUS_NOT_SET } ExstringsStatus; void readNgetArgs(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, NgetArgs* nget_args, ExstringsStatus* status) { size_t str_len; long long number; if(argc == 2) { nget_args->key = argv[1]; nget_args->count = def_count_str; } else if (argc == 4) { if (strcasecmp(RedisModule_StringPtrLen(argv[2], &str_len), "count")) { RedisModule_ReplyWithError(ctx,"-ERR syntax error"); *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT; return; } int ret = RedisModule_StringToLongLong(argv[3], &number) != REDISMODULE_OK; if (ret != REDISMODULE_OK || number < 1) { RedisModule_ReplyWithError(ctx,"-ERR value is not an integer or out of range"); *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT; return; } nget_args->key = argv[1]; nget_args->count = argv[3]; } else { /* In redis there is a bug (or undocumented feature see link) * where calling 'RedisModule_WrongArity' * within a blocked client will crash redis. * * Therefore we need to call this function to validate args * before putting the client into blocking mode. * * Link to issue: * https://github.com/antirez/redis/issues/6382 * 'If any thread tries to access the command arguments from * within the ThreadSafeContext they will crash redis' */ RedisModule_WrongArity(ctx); *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT; return; } *status = EXSTRINGS_STATUS_NO_ERRORS; return; } long long callReplyLongLong(RedisModuleCallReply* reply) { const char* cursor_str_ptr = RedisModule_CallReplyStringPtr(reply, NULL); return strtoll(cursor_str_ptr, NULL, 10); } void forwardIfError(RedisModuleCtx *ctx, RedisModuleCallReply *reply, ExstringsStatus* status) { if (RedisModule_CallReplyType(reply) == REDISMODULE_REPLY_ERROR) { RedisModule_ReplyWithCallReply(ctx, reply); RedisModule_FreeCallReply(reply); *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT; } *status = EXSTRINGS_STATUS_NO_ERRORS; } typedef struct _ScannedKeys { RedisModuleString **keys; size_t len; } ScannedKeys; ScannedKeys* allocScannedKeys(size_t len) { ScannedKeys *sk = RedisModule_Alloc(sizeof(ScannedKeys)); if (sk) { sk->len = len; sk->keys = RedisModule_Alloc(sizeof(RedisModuleString *)*len); } return sk; } void freeScannedKeys(RedisModuleCtx *ctx, ScannedKeys* sk) { if (sk) { size_t j; for (j = 0; j < sk->len; j++) RedisModule_FreeString(ctx, sk->keys[j]); RedisModule_Free(sk->keys); } RedisModule_Free(sk); } typedef struct _ScanSomeState { RedisModuleString *key; RedisModuleString *count; long long cursor; } ScanSomeState; ScannedKeys *scanSome(RedisModuleCtx* ctx, ScanSomeState* state, ExstringsStatus* status) { RedisModuleString *scanargv[SCANARGC] = {NULL}; scanargv[0] = RedisModule_CreateStringFromLongLong(ctx, state->cursor); scanargv[1] = match_str; scanargv[2] = state->key; scanargv[3] = count_str; scanargv[4] = state->count; RedisModuleCallReply *reply; reply = RedisModule_Call(ctx, "SCAN", "v", scanargv, SCANARGC); RedisModule_FreeString(ctx, scanargv[0]); forwardIfError(ctx, reply, status); if (*status == EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT) return NULL; state->cursor = callReplyLongLong(RedisModule_CallReplyArrayElement(reply, 0)); RedisModuleCallReply *cr_keys = RedisModule_CallReplyArrayElement(reply, 1); size_t scanned_keys_len = RedisModule_CallReplyLength(cr_keys); if (scanned_keys_len == 0) { RedisModule_FreeCallReply(reply); *status = EXSTRINGS_STATUS_NO_ERRORS; return NULL; } ScannedKeys *scanned_keys = allocScannedKeys(scanned_keys_len); if (scanned_keys == NULL) { RedisModule_FreeCallReply(reply); RedisModule_ReplyWithError(ctx,"-ERR Out of memory"); *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT; return NULL; } scanned_keys->len = scanned_keys_len; size_t j; for (j = 0; j < scanned_keys_len; j++) { RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(cr_keys,j)); scanned_keys->keys[j] = rms; } RedisModule_FreeCallReply(reply); *status = EXSTRINGS_STATUS_NO_ERRORS; return scanned_keys; } inline void unlockThreadsafeContext(RedisModuleCtx *ctx, bool using_threadsafe_context) { if (using_threadsafe_context) RedisModule_ThreadSafeContextUnlock(ctx); } inline void lockThreadsafeContext(RedisModuleCtx *ctx, bool using_threadsafe_context) { if (using_threadsafe_context) RedisModule_ThreadSafeContextLock(ctx); } void multiPubCommand(RedisModuleCtx *ctx, PubParams* pubParams) { RedisModuleCallReply *reply = NULL; for (unsigned int i = 0 ; i < pubParams->length ; i += 2) { reply = RedisModule_Call(ctx, "PUBLISH", "v", pubParams->channel_msg_pairs + i, 2); RedisModule_FreeCallReply(reply); } } int setStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, const int flag) { RedisModuleString *oldvalstr = NULL; RedisModuleCallReply *reply = NULL; if (argc < 4) return RedisModule_WrongArity(ctx); else oldvalstr = argv[3]; /*Check if key type is string*/ RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1], REDISMODULE_READ); int type = RedisModule_KeyType(key); RedisModule_CloseKey(key); if (type == REDISMODULE_KEYTYPE_EMPTY) { if (flag == OBJ_OP_IE){ RedisModule_ReplyWithNull(ctx); return REDISMODULE_OK; } } else if (type != REDISMODULE_KEYTYPE_STRING) { return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE); } /*Get the value*/ reply = RedisModule_Call(ctx, "GET", "s", argv[1]); ASSERT_NOERROR(reply) size_t curlen=0, oldvallen=0; const char *oldval = RedisModule_StringPtrLen(oldvalstr, &oldvallen); const char *curval = RedisModule_CallReplyStringPtr(reply, &curlen); if (((flag == OBJ_OP_IE) && (!curval || (oldvallen != curlen) || strncmp(oldval, curval, curlen))) || ((flag == OBJ_OP_NE) && curval && (oldvallen == curlen) && !strncmp(oldval, curval, curlen))) { RedisModule_FreeCallReply(reply); return RedisModule_ReplyWithNull(ctx); } RedisModule_FreeCallReply(reply); /* Prepare the arguments for the command. */ int i, j=0, cmdargc=argc-2; RedisModuleString *cmdargv[cmdargc]; for (i = 1; i < argc; i++) { if (i == 3) continue; cmdargv[j++] = argv[i]; } /* Call the command and pass back the reply. */ reply = RedisModule_Call(ctx, "SET", "v!", cmdargv, cmdargc); ASSERT_NOERROR(reply) RedisModule_ReplyWithCallReply(ctx, reply); RedisModule_FreeCallReply(reply); return REDISMODULE_OK; } int SetIE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_AutoMemory(ctx); return setStringGenericCommand(ctx, argv, argc, OBJ_OP_IE); } int SetNE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_AutoMemory(ctx); return setStringGenericCommand(ctx, argv, argc, OBJ_OP_NE); } int delStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, const int flag) { RedisModuleString *oldvalstr = NULL; RedisModuleCallReply *reply = NULL; if (argc == 3) oldvalstr = argv[2]; else return RedisModule_WrongArity(ctx); /*Check if key type is string*/ RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1], REDISMODULE_READ); int type = RedisModule_KeyType(key); RedisModule_CloseKey(key); if (type == REDISMODULE_KEYTYPE_EMPTY) { return RedisModule_ReplyWithLongLong(ctx, 0); } else if (type != REDISMODULE_KEYTYPE_STRING) { return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE); } /*Get the value*/ reply = RedisModule_Call(ctx, "GET", "s", argv[1]); ASSERT_NOERROR(reply) size_t curlen = 0, oldvallen = 0; const char *oldval = RedisModule_StringPtrLen(oldvalstr, &oldvallen); const char *curval = RedisModule_CallReplyStringPtr(reply, &curlen); if (((flag == OBJ_OP_IE) && (!curval || (oldvallen != curlen) || strncmp(oldval, curval, curlen))) || ((flag == OBJ_OP_NE) && curval && (oldvallen == curlen) && !strncmp(oldval, curval, curlen))) { RedisModule_FreeCallReply(reply); return RedisModule_ReplyWithLongLong(ctx, 0); } RedisModule_FreeCallReply(reply); /* Prepare the arguments for the command. */ int cmdargc=1; RedisModuleString *cmdargv[1]; cmdargv[0] = argv[1]; /* Call the command and pass back the reply. */ reply = RedisModule_Call(ctx, "UNLINK", "v!", cmdargv, cmdargc); ASSERT_NOERROR(reply) RedisModule_ReplyWithCallReply(ctx, reply); RedisModule_FreeCallReply(reply); return REDISMODULE_OK; } int DelIE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_AutoMemory(ctx); return delStringGenericCommand(ctx, argv, argc, OBJ_OP_IE); } int DelNE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_AutoMemory(ctx); return delStringGenericCommand(ctx, argv, argc, OBJ_OP_NE); } int setPubStringCommon(RedisModuleCtx *ctx, SetParams* setParamsPtr, PubParams* pubParamsPtr) { RedisModuleCallReply *setReply; setReply = RedisModule_Call(ctx, "MSET", "v!", setParamsPtr->key_val_pairs, setParamsPtr->length); ASSERT_NOERROR(setReply) multiPubCommand(ctx, pubParamsPtr); RedisModule_ReplyWithCallReply(ctx, setReply); RedisModule_FreeCallReply(setReply); return REDISMODULE_OK; } int SetPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc < 5 || (argc % 2) == 0) return RedisModule_WrongArity(ctx); RedisModule_AutoMemory(ctx); SetParams setParams = { .key_val_pairs = argv + 1, .length = argc - 3 }; PubParams pubParams = { .channel_msg_pairs = argv + 1 + setParams.length, .length = 2 }; return setPubStringCommon(ctx, &setParams, &pubParams); } int SetMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc < 7 || (argc % 2) == 0) return RedisModule_WrongArity(ctx); RedisModule_AutoMemory(ctx); long long setPairsCount, pubPairsCount; RedisModule_StringToLongLong(argv[1], &setPairsCount); RedisModule_StringToLongLong(argv[2], &pubPairsCount); if (setPairsCount < 1 || pubPairsCount < 1) return RedisModule_ReplyWithError(ctx, "ERR SET_PAIR_COUNT and PUB_PAIR_COUNT must be greater than zero"); long long setLen, pubLen; setLen = 2*setPairsCount; pubLen = 2*pubPairsCount; if (setLen + pubLen + 3 != argc) return RedisModule_ReplyWithError(ctx, "ERR SET_PAIR_COUNT or PUB_PAIR_COUNT do not match the total pair count"); SetParams setParams = { .key_val_pairs = argv + 3, .length = setLen }; PubParams pubParams = { .channel_msg_pairs = argv + 3 + setParams.length, .length = pubLen }; return setPubStringCommon(ctx, &setParams, &pubParams); } int setIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag) { SetParams setParams = { .key_val_pairs = argv + 1, .length = 2 }; PubParams pubParams = { .channel_msg_pairs = argv + 4, .length = argc - 4 }; RedisModuleString *key = setParams.key_val_pairs[0]; RedisModuleString *oldvalstr = argv[3]; int type = getKeyType(ctx, key); if (flag == OBJ_OP_IE && type == REDISMODULE_KEYTYPE_EMPTY) { return RedisModule_ReplyWithNull(ctx); } else if (type != REDISMODULE_KEYTYPE_STRING && type != REDISMODULE_KEYTYPE_EMPTY) { return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); } RedisModuleCallReply *reply = RedisModule_Call(ctx, "GET", "s", key); ASSERT_NOERROR(reply) bool is_equal = replyContentsEqualString(reply, oldvalstr); RedisModule_FreeCallReply(reply); if ((flag == OBJ_OP_IE && !is_equal) || (flag == OBJ_OP_NE && is_equal)) { return RedisModule_ReplyWithNull(ctx); } return setPubStringCommon(ctx, &setParams, &pubParams); } int SetIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc != 6) return RedisModule_WrongArity(ctx); RedisModule_AutoMemory(ctx); return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE); } int SetIEMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc < 6 || (argc % 2) != 0) return RedisModule_WrongArity(ctx); RedisModule_AutoMemory(ctx); return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE); } int SetNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc != 6) return RedisModule_WrongArity(ctx); RedisModule_AutoMemory(ctx); return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE); } int setXXNXPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag) { SetParams setParams = { .key_val_pairs = argv + 1, .length = 2 }; PubParams pubParams = { .channel_msg_pairs = argv + 3, .length = argc - 3 }; RedisModuleString *key = setParams.key_val_pairs[0]; int type = getKeyType(ctx, key); if ((flag == OBJ_OP_XX && type == REDISMODULE_KEYTYPE_EMPTY) || (flag == OBJ_OP_NX && type == REDISMODULE_KEYTYPE_STRING)) { return RedisModule_ReplyWithNull(ctx); } else if (type != REDISMODULE_KEYTYPE_STRING && type != REDISMODULE_KEYTYPE_EMPTY) { RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); return REDISMODULE_OK; } return setPubStringCommon(ctx, &setParams, &pubParams); } int SetNXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc != 5) return RedisModule_WrongArity(ctx); RedisModule_AutoMemory(ctx); return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_NX); } int SetNXMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc < 5 || (argc % 2) == 0) return RedisModule_WrongArity(ctx); RedisModule_AutoMemory(ctx); return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_NX); } int SetXXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc != 5) return RedisModule_WrongArity(ctx); RedisModule_AutoMemory(ctx); return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_XX); } int delPubStringCommon(RedisModuleCtx *ctx, DelParams *delParamsPtr, PubParams *pubParamsPtr) { RedisModuleCallReply *reply = RedisModule_Call(ctx, "UNLINK", "v!", delParamsPtr->keys, delParamsPtr->length); ASSERT_NOERROR(reply) int replytype = RedisModule_CallReplyType(reply); if (replytype == REDISMODULE_REPLY_NULL) { RedisModule_ReplyWithNull(ctx); } else if (RedisModule_CallReplyInteger(reply) == 0) { RedisModule_ReplyWithCallReply(ctx, reply); } else { RedisModule_ReplyWithCallReply(ctx, reply); multiPubCommand(ctx, pubParamsPtr); } RedisModule_FreeCallReply(reply); return REDISMODULE_OK; } int DelPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc < 4) return RedisModule_WrongArity(ctx); RedisModule_AutoMemory(ctx); DelParams delParams = { .keys = argv + 1, .length = argc - 3 }; PubParams pubParams = { .channel_msg_pairs = argv + 1 + delParams.length, .length = 2 }; return delPubStringCommon(ctx, &delParams, &pubParams); } int DelMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc < 6) return RedisModule_WrongArity(ctx); RedisModule_AutoMemory(ctx); long long delCount, pubPairsCount; RedisModule_StringToLongLong(argv[1], &delCount); RedisModule_StringToLongLong(argv[2], &pubPairsCount); if (delCount < 1 || pubPairsCount < 1) return RedisModule_ReplyWithError(ctx, "ERR DEL_COUNT and PUB_PAIR_COUNT must be greater than zero"); long long delLen, pubLen; delLen = delCount; pubLen = 2*pubPairsCount; if (delLen + pubLen + 3 != argc) return RedisModule_ReplyWithError(ctx, "ERR DEL_COUNT or PUB_PAIR_COUNT do not match the total pair count"); DelParams delParams = { .keys = argv + 3, .length = delLen }; PubParams pubParams = { .channel_msg_pairs = argv + 3 + delParams.length, .length = pubLen }; return delPubStringCommon(ctx, &delParams, &pubParams); } int delIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag) { DelParams delParams = { .keys = argv + 1, .length = 1 }; PubParams pubParams = { .channel_msg_pairs = argv + 3, .length = argc - 3 }; RedisModuleString *key = argv[1]; RedisModuleString *oldvalstr = argv[2]; int type = getKeyType(ctx, key); if (type == REDISMODULE_KEYTYPE_EMPTY) { return RedisModule_ReplyWithLongLong(ctx, 0); } else if (type != REDISMODULE_KEYTYPE_STRING) { return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); } RedisModuleCallReply *reply = RedisModule_Call(ctx, "GET", "s", key); ASSERT_NOERROR(reply) bool is_equal = replyContentsEqualString(reply, oldvalstr); RedisModule_FreeCallReply(reply); if ((flag == OBJ_OP_IE && !is_equal) || (flag == OBJ_OP_NE && is_equal)) { return RedisModule_ReplyWithLongLong(ctx, 0); } return delPubStringCommon(ctx, &delParams, &pubParams); } int DelIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc != 5) return RedisModule_WrongArity(ctx); RedisModule_AutoMemory(ctx); return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE); } int DelIEMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc < 5 || (argc % 2) == 0) return RedisModule_WrongArity(ctx); RedisModule_AutoMemory(ctx); return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE); } int DelNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc != 5) return RedisModule_WrongArity(ctx); RedisModule_AutoMemory(ctx); return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE); } int Nget_RedisCommand(RedisModuleCtx *ctx, NgetArgs* nget_args, bool using_threadsafe_context) { int ret = REDISMODULE_OK; size_t replylen = 0; RedisModuleCallReply *reply = NULL; ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET; ScanSomeState scan_state; ScannedKeys *scanned_keys; scan_state.key = nget_args->key; scan_state.count = nget_args->count; scan_state.cursor = 0; RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); do { lockThreadsafeContext(ctx, using_threadsafe_context); status = EXSTRINGS_STATUS_NOT_SET; scanned_keys = scanSome(ctx, &scan_state, &status); if (status != EXSTRINGS_STATUS_NO_ERRORS) { unlockThreadsafeContext(ctx, using_threadsafe_context); ret = REDISMODULE_ERR; break; } else if (scanned_keys == NULL) { unlockThreadsafeContext(ctx, using_threadsafe_context); continue; } reply = RedisModule_Call(ctx, "MGET", "v", scanned_keys->keys, scanned_keys->len); unlockThreadsafeContext(ctx, using_threadsafe_context); status = EXSTRINGS_STATUS_NOT_SET; forwardIfError(ctx, reply, &status); if (status != EXSTRINGS_STATUS_NO_ERRORS) { freeScannedKeys(ctx, scanned_keys); ret = REDISMODULE_ERR; break; } size_t i; for (i = 0; i < scanned_keys->len; i++) { RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(reply, i)); if (rms) { RedisModule_ReplyWithString(ctx, scanned_keys->keys[i]); RedisModule_ReplyWithString(ctx, rms); RedisModule_FreeString(ctx, rms); replylen += 2; } } RedisModule_FreeCallReply(reply); freeScannedKeys(ctx, scanned_keys); } while (scan_state.cursor != 0); RedisModule_ReplySetArrayLength(ctx,replylen); return ret; } /* The thread entry point that actually executes the blocking part * of the command nget.noatomic */ void *NGet_NoAtomic_ThreadMain(void *arg) { pthread_detach(pthread_self()); RedisModuleBlockedClientArgs *bca = arg; RedisModuleBlockedClient *bc = bca->bc; RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc); Nget_RedisCommand(ctx, &bca->nget_args, true); RedisModule_FreeThreadSafeContext(ctx); RedisModule_UnblockClient(bc, NULL); RedisModule_Free(bca); return NULL; } int NGet_NoAtomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_AutoMemory(ctx); pthread_t tid; InitStaticVariable(); RedisModuleBlockedClientArgs *bca = RedisModule_Alloc(sizeof(RedisModuleBlockedClientArgs)); if (bca == NULL) { RedisModule_ReplyWithError(ctx,"-ERR Out of memory"); return REDISMODULE_ERR; } ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET; readNgetArgs(ctx, argv, argc, &bca->nget_args, &status); if (status != EXSTRINGS_STATUS_NO_ERRORS) { RedisModule_Free(bca); return REDISMODULE_ERR; } /* Note that when blocking the client we do not set any callback: no * timeout is possible since we passed '0', nor we need a reply callback * because we'll use the thread safe context to accumulate a reply. */ RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0); bca->bc = bc; /* Now that we setup a blocking client, we need to pass the control * to the thread. However we need to pass arguments to the thread: * the reference to the blocked client handle. */ if (pthread_create(&tid,NULL,NGet_NoAtomic_ThreadMain,bca) != 0) { RedisModule_AbortBlock(bc); RedisModule_Free(bca); return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread"); } return REDISMODULE_OK; } int NGet_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_AutoMemory(ctx); NgetArgs nget_args; ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET; InitStaticVariable(); readNgetArgs(ctx, argv, argc, &nget_args, &status); if (status != EXSTRINGS_STATUS_NO_ERRORS) { return REDISMODULE_ERR; } return Nget_RedisCommand(ctx, &nget_args, false); } int NDel_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_AutoMemory(ctx); int ret = REDISMODULE_OK; long long replylen = 0; RedisModuleCallReply *reply = NULL; ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET; ScanSomeState scan_state; ScannedKeys *scanned_keys = NULL; InitStaticVariable(); if (argc != 2) return RedisModule_WrongArity(ctx); scan_state.key = argv[1]; scan_state.count = def_count_str; scan_state.cursor = 0; do { status = EXSTRINGS_STATUS_NOT_SET; scanned_keys = scanSome(ctx, &scan_state, &status); if (status != EXSTRINGS_STATUS_NO_ERRORS) { ret = REDISMODULE_ERR; break; } else if (scanned_keys == NULL) { continue; } reply = RedisModule_Call(ctx, "UNLINK", "v!", scanned_keys->keys, scanned_keys->len); status = EXSTRINGS_STATUS_NOT_SET; forwardIfError(ctx, reply, &status); if (status != EXSTRINGS_STATUS_NO_ERRORS) { freeScannedKeys(ctx, scanned_keys); ret = REDISMODULE_ERR; break; } replylen += RedisModule_CallReplyInteger(reply); RedisModule_FreeCallReply(reply); freeScannedKeys(ctx, scanned_keys); } while (scan_state.cursor != 0); if (ret == REDISMODULE_OK) { RedisModule_ReplyWithLongLong(ctx, replylen); } return ret; } /* This function must be present on each Redis module. It is used in order to * register the commands into the Redis server. */ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); if (RedisModule_Init(ctx,"exstrings",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"setie", SetIE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"setne", SetNE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"delie", DelIE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"delne", DelNE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"nget.atomic", NGet_Atomic_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"nget.noatomic", NGet_NoAtomic_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"ndel.atomic", NDel_Atomic_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"msetpub", SetPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"msetmpub", SetMPub_RedisCommand,"write deny-oom pubsub",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"setiepub", SetIEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"setiempub", SetIEMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"setnepub", SetNEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"setxxpub", SetXXPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"setnxpub", SetNXPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"setnxmpub", SetNXMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"delpub", DelPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"delmpub", DelMPub_RedisCommand,"write deny-oom pubsub",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"deliepub", DelIEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"deliempub", DelIEMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"delnepub", DelNEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; return REDISMODULE_OK; }