2 * Copyright (c) 2018-2020 Nokia.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 * platform project (RICP).
22 #include "redismodule.h"
30 #include "exstringsStub.h"
31 #include "commonStub.h"
/* Make sure a RedisModule_Call() reply is neither NULL nor an error reply.
 *
 * If it is, the error is sent to the client and the *calling function*
 * returns immediately with the value of the reply call. The macro expects a
 * variable named `ctx` to be in scope at the point of use, and it is written
 * to be used as a statement without a trailing semicolon. */
#define ASSERT_NOERROR(r) \
    if ((r) == NULL) { \
        return RedisModule_ReplyWithError(ctx,"ERR reply is NULL"); \
    } else if (RedisModule_CallReplyType(r) == REDISMODULE_REPLY_ERROR) { \
        return RedisModule_ReplyWithCallReply(ctx,(r)); \
    }
/* Condition flags selecting when a conditional operation is applied. */
#define OBJ_OP_XX (1 << 1) /* operate only if the key exists */
#define OBJ_OP_NX (1 << 2) /* operate only if the key does not exist */
#define OBJ_OP_IE (1 << 4) /* operate only if the stored value equals the old value */
#define OBJ_OP_NE (1 << 5) /* operate only if the stored value differs from the old value */

/* Option keywords used when building the SCAN argument vector. */
#define MATCH_STR "MATCH"
#define COUNT_STR "COUNT"
56 RedisModuleString *def_count_str = NULL, *match_str = NULL, *count_str = NULL, *zero_str = NULL;
58 typedef struct _NgetArgs {
59 RedisModuleString *key;
60 RedisModuleString *count;
63 typedef struct RedisModuleBlockedClientArgs {
64 RedisModuleBlockedClient *bc;
66 } RedisModuleBlockedClientArgs;
68 void InitStaticVariable()
70 if (def_count_str == NULL)
71 def_count_str = RedisModule_CreateStringFromLongLong(NULL, DEF_COUNT);
72 if (match_str == NULL)
73 match_str = RedisModule_CreateString(NULL, MATCH_STR, sizeof(MATCH_STR));
74 if (count_str == NULL)
75 count_str = RedisModule_CreateString(NULL, COUNT_STR, sizeof(COUNT_STR));
77 zero_str = RedisModule_CreateStringFromLongLong(NULL, ZERO);
82 int getKeyType(RedisModuleCtx *ctx, RedisModuleString *key_str)
84 RedisModuleKey *key = RedisModule_OpenKey(ctx, key_str, REDISMODULE_READ);
85 int type = RedisModule_KeyType(key);
86 RedisModule_CloseKey(key);
90 bool replyContentsEqualString(RedisModuleCallReply *reply, RedisModuleString *expected_value)
92 size_t replylen = 0, expectedlen = 0;
93 const char *expectedval = RedisModule_StringPtrLen(expected_value, &expectedlen);
94 const char *replyval = RedisModule_CallReplyStringPtr(reply, &replylen);
96 expectedlen == replylen &&
97 !strncmp(expectedval, replyval, replylen);
100 typedef struct _SetParams {
101 RedisModuleString **key_val_pairs;
105 typedef struct _PubParams {
106 RedisModuleString **channel_msg_pairs;
110 typedef struct _DelParams {
111 RedisModuleString **keys;
/* Outcome reported by the helper functions through their `status`
 * out-parameter. */
typedef enum _ExstringsStatus {
    EXSTRINGS_STATUS_NO_ERRORS = 0,        /* helper succeeded */
    EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT, /* helper failed and already replied to the client */
    EXSTRINGS_STATUS_NOT_SET               /* initial value before a helper is called */
} ExstringsStatus;
121 void readNgetArgs(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
122 NgetArgs* nget_args, ExstringsStatus* status)
128 nget_args->key = argv[1];
129 nget_args->count = def_count_str;
130 } else if (argc == 4) {
131 if (strcasecmp(RedisModule_StringPtrLen(argv[2], &str_len), "count")) {
132 RedisModule_ReplyWithError(ctx,"-ERR syntax error");
133 *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
137 int ret = RedisModule_StringToLongLong(argv[3], &number) != REDISMODULE_OK;
138 if (ret != REDISMODULE_OK || number < 1) {
139 RedisModule_ReplyWithError(ctx,"-ERR value is not an integer or out of range");
140 *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
144 nget_args->key = argv[1];
145 nget_args->count = argv[3];
147 /* In redis there is a bug (or undocumented feature see link)
148 * where calling 'RedisModule_WrongArity'
149 * within a blocked client will crash redis.
151 * Therefore we need to call this function to validate args
152 * before putting the client into blocking mode.
155 * https://github.com/antirez/redis/issues/6382
156 * 'If any thread tries to access the command arguments from
157 * within the ThreadSafeContext they will crash redis' */
158 RedisModule_WrongArity(ctx);
159 *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
163 *status = EXSTRINGS_STATUS_NO_ERRORS;
167 long long callReplyLongLong(RedisModuleCallReply* reply)
169 const char* cursor_str_ptr = RedisModule_CallReplyStringPtr(reply, NULL);
170 return strtoll(cursor_str_ptr, NULL, 10);
173 void forwardIfError(RedisModuleCtx *ctx, RedisModuleCallReply *reply, ExstringsStatus* status)
175 if (RedisModule_CallReplyType(reply) == REDISMODULE_REPLY_ERROR) {
176 RedisModule_ReplyWithCallReply(ctx, reply);
177 RedisModule_FreeCallReply(reply);
178 *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
180 *status = EXSTRINGS_STATUS_NO_ERRORS;
183 typedef struct _ScannedKeys {
184 RedisModuleString **keys;
188 ScannedKeys* allocScannedKeys(size_t len)
190 ScannedKeys *sk = RedisModule_Alloc(sizeof(ScannedKeys));
193 sk->keys = RedisModule_Alloc(sizeof(RedisModuleString *)*len);
198 void freeScannedKeys(RedisModuleCtx *ctx, ScannedKeys* sk)
202 for (j = 0; j < sk->len; j++)
203 RedisModule_FreeString(ctx, sk->keys[j]);
204 RedisModule_Free(sk->keys);
206 RedisModule_Free(sk);
209 typedef struct _ScanSomeState {
210 RedisModuleString *key;
211 RedisModuleString *count;
215 ScannedKeys *scanSome(RedisModuleCtx* ctx, ScanSomeState* state, ExstringsStatus* status)
217 RedisModuleString *scanargv[SCANARGC] = {NULL};
219 scanargv[0] = RedisModule_CreateStringFromLongLong(ctx, state->cursor);
220 scanargv[1] = match_str;
221 scanargv[2] = state->key;
222 scanargv[3] = count_str;
223 scanargv[4] = state->count;
225 RedisModuleCallReply *reply;
226 reply = RedisModule_Call(ctx, "SCAN", "v", scanargv, SCANARGC);
227 RedisModule_FreeString(ctx, scanargv[0]);
228 forwardIfError(ctx, reply, status);
229 if (*status == EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT)
232 state->cursor = callReplyLongLong(RedisModule_CallReplyArrayElement(reply, 0));
233 RedisModuleCallReply *cr_keys =
234 RedisModule_CallReplyArrayElement(reply, 1);
236 size_t scanned_keys_len = RedisModule_CallReplyLength(cr_keys);
237 if (scanned_keys_len == 0) {
238 RedisModule_FreeCallReply(reply);
239 *status = EXSTRINGS_STATUS_NO_ERRORS;
243 ScannedKeys *scanned_keys = allocScannedKeys(scanned_keys_len);
244 if (scanned_keys == NULL) {
245 RedisModule_FreeCallReply(reply);
246 RedisModule_ReplyWithError(ctx,"-ERR Out of memory");
247 *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
251 scanned_keys->len = scanned_keys_len;
253 for (j = 0; j < scanned_keys_len; j++) {
254 RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(cr_keys,j));
255 scanned_keys->keys[j] = rms;
257 RedisModule_FreeCallReply(reply);
258 *status = EXSTRINGS_STATUS_NO_ERRORS;
262 inline void unlockThreadsafeContext(RedisModuleCtx *ctx, bool using_threadsafe_context)
264 if (using_threadsafe_context)
265 RedisModule_ThreadSafeContextUnlock(ctx);
268 inline void lockThreadsafeContext(RedisModuleCtx *ctx, bool using_threadsafe_context)
270 if (using_threadsafe_context)
271 RedisModule_ThreadSafeContextLock(ctx);
274 void multiPubCommand(RedisModuleCtx *ctx, PubParams* pubParams)
276 RedisModuleCallReply *reply = NULL;
277 for (unsigned int i = 0 ; i < pubParams->length ; i += 2) {
278 reply = RedisModule_Call(ctx, "PUBLISH", "v", pubParams->channel_msg_pairs + i, 2);
279 RedisModule_FreeCallReply(reply);
283 int setStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
284 int argc, const int flag)
286 RedisModuleString *oldvalstr = NULL;
287 RedisModuleCallReply *reply = NULL;
290 return RedisModule_WrongArity(ctx);
294 /*Check if key type is string*/
295 RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
297 int type = RedisModule_KeyType(key);
298 RedisModule_CloseKey(key);
300 if (type == REDISMODULE_KEYTYPE_EMPTY) {
301 if (flag == OBJ_OP_IE){
302 RedisModule_ReplyWithNull(ctx);
303 return REDISMODULE_OK;
305 } else if (type != REDISMODULE_KEYTYPE_STRING) {
306 return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE);
310 reply = RedisModule_Call(ctx, "GET", "s", argv[1]);
311 ASSERT_NOERROR(reply)
312 size_t curlen=0, oldvallen=0;
313 const char *oldval = RedisModule_StringPtrLen(oldvalstr, &oldvallen);
314 const char *curval = RedisModule_CallReplyStringPtr(reply, &curlen);
315 if (((flag == OBJ_OP_IE) &&
316 (!curval || (oldvallen != curlen) || strncmp(oldval, curval, curlen)))
318 ((flag == OBJ_OP_NE) && curval && (oldvallen == curlen) &&
319 !strncmp(oldval, curval, curlen))) {
320 RedisModule_FreeCallReply(reply);
321 return RedisModule_ReplyWithNull(ctx);
323 RedisModule_FreeCallReply(reply);
325 /* Prepare the arguments for the command. */
326 int i, j=0, cmdargc=argc-2;
327 RedisModuleString *cmdargv[cmdargc];
328 for (i = 1; i < argc; i++) {
331 cmdargv[j++] = argv[i];
334 /* Call the command and pass back the reply. */
335 reply = RedisModule_Call(ctx, "SET", "v!", cmdargv, cmdargc);
336 ASSERT_NOERROR(reply)
337 RedisModule_ReplyWithCallReply(ctx, reply);
339 RedisModule_FreeCallReply(reply);
340 return REDISMODULE_OK;
343 int SetIE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
345 RedisModule_AutoMemory(ctx);
346 return setStringGenericCommand(ctx, argv, argc, OBJ_OP_IE);
349 int SetNE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
351 RedisModule_AutoMemory(ctx);
352 return setStringGenericCommand(ctx, argv, argc, OBJ_OP_NE);
355 int delStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
356 int argc, const int flag)
358 RedisModuleString *oldvalstr = NULL;
359 RedisModuleCallReply *reply = NULL;
364 return RedisModule_WrongArity(ctx);
366 /*Check if key type is string*/
367 RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
369 int type = RedisModule_KeyType(key);
370 RedisModule_CloseKey(key);
372 if (type == REDISMODULE_KEYTYPE_EMPTY) {
373 return RedisModule_ReplyWithLongLong(ctx, 0);
374 } else if (type != REDISMODULE_KEYTYPE_STRING) {
375 return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE);
379 reply = RedisModule_Call(ctx, "GET", "s", argv[1]);
380 ASSERT_NOERROR(reply)
381 size_t curlen = 0, oldvallen = 0;
382 const char *oldval = RedisModule_StringPtrLen(oldvalstr, &oldvallen);
383 const char *curval = RedisModule_CallReplyStringPtr(reply, &curlen);
384 if (((flag == OBJ_OP_IE) &&
385 (!curval || (oldvallen != curlen) || strncmp(oldval, curval, curlen)))
387 ((flag == OBJ_OP_NE) && curval && (oldvallen == curlen) &&
388 !strncmp(oldval, curval, curlen))) {
389 RedisModule_FreeCallReply(reply);
390 return RedisModule_ReplyWithLongLong(ctx, 0);
392 RedisModule_FreeCallReply(reply);
394 /* Prepare the arguments for the command. */
396 RedisModuleString *cmdargv[1];
397 cmdargv[0] = argv[1];
399 /* Call the command and pass back the reply. */
400 reply = RedisModule_Call(ctx, "UNLINK", "v!", cmdargv, cmdargc);
401 ASSERT_NOERROR(reply)
402 RedisModule_ReplyWithCallReply(ctx, reply);
404 RedisModule_FreeCallReply(reply);
405 return REDISMODULE_OK;
408 int DelIE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
410 RedisModule_AutoMemory(ctx);
411 return delStringGenericCommand(ctx, argv, argc, OBJ_OP_IE);
414 int DelNE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
416 RedisModule_AutoMemory(ctx);
417 return delStringGenericCommand(ctx, argv, argc, OBJ_OP_NE);
419 int setPubStringCommon(RedisModuleCtx *ctx, SetParams* setParamsPtr, PubParams* pubParamsPtr)
421 RedisModuleCallReply *setReply;
422 setReply = RedisModule_Call(ctx, "MSET", "v!", setParamsPtr->key_val_pairs, setParamsPtr->length);
423 ASSERT_NOERROR(setReply)
424 multiPubCommand(ctx, pubParamsPtr);
425 RedisModule_ReplyWithCallReply(ctx, setReply);
426 RedisModule_FreeCallReply(setReply);
427 return REDISMODULE_OK;
430 int SetPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
432 if (argc < 5 || (argc % 2) == 0)
433 return RedisModule_WrongArity(ctx);
435 RedisModule_AutoMemory(ctx);
436 SetParams setParams = {
437 .key_val_pairs = argv + 1,
440 PubParams pubParams = {
441 .channel_msg_pairs = argv + 1 + setParams.length,
445 return setPubStringCommon(ctx, &setParams, &pubParams);
448 int SetMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
450 if (argc < 7 || (argc % 2) == 0)
451 return RedisModule_WrongArity(ctx);
453 RedisModule_AutoMemory(ctx);
454 long long setPairsCount, pubPairsCount;
455 RedisModule_StringToLongLong(argv[1], &setPairsCount);
456 RedisModule_StringToLongLong(argv[2], &pubPairsCount);
457 if (setPairsCount < 1 || pubPairsCount < 1)
458 return RedisModule_ReplyWithError(ctx, "ERR SET_PAIR_COUNT and PUB_PAIR_COUNT must be greater than zero");
460 long long setLen, pubLen;
461 setLen = 2*setPairsCount;
462 pubLen = 2*pubPairsCount;
464 if (setLen + pubLen + 3 != argc)
465 return RedisModule_ReplyWithError(ctx, "ERR SET_PAIR_COUNT or PUB_PAIR_COUNT do not match the total pair count");
467 SetParams setParams = {
468 .key_val_pairs = argv + 3,
471 PubParams pubParams = {
472 .channel_msg_pairs = argv + 3 + setParams.length,
476 return setPubStringCommon(ctx, &setParams, &pubParams);
479 int setIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
481 SetParams setParams = {
482 .key_val_pairs = argv + 1,
485 PubParams pubParams = {
486 .channel_msg_pairs = argv + 4,
489 RedisModuleString *key = setParams.key_val_pairs[0];
490 RedisModuleString *oldvalstr = argv[3];
492 int type = getKeyType(ctx, key);
493 if (flag == OBJ_OP_IE && type == REDISMODULE_KEYTYPE_EMPTY) {
494 return RedisModule_ReplyWithNull(ctx);
495 } else if (type != REDISMODULE_KEYTYPE_STRING && type != REDISMODULE_KEYTYPE_EMPTY) {
496 return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
499 RedisModuleCallReply *reply = RedisModule_Call(ctx, "GET", "s", key);
500 ASSERT_NOERROR(reply)
501 bool is_equal = replyContentsEqualString(reply, oldvalstr);
502 RedisModule_FreeCallReply(reply);
503 if ((flag == OBJ_OP_IE && !is_equal) ||
504 (flag == OBJ_OP_NE && is_equal)) {
505 return RedisModule_ReplyWithNull(ctx);
508 return setPubStringCommon(ctx, &setParams, &pubParams);
511 int SetIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
514 return RedisModule_WrongArity(ctx);
516 RedisModule_AutoMemory(ctx);
517 return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
520 int SetIEMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
522 if (argc < 6 || (argc % 2) != 0)
523 return RedisModule_WrongArity(ctx);
525 RedisModule_AutoMemory(ctx);
526 return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
529 int SetNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
532 return RedisModule_WrongArity(ctx);
534 RedisModule_AutoMemory(ctx);
535 return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE);
538 int setXXNXPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
540 SetParams setParams = {
541 .key_val_pairs = argv + 1,
544 PubParams pubParams = {
545 .channel_msg_pairs = argv + 3,
548 RedisModuleString *key = setParams.key_val_pairs[0];
550 int type = getKeyType(ctx, key);
551 if ((flag == OBJ_OP_XX && type == REDISMODULE_KEYTYPE_EMPTY) ||
552 (flag == OBJ_OP_NX && type == REDISMODULE_KEYTYPE_STRING)) {
553 return RedisModule_ReplyWithNull(ctx);
554 } else if (type != REDISMODULE_KEYTYPE_STRING && type != REDISMODULE_KEYTYPE_EMPTY) {
555 RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
556 return REDISMODULE_OK;
559 return setPubStringCommon(ctx, &setParams, &pubParams);
562 int SetNXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
565 return RedisModule_WrongArity(ctx);
567 RedisModule_AutoMemory(ctx);
568 return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_NX);
571 int SetNXMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
573 if (argc < 5 || (argc % 2) == 0)
574 return RedisModule_WrongArity(ctx);
576 RedisModule_AutoMemory(ctx);
577 return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_NX);
580 int SetXXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
583 return RedisModule_WrongArity(ctx);
585 RedisModule_AutoMemory(ctx);
586 return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_XX);
589 int delPubStringCommon(RedisModuleCtx *ctx, DelParams *delParamsPtr, PubParams *pubParamsPtr)
591 RedisModuleCallReply *reply = RedisModule_Call(ctx, "UNLINK", "v!", delParamsPtr->keys, delParamsPtr->length);
592 ASSERT_NOERROR(reply)
593 int replytype = RedisModule_CallReplyType(reply);
594 if (replytype == REDISMODULE_REPLY_NULL) {
595 RedisModule_ReplyWithNull(ctx);
596 } else if (RedisModule_CallReplyInteger(reply) == 0) {
597 RedisModule_ReplyWithCallReply(ctx, reply);
599 RedisModule_ReplyWithCallReply(ctx, reply);
600 multiPubCommand(ctx, pubParamsPtr);
602 RedisModule_FreeCallReply(reply);
603 return REDISMODULE_OK;
606 int DelPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
609 return RedisModule_WrongArity(ctx);
611 RedisModule_AutoMemory(ctx);
612 DelParams delParams = {
616 PubParams pubParams = {
617 .channel_msg_pairs = argv + 1 + delParams.length,
621 return delPubStringCommon(ctx, &delParams, &pubParams);
624 int DelMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
627 return RedisModule_WrongArity(ctx);
629 RedisModule_AutoMemory(ctx);
630 long long delCount, pubPairsCount;
631 RedisModule_StringToLongLong(argv[1], &delCount);
632 RedisModule_StringToLongLong(argv[2], &pubPairsCount);
633 if (delCount < 1 || pubPairsCount < 1)
634 return RedisModule_ReplyWithError(ctx, "ERR DEL_COUNT and PUB_PAIR_COUNT must be greater than zero");
636 long long delLen, pubLen;
638 pubLen = 2*pubPairsCount;
639 if (delLen + pubLen + 3 != argc)
640 return RedisModule_ReplyWithError(ctx, "ERR DEL_COUNT or PUB_PAIR_COUNT do not match the total pair count");
642 DelParams delParams = {
646 PubParams pubParams = {
647 .channel_msg_pairs = argv + 3 + delParams.length,
651 return delPubStringCommon(ctx, &delParams, &pubParams);
654 int delIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
656 DelParams delParams = {
660 PubParams pubParams = {
661 .channel_msg_pairs = argv + 3,
664 RedisModuleString *key = argv[1];
665 RedisModuleString *oldvalstr = argv[2];
667 int type = getKeyType(ctx, key);
668 if (type == REDISMODULE_KEYTYPE_EMPTY) {
669 return RedisModule_ReplyWithLongLong(ctx, 0);
670 } else if (type != REDISMODULE_KEYTYPE_STRING) {
671 return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
674 RedisModuleCallReply *reply = RedisModule_Call(ctx, "GET", "s", key);
675 ASSERT_NOERROR(reply)
676 bool is_equal = replyContentsEqualString(reply, oldvalstr);
677 RedisModule_FreeCallReply(reply);
678 if ((flag == OBJ_OP_IE && !is_equal) ||
679 (flag == OBJ_OP_NE && is_equal)) {
680 return RedisModule_ReplyWithLongLong(ctx, 0);
683 return delPubStringCommon(ctx, &delParams, &pubParams);
686 int DelIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
689 return RedisModule_WrongArity(ctx);
691 RedisModule_AutoMemory(ctx);
692 return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
695 int DelIEMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
697 if (argc < 5 || (argc % 2) == 0)
698 return RedisModule_WrongArity(ctx);
700 RedisModule_AutoMemory(ctx);
701 return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
704 int DelNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
707 return RedisModule_WrongArity(ctx);
709 RedisModule_AutoMemory(ctx);
710 return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE);
713 int Nget_RedisCommand(RedisModuleCtx *ctx, NgetArgs* nget_args, bool using_threadsafe_context)
715 int ret = REDISMODULE_OK;
717 RedisModuleCallReply *reply = NULL;
718 ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
719 ScanSomeState scan_state;
720 ScannedKeys *scanned_keys;
722 scan_state.key = nget_args->key;
723 scan_state.count = nget_args->count;
724 scan_state.cursor = 0;
726 RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
728 lockThreadsafeContext(ctx, using_threadsafe_context);
730 status = EXSTRINGS_STATUS_NOT_SET;
731 scanned_keys = scanSome(ctx, &scan_state, &status);
733 if (status != EXSTRINGS_STATUS_NO_ERRORS) {
734 unlockThreadsafeContext(ctx, using_threadsafe_context);
735 ret = REDISMODULE_ERR;
737 } else if (scanned_keys == NULL) {
738 unlockThreadsafeContext(ctx, using_threadsafe_context);
742 reply = RedisModule_Call(ctx, "MGET", "v", scanned_keys->keys, scanned_keys->len);
744 unlockThreadsafeContext(ctx, using_threadsafe_context);
746 status = EXSTRINGS_STATUS_NOT_SET;
747 forwardIfError(ctx, reply, &status);
748 if (status != EXSTRINGS_STATUS_NO_ERRORS) {
749 freeScannedKeys(ctx, scanned_keys);
750 ret = REDISMODULE_ERR;
755 for (i = 0; i < scanned_keys->len; i++) {
756 RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(reply, i));
758 RedisModule_ReplyWithString(ctx, scanned_keys->keys[i]);
759 RedisModule_ReplyWithString(ctx, rms);
760 RedisModule_FreeString(ctx, rms);
764 RedisModule_FreeCallReply(reply);
765 freeScannedKeys(ctx, scanned_keys);
766 } while (scan_state.cursor != 0);
768 RedisModule_ReplySetArrayLength(ctx,replylen);
772 /* The thread entry point that actually executes the blocking part
773 * of the command nget.noatomic
775 void *NGet_NoAtomic_ThreadMain(void *arg)
777 pthread_detach(pthread_self());
779 RedisModuleBlockedClientArgs *bca = arg;
780 RedisModuleBlockedClient *bc = bca->bc;
781 RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
783 Nget_RedisCommand(ctx, &bca->nget_args, true);
784 RedisModule_FreeThreadSafeContext(ctx);
785 RedisModule_UnblockClient(bc, NULL);
786 RedisModule_Free(bca);
790 int NGet_NoAtomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
792 RedisModule_AutoMemory(ctx);
795 InitStaticVariable();
797 RedisModuleBlockedClientArgs *bca = RedisModule_Alloc(sizeof(RedisModuleBlockedClientArgs));
799 RedisModule_ReplyWithError(ctx,"-ERR Out of memory");
800 return REDISMODULE_ERR;
803 ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
804 readNgetArgs(ctx, argv, argc, &bca->nget_args, &status);
805 if (status != EXSTRINGS_STATUS_NO_ERRORS) {
806 RedisModule_Free(bca);
807 return REDISMODULE_ERR;
810 /* Note that when blocking the client we do not set any callback: no
811 * timeout is possible since we passed '0', nor we need a reply callback
812 * because we'll use the thread safe context to accumulate a reply. */
813 RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);
817 /* Now that we setup a blocking client, we need to pass the control
818 * to the thread. However we need to pass arguments to the thread:
819 * the reference to the blocked client handle. */
820 if (pthread_create(&tid,NULL,NGet_NoAtomic_ThreadMain,bca) != 0) {
821 RedisModule_AbortBlock(bc);
822 RedisModule_Free(bca);
823 return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
826 return REDISMODULE_OK;
829 int NGet_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
831 RedisModule_AutoMemory(ctx);
833 ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
835 InitStaticVariable();
837 readNgetArgs(ctx, argv, argc, &nget_args, &status);
838 if (status != EXSTRINGS_STATUS_NO_ERRORS) {
839 return REDISMODULE_ERR;
842 return Nget_RedisCommand(ctx, &nget_args, false);
845 int NDel_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
847 RedisModule_AutoMemory(ctx);
848 int ret = REDISMODULE_OK;
849 long long replylen = 0;
850 RedisModuleCallReply *reply = NULL;
851 ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
852 ScanSomeState scan_state;
853 ScannedKeys *scanned_keys = NULL;
855 InitStaticVariable();
857 return RedisModule_WrongArity(ctx);
859 scan_state.key = argv[1];
860 scan_state.count = def_count_str;
861 scan_state.cursor = 0;
864 status = EXSTRINGS_STATUS_NOT_SET;
865 scanned_keys = scanSome(ctx, &scan_state, &status);
867 if (status != EXSTRINGS_STATUS_NO_ERRORS) {
868 ret = REDISMODULE_ERR;
870 } else if (scanned_keys == NULL) {
874 reply = RedisModule_Call(ctx, "UNLINK", "v!", scanned_keys->keys, scanned_keys->len);
876 status = EXSTRINGS_STATUS_NOT_SET;
877 forwardIfError(ctx, reply, &status);
878 if (status != EXSTRINGS_STATUS_NO_ERRORS) {
879 freeScannedKeys(ctx, scanned_keys);
880 ret = REDISMODULE_ERR;
884 replylen += RedisModule_CallReplyInteger(reply);
885 RedisModule_FreeCallReply(reply);
886 freeScannedKeys(ctx, scanned_keys);
887 } while (scan_state.cursor != 0);
889 if (ret == REDISMODULE_OK) {
890 RedisModule_ReplyWithLongLong(ctx, replylen);
896 /* This function must be present on each Redis module. It is used in order to
897 * register the commands into the Redis server. */
898 int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
899 REDISMODULE_NOT_USED(argv);
900 REDISMODULE_NOT_USED(argc);
902 if (RedisModule_Init(ctx,"exstrings",1,REDISMODULE_APIVER_1)
903 == REDISMODULE_ERR) return REDISMODULE_ERR;
905 if (RedisModule_CreateCommand(ctx,"setie",
906 SetIE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
907 return REDISMODULE_ERR;
909 if (RedisModule_CreateCommand(ctx,"setne",
910 SetNE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
911 return REDISMODULE_ERR;
913 if (RedisModule_CreateCommand(ctx,"delie",
914 DelIE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
915 return REDISMODULE_ERR;
917 if (RedisModule_CreateCommand(ctx,"delne",
918 DelNE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
919 return REDISMODULE_ERR;
921 if (RedisModule_CreateCommand(ctx,"nget.atomic",
922 NGet_Atomic_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
923 return REDISMODULE_ERR;
925 if (RedisModule_CreateCommand(ctx,"nget.noatomic",
926 NGet_NoAtomic_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
927 return REDISMODULE_ERR;
929 if (RedisModule_CreateCommand(ctx,"ndel.atomic",
930 NDel_Atomic_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
931 return REDISMODULE_ERR;
933 if (RedisModule_CreateCommand(ctx,"msetpub",
934 SetPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
935 return REDISMODULE_ERR;
937 if (RedisModule_CreateCommand(ctx,"msetmpub",
938 SetMPub_RedisCommand,"write deny-oom pubsub",1,1,1) == REDISMODULE_ERR)
939 return REDISMODULE_ERR;
941 if (RedisModule_CreateCommand(ctx,"setiepub",
942 SetIEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
943 return REDISMODULE_ERR;
945 if (RedisModule_CreateCommand(ctx,"setiempub",
946 SetIEMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
947 return REDISMODULE_ERR;
949 if (RedisModule_CreateCommand(ctx,"setnepub",
950 SetNEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
951 return REDISMODULE_ERR;
953 if (RedisModule_CreateCommand(ctx,"setxxpub",
954 SetXXPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
955 return REDISMODULE_ERR;
957 if (RedisModule_CreateCommand(ctx,"setnxpub",
958 SetNXPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
959 return REDISMODULE_ERR;
961 if (RedisModule_CreateCommand(ctx,"setnxmpub",
962 SetNXMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
963 return REDISMODULE_ERR;
965 if (RedisModule_CreateCommand(ctx,"delpub",
966 DelPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
967 return REDISMODULE_ERR;
969 if (RedisModule_CreateCommand(ctx,"delmpub",
970 DelMPub_RedisCommand,"write deny-oom pubsub",1,1,1) == REDISMODULE_ERR)
971 return REDISMODULE_ERR;
973 if (RedisModule_CreateCommand(ctx,"deliepub",
974 DelIEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
975 return REDISMODULE_ERR;
977 if (RedisModule_CreateCommand(ctx,"deliempub",
978 DelIEMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
979 return REDISMODULE_ERR;
981 if (RedisModule_CreateCommand(ctx,"delnepub",
982 DelNEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
983 return REDISMODULE_ERR;
985 return REDISMODULE_OK;