Free resources when 'nget.noatomic' thread terminates
[ric-plt/dbaas.git] / redismodule / src / exstrings.c
1 /*
2  * Copyright (c) 2018-2020 Nokia.
3  *
4  *   Licensed under the Apache License, Version 2.0 (the "License");
5  *   you may not use this file except in compliance with the License.
6  *   You may obtain a copy of the License at
7  *
8  *       http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *   Unless required by applicable law or agreed to in writing, software
11  *   distributed under the License is distributed on an "AS IS" BASIS,
12  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *   See the License for the specific language governing permissions and
14  *   limitations under the License.
15  */
16
17 /*
18  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19  * platform project (RICP).
20  */
21
22 #include "redismodule.h"
23 #include <pthread.h>
24 #include <stdbool.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <strings.h>
28
29 #ifdef __UT__
30 #include "exstringsStub.h"
31 #include "commonStub.h"
32 #endif
33
34
35 /* make sure the response is not NULL or an error.
36 sends the error to the client and exit the current function if its */
37 #define  ASSERT_NOERROR(r) \
38     if (r == NULL) { \
39         return RedisModule_ReplyWithError(ctx,"ERR reply is NULL"); \
40     } else if (RedisModule_CallReplyType(r) == REDISMODULE_REPLY_ERROR) { \
41         return RedisModule_ReplyWithCallReply(ctx,r); \
42     }
43
44 #define OBJ_OP_NO 0
45 #define OBJ_OP_XX (1<<1)     /* OP if key exist */
46 #define OBJ_OP_NX (1<<2)     /* OP if key not exist */
47 #define OBJ_OP_IE (1<<4)     /* OP if equal old value */
48 #define OBJ_OP_NE (1<<5)     /* OP if not equal old value */
49
50 #define DEF_COUNT     50
51 #define ZERO          0
52 #define MATCH_STR     "MATCH"
53 #define COUNT_STR     "COUNT"
54 #define SCANARGC      5
55
56 RedisModuleString *def_count_str = NULL, *match_str = NULL, *count_str = NULL, *zero_str = NULL;
57
58 typedef struct _NgetArgs {
59     RedisModuleString *key;
60     RedisModuleString *count;
61 } NgetArgs;
62
63 typedef struct RedisModuleBlockedClientArgs {
64     RedisModuleBlockedClient *bc;
65     NgetArgs nget_args;
66 } RedisModuleBlockedClientArgs;
67
68 void InitStaticVariable()
69 {
70     if (def_count_str == NULL)
71         def_count_str = RedisModule_CreateStringFromLongLong(NULL, DEF_COUNT);
72     if (match_str == NULL)
73         match_str = RedisModule_CreateString(NULL, MATCH_STR, sizeof(MATCH_STR));
74     if (count_str == NULL)
75         count_str = RedisModule_CreateString(NULL, COUNT_STR, sizeof(COUNT_STR));
76     if (zero_str == NULL)
77         zero_str = RedisModule_CreateStringFromLongLong(NULL, ZERO);
78
79     return;
80 }
81
82 int getKeyType(RedisModuleCtx *ctx, RedisModuleString *key_str)
83 {
84     RedisModuleKey *key = RedisModule_OpenKey(ctx, key_str, REDISMODULE_READ);
85     int type = RedisModule_KeyType(key);
86     RedisModule_CloseKey(key);
87     return type;
88 }
89
90 bool replyContentsEqualString(RedisModuleCallReply *reply, RedisModuleString *expected_value)
91 {
92     size_t replylen = 0, expectedlen = 0;
93     const char *expectedval = RedisModule_StringPtrLen(expected_value, &expectedlen);
94     const char *replyval = RedisModule_CallReplyStringPtr(reply, &replylen);
95     return replyval &&
96            expectedlen == replylen &&
97            !strncmp(expectedval, replyval, replylen);
98 }
99
100 typedef struct _SetParams {
101     RedisModuleString **key_val_pairs;
102     size_t length;
103 } SetParams;
104
105 typedef struct _PubParams {
106     RedisModuleString **channel_msg_pairs;
107     size_t length;
108 } PubParams;
109
110 typedef struct _DelParams {
111     RedisModuleString **keys;
112     size_t length;
113 } DelParams;
114
115 typedef enum _ExstringsStatus {
116     EXSTRINGS_STATUS_NO_ERRORS = 0,
117     EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT,
118     EXSTRINGS_STATUS_NOT_SET
119 } ExstringsStatus;
120
121 void readNgetArgs(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
122                   NgetArgs* nget_args, ExstringsStatus* status)
123 {
124     size_t str_len;
125     long long number;
126
127     if(argc == 2) {
128         nget_args->key = argv[1];
129         nget_args->count = def_count_str;
130     } else if (argc == 4) {
131         if (strcasecmp(RedisModule_StringPtrLen(argv[2], &str_len), "count")) {
132             RedisModule_ReplyWithError(ctx,"-ERR syntax error");
133             *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
134             return;
135         }
136
137         int ret = RedisModule_StringToLongLong(argv[3], &number) != REDISMODULE_OK;
138         if (ret != REDISMODULE_OK || number < 1) {
139             RedisModule_ReplyWithError(ctx,"-ERR value is not an integer or out of range");
140             *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
141             return;
142         }
143
144         nget_args->key = argv[1];
145         nget_args->count = argv[3];
146     } else {
147         /* In redis there is a bug (or undocumented feature see link)
148          * where calling 'RedisModule_WrongArity'
149          * within a blocked client will crash redis.
150          *
151          * Therefore we need to call this function to validate args
152          * before putting the client into blocking mode.
153          *
154          * Link to issue:
155          * https://github.com/antirez/redis/issues/6382
156          * 'If any thread tries to access the command arguments from
157          *  within the ThreadSafeContext they will crash redis' */
158         RedisModule_WrongArity(ctx);
159         *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
160         return;
161     }
162
163     *status = EXSTRINGS_STATUS_NO_ERRORS;
164     return;
165 }
166
167 long long callReplyLongLong(RedisModuleCallReply* reply)
168 {
169     const char* cursor_str_ptr = RedisModule_CallReplyStringPtr(reply, NULL);
170     return strtoll(cursor_str_ptr, NULL, 10);
171 }
172
173 void forwardIfError(RedisModuleCtx *ctx, RedisModuleCallReply *reply, ExstringsStatus* status)
174 {
175     if (RedisModule_CallReplyType(reply) == REDISMODULE_REPLY_ERROR) {
176         RedisModule_ReplyWithCallReply(ctx, reply);
177         RedisModule_FreeCallReply(reply);
178         *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
179     }
180     *status = EXSTRINGS_STATUS_NO_ERRORS;
181 }
182
183 typedef struct _ScannedKeys {
184     RedisModuleString **keys;
185     size_t len;
186 } ScannedKeys;
187
188 ScannedKeys* allocScannedKeys(size_t len)
189 {
190     ScannedKeys *sk = RedisModule_Alloc(sizeof(ScannedKeys));
191     if (sk) {
192         sk->len = len;
193         sk->keys = RedisModule_Alloc(sizeof(RedisModuleString *)*len);
194     }
195     return sk;
196 }
197
198 void freeScannedKeys(RedisModuleCtx *ctx, ScannedKeys* sk)
199 {
200     if (sk) {
201         size_t j;
202         for (j = 0; j < sk->len; j++)
203             RedisModule_FreeString(ctx, sk->keys[j]);
204         RedisModule_Free(sk->keys);
205     }
206     RedisModule_Free(sk);
207 }
208
209 typedef struct _ScanSomeState {
210     RedisModuleString *key;
211     RedisModuleString *count;
212     long long cursor;
213 } ScanSomeState;
214
215 ScannedKeys *scanSome(RedisModuleCtx* ctx, ScanSomeState* state, ExstringsStatus* status)
216 {
217     RedisModuleString *scanargv[SCANARGC] = {NULL};
218
219     scanargv[0] = RedisModule_CreateStringFromLongLong(ctx, state->cursor);
220     scanargv[1] = match_str;
221     scanargv[2] = state->key;
222     scanargv[3] = count_str;
223     scanargv[4] = state->count;
224
225     RedisModuleCallReply *reply;
226     reply = RedisModule_Call(ctx, "SCAN", "v", scanargv, SCANARGC);
227     RedisModule_FreeString(ctx, scanargv[0]);
228     forwardIfError(ctx, reply, status);
229     if (*status == EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT)
230         return NULL;
231
232     state->cursor = callReplyLongLong(RedisModule_CallReplyArrayElement(reply, 0));
233     RedisModuleCallReply *cr_keys =
234         RedisModule_CallReplyArrayElement(reply, 1);
235
236     size_t scanned_keys_len = RedisModule_CallReplyLength(cr_keys);
237     if (scanned_keys_len == 0) {
238         RedisModule_FreeCallReply(reply);
239         *status = EXSTRINGS_STATUS_NO_ERRORS;
240         return NULL;
241     }
242
243     ScannedKeys *scanned_keys = allocScannedKeys(scanned_keys_len);
244     if (scanned_keys == NULL) {
245         RedisModule_FreeCallReply(reply);
246         RedisModule_ReplyWithError(ctx,"-ERR Out of memory");
247         *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
248         return NULL;
249     }
250
251     scanned_keys->len = scanned_keys_len;
252     size_t j;
253     for (j = 0; j < scanned_keys_len; j++) {
254         RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(cr_keys,j));
255         scanned_keys->keys[j] = rms;
256     }
257     RedisModule_FreeCallReply(reply);
258     *status = EXSTRINGS_STATUS_NO_ERRORS;
259     return scanned_keys;
260 }
261
262 inline void unlockThreadsafeContext(RedisModuleCtx *ctx, bool using_threadsafe_context)
263 {
264     if (using_threadsafe_context)
265         RedisModule_ThreadSafeContextUnlock(ctx);
266 }
267
268 inline void lockThreadsafeContext(RedisModuleCtx *ctx, bool using_threadsafe_context)
269 {
270     if (using_threadsafe_context)
271         RedisModule_ThreadSafeContextLock(ctx);
272 }
273
274 void multiPubCommand(RedisModuleCtx *ctx, PubParams* pubParams)
275 {
276     RedisModuleCallReply *reply = NULL;
277     for (unsigned int i = 0 ; i < pubParams->length ; i += 2) {
278         reply = RedisModule_Call(ctx, "PUBLISH", "v", pubParams->channel_msg_pairs + i, 2);
279         RedisModule_FreeCallReply(reply);
280     }
281 }
282
283 int setStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
284                                        int argc, const int flag)
285 {
286     RedisModuleString *oldvalstr = NULL;
287     RedisModuleCallReply *reply = NULL;
288
289     if (argc < 4)
290         return RedisModule_WrongArity(ctx);
291     else
292         oldvalstr = argv[3];
293
294     /*Check if key type is string*/
295     RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
296         REDISMODULE_READ);
297     int type = RedisModule_KeyType(key);
298     RedisModule_CloseKey(key);
299
300     if (type == REDISMODULE_KEYTYPE_EMPTY) {
301         if (flag == OBJ_OP_IE){
302             RedisModule_ReplyWithNull(ctx);
303             return REDISMODULE_OK;
304         }
305     } else if (type != REDISMODULE_KEYTYPE_STRING) {
306         return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE);
307     }
308
309     /*Get the value*/
310     reply = RedisModule_Call(ctx, "GET", "s", argv[1]);
311     ASSERT_NOERROR(reply)
312     size_t curlen=0, oldvallen=0;
313     const char *oldval = RedisModule_StringPtrLen(oldvalstr, &oldvallen);
314     const char *curval = RedisModule_CallReplyStringPtr(reply, &curlen);
315     if (((flag == OBJ_OP_IE) &&
316         (!curval || (oldvallen != curlen) || strncmp(oldval, curval, curlen)))
317         ||
318         ((flag == OBJ_OP_NE) && curval && (oldvallen == curlen) &&
319           !strncmp(oldval, curval, curlen))) {
320         RedisModule_FreeCallReply(reply);
321         return RedisModule_ReplyWithNull(ctx);
322     }
323     RedisModule_FreeCallReply(reply);
324
325     /* Prepare the arguments for the command. */
326     int i, j=0, cmdargc=argc-2;
327     RedisModuleString *cmdargv[cmdargc];
328     for (i = 1; i < argc; i++) {
329         if (i == 3)
330             continue;
331         cmdargv[j++] = argv[i];
332     }
333
334     /* Call the command and pass back the reply. */
335     reply = RedisModule_Call(ctx, "SET", "v!", cmdargv, cmdargc);
336     ASSERT_NOERROR(reply)
337     RedisModule_ReplyWithCallReply(ctx, reply);
338
339     RedisModule_FreeCallReply(reply);
340     return REDISMODULE_OK;
341 }
342
343 int SetIE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
344 {
345     RedisModule_AutoMemory(ctx);
346     return setStringGenericCommand(ctx, argv, argc, OBJ_OP_IE);
347 }
348
349 int SetNE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
350 {
351     RedisModule_AutoMemory(ctx);
352     return setStringGenericCommand(ctx, argv, argc, OBJ_OP_NE);
353 }
354
355 int delStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
356                                        int argc, const int flag)
357 {
358     RedisModuleString *oldvalstr = NULL;
359     RedisModuleCallReply *reply = NULL;
360
361     if (argc == 3)
362         oldvalstr = argv[2];
363     else
364         return RedisModule_WrongArity(ctx);
365
366     /*Check if key type is string*/
367     RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
368         REDISMODULE_READ);
369     int type = RedisModule_KeyType(key);
370     RedisModule_CloseKey(key);
371
372     if (type == REDISMODULE_KEYTYPE_EMPTY) {
373         return RedisModule_ReplyWithLongLong(ctx, 0);
374     } else if (type != REDISMODULE_KEYTYPE_STRING) {
375         return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE);
376     }
377
378     /*Get the value*/
379     reply = RedisModule_Call(ctx, "GET", "s", argv[1]);
380     ASSERT_NOERROR(reply)
381     size_t curlen = 0, oldvallen = 0;
382     const char *oldval = RedisModule_StringPtrLen(oldvalstr, &oldvallen);
383     const char *curval = RedisModule_CallReplyStringPtr(reply, &curlen);
384     if (((flag == OBJ_OP_IE) &&
385         (!curval || (oldvallen != curlen) || strncmp(oldval, curval, curlen)))
386         ||
387         ((flag == OBJ_OP_NE) && curval && (oldvallen == curlen) &&
388           !strncmp(oldval, curval, curlen))) {
389         RedisModule_FreeCallReply(reply);
390         return RedisModule_ReplyWithLongLong(ctx, 0);
391     }
392     RedisModule_FreeCallReply(reply);
393
394     /* Prepare the arguments for the command. */
395     int cmdargc=1;
396     RedisModuleString *cmdargv[1];
397     cmdargv[0] = argv[1];
398
399     /* Call the command and pass back the reply. */
400     reply = RedisModule_Call(ctx, "UNLINK", "v!", cmdargv, cmdargc);
401     ASSERT_NOERROR(reply)
402     RedisModule_ReplyWithCallReply(ctx, reply);
403
404     RedisModule_FreeCallReply(reply);
405     return REDISMODULE_OK;
406 }
407
408 int DelIE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
409 {
410     RedisModule_AutoMemory(ctx);
411     return delStringGenericCommand(ctx, argv, argc, OBJ_OP_IE);
412 }
413
414 int DelNE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
415 {
416     RedisModule_AutoMemory(ctx);
417     return delStringGenericCommand(ctx, argv, argc, OBJ_OP_NE);
418 }
419 int setPubStringCommon(RedisModuleCtx *ctx, SetParams* setParamsPtr, PubParams* pubParamsPtr)
420 {
421     RedisModuleCallReply *setReply;
422     setReply = RedisModule_Call(ctx, "MSET", "v!", setParamsPtr->key_val_pairs, setParamsPtr->length);
423     ASSERT_NOERROR(setReply)
424     multiPubCommand(ctx, pubParamsPtr);
425     RedisModule_ReplyWithCallReply(ctx, setReply);
426     RedisModule_FreeCallReply(setReply);
427     return REDISMODULE_OK;
428 }
429
430 int SetPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
431 {
432     if (argc < 5 || (argc % 2) == 0)
433         return RedisModule_WrongArity(ctx);
434
435     RedisModule_AutoMemory(ctx);
436     SetParams setParams = {
437                            .key_val_pairs = argv + 1,
438                            .length = argc - 3
439                           };
440     PubParams pubParams = {
441                            .channel_msg_pairs = argv + 1 + setParams.length,
442                            .length = 2
443                           };
444
445     return setPubStringCommon(ctx, &setParams, &pubParams);
446 }
447
448 int SetMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
449 {
450     if (argc < 7 || (argc % 2) == 0)
451         return RedisModule_WrongArity(ctx);
452
453     RedisModule_AutoMemory(ctx);
454     long long setPairsCount, pubPairsCount;
455     RedisModule_StringToLongLong(argv[1], &setPairsCount);
456     RedisModule_StringToLongLong(argv[2], &pubPairsCount);
457     if (setPairsCount < 1 || pubPairsCount < 1)
458         return RedisModule_ReplyWithError(ctx, "ERR SET_PAIR_COUNT and PUB_PAIR_COUNT must be greater than zero");
459
460     long long setLen, pubLen;
461     setLen = 2*setPairsCount;
462     pubLen = 2*pubPairsCount;
463
464     if (setLen + pubLen + 3 != argc)
465         return RedisModule_ReplyWithError(ctx, "ERR SET_PAIR_COUNT or PUB_PAIR_COUNT do not match the total pair count");
466
467     SetParams setParams = {
468                            .key_val_pairs = argv + 3,
469                            .length = setLen
470                           };
471     PubParams pubParams = {
472                            .channel_msg_pairs = argv + 3 + setParams.length,
473                            .length = pubLen
474                           };
475
476     return setPubStringCommon(ctx, &setParams, &pubParams);
477 }
478
479 int setIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
480 {
481     SetParams setParams = {
482                            .key_val_pairs = argv + 1,
483                            .length = 2
484                           };
485     PubParams pubParams = {
486                            .channel_msg_pairs = argv + 4,
487                            .length = argc - 4
488                           };
489     RedisModuleString *key = setParams.key_val_pairs[0];
490     RedisModuleString *oldvalstr = argv[3];
491
492     int type = getKeyType(ctx, key);
493     if (flag == OBJ_OP_IE && type == REDISMODULE_KEYTYPE_EMPTY) {
494         return RedisModule_ReplyWithNull(ctx);
495     } else if (type != REDISMODULE_KEYTYPE_STRING && type != REDISMODULE_KEYTYPE_EMPTY) {
496         return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
497     }
498
499     RedisModuleCallReply *reply = RedisModule_Call(ctx, "GET", "s", key);
500     ASSERT_NOERROR(reply)
501     bool is_equal = replyContentsEqualString(reply, oldvalstr);
502     RedisModule_FreeCallReply(reply);
503     if ((flag == OBJ_OP_IE && !is_equal) ||
504         (flag == OBJ_OP_NE && is_equal)) {
505         return RedisModule_ReplyWithNull(ctx);
506     }
507
508     return setPubStringCommon(ctx, &setParams, &pubParams);
509 }
510
511 int SetIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
512 {
513     if (argc != 6)
514         return RedisModule_WrongArity(ctx);
515
516     RedisModule_AutoMemory(ctx);
517     return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
518 }
519
520 int SetIEMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
521 {
522     if (argc < 6 || (argc % 2) != 0)
523         return RedisModule_WrongArity(ctx);
524
525     RedisModule_AutoMemory(ctx);
526     return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
527 }
528
529 int SetNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
530 {
531     if (argc != 6)
532         return RedisModule_WrongArity(ctx);
533
534     RedisModule_AutoMemory(ctx);
535     return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE);
536 }
537
538 int setXXNXPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
539 {
540     SetParams setParams = {
541                            .key_val_pairs = argv + 1,
542                            .length = 2
543                           };
544     PubParams pubParams = {
545                            .channel_msg_pairs = argv + 3,
546                            .length = argc - 3
547                           };
548     RedisModuleString *key = setParams.key_val_pairs[0];
549
550     int type = getKeyType(ctx, key);
551     if ((flag == OBJ_OP_XX && type == REDISMODULE_KEYTYPE_EMPTY) ||
552         (flag == OBJ_OP_NX && type == REDISMODULE_KEYTYPE_STRING)) {
553         return RedisModule_ReplyWithNull(ctx);
554     } else if (type != REDISMODULE_KEYTYPE_STRING && type != REDISMODULE_KEYTYPE_EMPTY) {
555         RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
556         return REDISMODULE_OK;
557     }
558
559     return setPubStringCommon(ctx, &setParams, &pubParams);
560 }
561
562 int SetNXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
563 {
564     if (argc != 5)
565         return RedisModule_WrongArity(ctx);
566
567     RedisModule_AutoMemory(ctx);
568     return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_NX);
569 }
570
571 int SetNXMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
572 {
573     if (argc < 5 || (argc % 2) == 0)
574         return RedisModule_WrongArity(ctx);
575
576     RedisModule_AutoMemory(ctx);
577     return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_NX);
578 }
579
580 int SetXXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
581 {
582     if (argc != 5)
583         return RedisModule_WrongArity(ctx);
584
585     RedisModule_AutoMemory(ctx);
586     return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_XX);
587 }
588
589 int delPubStringCommon(RedisModuleCtx *ctx, DelParams *delParamsPtr, PubParams *pubParamsPtr)
590 {
591     RedisModuleCallReply *reply = RedisModule_Call(ctx, "UNLINK", "v!", delParamsPtr->keys, delParamsPtr->length);
592     ASSERT_NOERROR(reply)
593     int replytype = RedisModule_CallReplyType(reply);
594     if (replytype == REDISMODULE_REPLY_NULL) {
595         RedisModule_ReplyWithNull(ctx);
596     } else if (RedisModule_CallReplyInteger(reply) == 0) {
597         RedisModule_ReplyWithCallReply(ctx, reply);
598     } else {
599         RedisModule_ReplyWithCallReply(ctx, reply);
600         multiPubCommand(ctx, pubParamsPtr);
601     }
602     RedisModule_FreeCallReply(reply);
603     return REDISMODULE_OK;
604 }
605
606 int DelPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
607 {
608     if (argc < 4)
609         return RedisModule_WrongArity(ctx);
610
611     RedisModule_AutoMemory(ctx);
612     DelParams delParams = {
613                            .keys = argv + 1,
614                            .length = argc - 3
615                           };
616     PubParams pubParams = {
617                            .channel_msg_pairs = argv + 1 + delParams.length,
618                            .length = 2
619                           };
620
621     return delPubStringCommon(ctx, &delParams, &pubParams);
622 }
623
624 int DelMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
625 {
626     if (argc < 6)
627         return RedisModule_WrongArity(ctx);
628
629     RedisModule_AutoMemory(ctx);
630     long long delCount, pubPairsCount;
631     RedisModule_StringToLongLong(argv[1], &delCount);
632     RedisModule_StringToLongLong(argv[2], &pubPairsCount);
633     if (delCount < 1 || pubPairsCount < 1)
634         return RedisModule_ReplyWithError(ctx, "ERR DEL_COUNT and PUB_PAIR_COUNT must be greater than zero");
635
636     long long delLen, pubLen;
637     delLen = delCount;
638     pubLen = 2*pubPairsCount;
639     if (delLen + pubLen + 3 != argc)
640         return RedisModule_ReplyWithError(ctx, "ERR DEL_COUNT or PUB_PAIR_COUNT do not match the total pair count");
641
642     DelParams delParams = {
643                            .keys = argv + 3,
644                            .length = delLen
645                           };
646     PubParams pubParams = {
647                            .channel_msg_pairs = argv + 3 + delParams.length,
648                            .length = pubLen
649                           };
650
651     return delPubStringCommon(ctx, &delParams, &pubParams);
652 }
653
654 int delIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
655 {
656     DelParams delParams = {
657                            .keys = argv + 1,
658                            .length = 1
659                           };
660     PubParams pubParams = {
661                            .channel_msg_pairs = argv + 3,
662                            .length = argc - 3
663                           };
664     RedisModuleString *key = argv[1];
665     RedisModuleString *oldvalstr = argv[2];
666
667     int type = getKeyType(ctx, key);
668     if (type == REDISMODULE_KEYTYPE_EMPTY) {
669         return RedisModule_ReplyWithLongLong(ctx, 0);
670     } else if (type != REDISMODULE_KEYTYPE_STRING) {
671         return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
672     }
673
674     RedisModuleCallReply *reply = RedisModule_Call(ctx, "GET", "s", key);
675     ASSERT_NOERROR(reply)
676     bool is_equal = replyContentsEqualString(reply, oldvalstr);
677     RedisModule_FreeCallReply(reply);
678     if ((flag == OBJ_OP_IE && !is_equal) ||
679         (flag == OBJ_OP_NE && is_equal)) {
680         return RedisModule_ReplyWithLongLong(ctx, 0);
681     }
682
683     return delPubStringCommon(ctx, &delParams, &pubParams);
684 }
685
686 int DelIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
687 {
688     if (argc != 5)
689         return RedisModule_WrongArity(ctx);
690
691     RedisModule_AutoMemory(ctx);
692     return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
693 }
694
695 int DelIEMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
696 {
697     if (argc < 5 || (argc % 2) == 0)
698         return RedisModule_WrongArity(ctx);
699
700     RedisModule_AutoMemory(ctx);
701     return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
702 }
703
704 int DelNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
705 {
706     if (argc != 5)
707         return RedisModule_WrongArity(ctx);
708
709     RedisModule_AutoMemory(ctx);
710     return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE);
711 }
712
713 int Nget_RedisCommand(RedisModuleCtx *ctx, NgetArgs* nget_args, bool using_threadsafe_context)
714 {
715     int ret = REDISMODULE_OK;
716     size_t replylen = 0;
717     RedisModuleCallReply *reply = NULL;
718     ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
719     ScanSomeState scan_state;
720     ScannedKeys *scanned_keys;
721
722     scan_state.key = nget_args->key;
723     scan_state.count = nget_args->count;
724     scan_state.cursor = 0;
725
726     RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
727     do {
728         lockThreadsafeContext(ctx, using_threadsafe_context);
729
730         status = EXSTRINGS_STATUS_NOT_SET;
731         scanned_keys = scanSome(ctx, &scan_state, &status);
732
733         if (status != EXSTRINGS_STATUS_NO_ERRORS) {
734             unlockThreadsafeContext(ctx, using_threadsafe_context);
735             ret = REDISMODULE_ERR;
736             break;
737         } else if (scanned_keys == NULL) {
738             unlockThreadsafeContext(ctx, using_threadsafe_context);
739             continue;
740         }
741
742         reply = RedisModule_Call(ctx, "MGET", "v", scanned_keys->keys, scanned_keys->len);
743
744         unlockThreadsafeContext(ctx, using_threadsafe_context);
745
746         status = EXSTRINGS_STATUS_NOT_SET;
747         forwardIfError(ctx, reply, &status);
748         if (status != EXSTRINGS_STATUS_NO_ERRORS) {
749             freeScannedKeys(ctx, scanned_keys);
750             ret = REDISMODULE_ERR;
751             break;
752         }
753
754         size_t i;
755         for (i = 0; i < scanned_keys->len; i++) {
756             RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(reply, i));
757             if (rms) {
758                 RedisModule_ReplyWithString(ctx, scanned_keys->keys[i]);
759                 RedisModule_ReplyWithString(ctx, rms);
760                 RedisModule_FreeString(ctx, rms);
761                 replylen += 2;
762             }
763         }
764         RedisModule_FreeCallReply(reply);
765         freeScannedKeys(ctx, scanned_keys);
766     } while (scan_state.cursor != 0);
767
768     RedisModule_ReplySetArrayLength(ctx,replylen);
769     return ret;
770 }
771
772 /* The thread entry point that actually executes the blocking part
773  * of the command nget.noatomic
774  */
775 void *NGet_NoAtomic_ThreadMain(void *arg)
776 {
777     pthread_detach(pthread_self());
778
779     RedisModuleBlockedClientArgs *bca = arg;
780     RedisModuleBlockedClient *bc = bca->bc;
781     RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
782
783     Nget_RedisCommand(ctx, &bca->nget_args, true);
784     RedisModule_FreeThreadSafeContext(ctx);
785     RedisModule_UnblockClient(bc, NULL);
786     RedisModule_Free(bca);
787     return NULL;
788 }
789
790 int NGet_NoAtomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
791 {
792     RedisModule_AutoMemory(ctx);
793     pthread_t tid;
794
795     InitStaticVariable();
796
797     RedisModuleBlockedClientArgs *bca = RedisModule_Alloc(sizeof(RedisModuleBlockedClientArgs));
798     if (bca == NULL) {
799         RedisModule_ReplyWithError(ctx,"-ERR Out of memory");
800         return REDISMODULE_ERR;
801     }
802
803     ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
804     readNgetArgs(ctx, argv, argc, &bca->nget_args, &status);
805     if (status != EXSTRINGS_STATUS_NO_ERRORS) {
806         RedisModule_Free(bca);
807         return REDISMODULE_ERR;
808     }
809
810     /* Note that when blocking the client we do not set any callback: no
811      * timeout is possible since we passed '0', nor we need a reply callback
812      * because we'll use the thread safe context to accumulate a reply. */
813     RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);
814
815     bca->bc = bc;
816
817     /* Now that we setup a blocking client, we need to pass the control
818      * to the thread. However we need to pass arguments to the thread:
819      * the reference to the blocked client handle. */
820     if (pthread_create(&tid,NULL,NGet_NoAtomic_ThreadMain,bca) != 0) {
821         RedisModule_AbortBlock(bc);
822         RedisModule_Free(bca);
823         return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
824     }
825
826     return REDISMODULE_OK;
827 }
828
829 int NGet_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
830 {
831     RedisModule_AutoMemory(ctx);
832     NgetArgs nget_args;
833     ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
834
835     InitStaticVariable();
836
837     readNgetArgs(ctx, argv, argc, &nget_args, &status);
838     if (status != EXSTRINGS_STATUS_NO_ERRORS) {
839         return REDISMODULE_ERR;
840     }
841
842     return Nget_RedisCommand(ctx, &nget_args, false);
843 }
844
845 int NDel_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
846 {
847     RedisModule_AutoMemory(ctx);
848     int ret = REDISMODULE_OK;
849     long long replylen = 0;
850     RedisModuleCallReply *reply = NULL;
851     ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
852     ScanSomeState scan_state;
853     ScannedKeys *scanned_keys = NULL;
854
855     InitStaticVariable();
856     if (argc != 2)
857         return RedisModule_WrongArity(ctx);
858
859     scan_state.key = argv[1];
860     scan_state.count = def_count_str;
861     scan_state.cursor = 0;
862
863     do {
864         status = EXSTRINGS_STATUS_NOT_SET;
865         scanned_keys = scanSome(ctx, &scan_state, &status);
866
867         if (status != EXSTRINGS_STATUS_NO_ERRORS) {
868             ret = REDISMODULE_ERR;
869             break;
870         } else if (scanned_keys == NULL) {
871             continue;
872         }
873
874         reply = RedisModule_Call(ctx, "UNLINK", "v!", scanned_keys->keys, scanned_keys->len);
875
876         status = EXSTRINGS_STATUS_NOT_SET;
877         forwardIfError(ctx, reply, &status);
878         if (status != EXSTRINGS_STATUS_NO_ERRORS) {
879             freeScannedKeys(ctx, scanned_keys);
880             ret = REDISMODULE_ERR;
881             break;
882         }
883
884         replylen += RedisModule_CallReplyInteger(reply);
885         RedisModule_FreeCallReply(reply);
886         freeScannedKeys(ctx, scanned_keys);
887     } while (scan_state.cursor != 0);
888
889     if (ret == REDISMODULE_OK) {
890         RedisModule_ReplyWithLongLong(ctx, replylen);
891     }
892
893     return ret;
894 }
895
896 /* This function must be present on each Redis module. It is used in order to
897  * register the commands into the Redis server. */
898 int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
899     REDISMODULE_NOT_USED(argv);
900     REDISMODULE_NOT_USED(argc);
901
902     if (RedisModule_Init(ctx,"exstrings",1,REDISMODULE_APIVER_1)
903         == REDISMODULE_ERR) return REDISMODULE_ERR;
904
905     if (RedisModule_CreateCommand(ctx,"setie",
906         SetIE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
907         return REDISMODULE_ERR;
908
909     if (RedisModule_CreateCommand(ctx,"setne",
910         SetNE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
911         return REDISMODULE_ERR;
912
913     if (RedisModule_CreateCommand(ctx,"delie",
914         DelIE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
915         return REDISMODULE_ERR;
916
917     if (RedisModule_CreateCommand(ctx,"delne",
918         DelNE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
919         return REDISMODULE_ERR;
920
921     if (RedisModule_CreateCommand(ctx,"nget.atomic",
922         NGet_Atomic_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
923         return REDISMODULE_ERR;
924
925     if (RedisModule_CreateCommand(ctx,"nget.noatomic",
926         NGet_NoAtomic_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
927         return REDISMODULE_ERR;
928
929     if (RedisModule_CreateCommand(ctx,"ndel.atomic",
930         NDel_Atomic_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
931         return REDISMODULE_ERR;
932
933     if (RedisModule_CreateCommand(ctx,"msetpub",
934         SetPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
935         return REDISMODULE_ERR;
936
937     if (RedisModule_CreateCommand(ctx,"msetmpub",
938         SetMPub_RedisCommand,"write deny-oom pubsub",1,1,1) == REDISMODULE_ERR)
939         return REDISMODULE_ERR;
940
941     if (RedisModule_CreateCommand(ctx,"setiepub",
942         SetIEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
943         return REDISMODULE_ERR;
944
945     if (RedisModule_CreateCommand(ctx,"setiempub",
946         SetIEMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
947         return REDISMODULE_ERR;
948
949     if (RedisModule_CreateCommand(ctx,"setnepub",
950         SetNEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
951         return REDISMODULE_ERR;
952
953     if (RedisModule_CreateCommand(ctx,"setxxpub",
954         SetXXPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
955         return REDISMODULE_ERR;
956
957     if (RedisModule_CreateCommand(ctx,"setnxpub",
958         SetNXPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
959         return REDISMODULE_ERR;
960
961     if (RedisModule_CreateCommand(ctx,"setnxmpub",
962         SetNXMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
963         return REDISMODULE_ERR;
964
965     if (RedisModule_CreateCommand(ctx,"delpub",
966         DelPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
967         return REDISMODULE_ERR;
968
969     if (RedisModule_CreateCommand(ctx,"delmpub",
970         DelMPub_RedisCommand,"write deny-oom pubsub",1,1,1) == REDISMODULE_ERR)
971         return REDISMODULE_ERR;
972
973     if (RedisModule_CreateCommand(ctx,"deliepub",
974         DelIEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
975         return REDISMODULE_ERR;
976
977     if (RedisModule_CreateCommand(ctx,"deliempub",
978         DelIEMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
979         return REDISMODULE_ERR;
980
981     if (RedisModule_CreateCommand(ctx,"delnepub",
982         DelNEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
983         return REDISMODULE_ERR;
984
985     return REDISMODULE_OK;
986 }