4211f56392677cb87a143101b40cd5a0f5bcf9c2
[ric-plt/dbaas.git] / redismodule / src / exstrings.c
1 /*
2  * Copyright (c) 2018-2020 Nokia.
3  *
4  *   Licensed under the Apache License, Version 2.0 (the "License");
5  *   you may not use this file except in compliance with the License.
6  *   You may obtain a copy of the License at
7  *
8  *       http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *   Unless required by applicable law or agreed to in writing, software
11  *   distributed under the License is distributed on an "AS IS" BASIS,
12  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *   See the License for the specific language governing permissions and
14  *   limitations under the License.
15  */
16
17 /*
18  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19  * platform project (RICP).
20  */
21
22 #include "redismodule.h"
23 #include <pthread.h>
24 #include <stdbool.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <strings.h>
28
29 #ifdef __UT__
30 #include "exstringsStub.h"
31 #include "commonStub.h"
32 #endif
33
34
35 /* make sure the response is not NULL or an error.
36 sends the error to the client and exit the current function if its */
37 #define  ASSERT_NOERROR(r) \
38     if (r == NULL) { \
39         return RedisModule_ReplyWithError(ctx,"ERR reply is NULL"); \
40     } else if (RedisModule_CallReplyType(r) == REDISMODULE_REPLY_ERROR) { \
41         return RedisModule_ReplyWithCallReply(ctx,r); \
42     }
43
44 #define OBJ_OP_NO 0
45 #define OBJ_OP_XX (1<<1)     /* OP if key exist */
46 #define OBJ_OP_NX (1<<2)     /* OP if key not exist */
47 #define OBJ_OP_IE (1<<4)     /* OP if equal old value */
48 #define OBJ_OP_NE (1<<5)     /* OP if not equal old value */
49
50 #define DEF_COUNT     50
51 #define ZERO          0
52 #define MATCH_STR     "MATCH"
53 #define COUNT_STR     "COUNT"
54 #define SCANARGC      5
55
56 RedisModuleString *def_count_str = NULL, *match_str = NULL, *count_str = NULL, *zero_str = NULL;
57
58 typedef struct _NgetArgs {
59     RedisModuleString *key;
60     RedisModuleString *count;
61 } NgetArgs;
62
63 typedef struct RedisModuleBlockedClientArgs {
64     RedisModuleBlockedClient *bc;
65     NgetArgs nget_args;
66 } RedisModuleBlockedClientArgs;
67
68 void InitStaticVariable()
69 {
70     if (def_count_str == NULL)
71         def_count_str = RedisModule_CreateStringFromLongLong(NULL, DEF_COUNT);
72     if (match_str == NULL)
73         match_str = RedisModule_CreateString(NULL, MATCH_STR, sizeof(MATCH_STR));
74     if (count_str == NULL)
75         count_str = RedisModule_CreateString(NULL, COUNT_STR, sizeof(COUNT_STR));
76     if (zero_str == NULL)
77         zero_str = RedisModule_CreateStringFromLongLong(NULL, ZERO);
78
79     return;
80 }
81
82 int getKeyType(RedisModuleCtx *ctx, RedisModuleString *key_str)
83 {
84     RedisModuleKey *key = RedisModule_OpenKey(ctx, key_str, REDISMODULE_READ);
85     int type = RedisModule_KeyType(key);
86     RedisModule_CloseKey(key);
87     return type;
88 }
89
90 bool replyContentsEqualString(RedisModuleCallReply *reply, RedisModuleString *expected_value)
91 {
92     size_t replylen = 0, expectedlen = 0;
93     const char *expectedval = RedisModule_StringPtrLen(expected_value, &expectedlen);
94     const char *replyval = RedisModule_CallReplyStringPtr(reply, &replylen);
95     return replyval &&
96            expectedlen == replylen &&
97            !strncmp(expectedval, replyval, replylen);
98 }
99
100 typedef struct _SetParams {
101     RedisModuleString **key_val_pairs;
102     size_t length;
103 } SetParams;
104
105 typedef struct _PubParams {
106     RedisModuleString **channel_msg_pairs;
107     size_t length;
108 } PubParams;
109
110 typedef struct _DelParams {
111     RedisModuleString **keys;
112     size_t length;
113 } DelParams;
114
115 typedef enum _ExstringsStatus {
116     EXSTRINGS_STATUS_NO_ERRORS = 0,
117     EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT,
118     EXSTRINGS_STATUS_NOT_SET
119 } ExstringsStatus;
120
121 void readNgetArgs(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
122                   NgetArgs* nget_args, ExstringsStatus* status)
123 {
124     size_t str_len;
125     long long number;
126
127     if(argc == 2) {
128         nget_args->key = argv[1];
129         nget_args->count = def_count_str;
130     } else if (argc == 4) {
131         if (strcasecmp(RedisModule_StringPtrLen(argv[2], &str_len), "count")) {
132             RedisModule_ReplyWithError(ctx,"-ERR syntax error");
133             *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
134             return;
135         }
136
137         int ret = RedisModule_StringToLongLong(argv[3], &number) != REDISMODULE_OK;
138         if (ret != REDISMODULE_OK || number < 1) {
139             RedisModule_ReplyWithError(ctx,"-ERR value is not an integer or out of range");
140             *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
141             return;
142         }
143
144         nget_args->key = argv[1];
145         nget_args->count = argv[3];
146     } else {
147         /* In redis there is a bug (or undocumented feature see link)
148          * where calling 'RedisModule_WrongArity'
149          * within a blocked client will crash redis.
150          *
151          * Therefore we need to call this function to validate args
152          * before putting the client into blocking mode.
153          *
154          * Link to issue:
155          * https://github.com/antirez/redis/issues/6382
156          * 'If any thread tries to access the command arguments from
157          *  within the ThreadSafeContext they will crash redis' */
158         RedisModule_WrongArity(ctx);
159         *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
160         return;
161     }
162
163     *status = EXSTRINGS_STATUS_NO_ERRORS;
164     return;
165 }
166
167 long long callReplyLongLong(RedisModuleCallReply* reply)
168 {
169     const char* cursor_str_ptr = RedisModule_CallReplyStringPtr(reply, NULL);
170     return strtoll(cursor_str_ptr, NULL, 10);
171 }
172
173 void forwardIfError(RedisModuleCtx *ctx, RedisModuleCallReply *reply, ExstringsStatus* status)
174 {
175     if (RedisModule_CallReplyType(reply) == REDISMODULE_REPLY_ERROR) {
176         RedisModule_ReplyWithCallReply(ctx, reply);
177         RedisModule_FreeCallReply(reply);
178         *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
179     }
180     *status = EXSTRINGS_STATUS_NO_ERRORS;
181 }
182
183 typedef struct _ScannedKeys {
184     RedisModuleString **keys;
185     size_t len;
186 } ScannedKeys;
187
188 ScannedKeys* allocScannedKeys(size_t len)
189 {
190     ScannedKeys *sk = RedisModule_Alloc(sizeof(ScannedKeys));
191     if (sk) {
192         sk->len = len;
193         sk->keys = RedisModule_Alloc(sizeof(RedisModuleString *)*len);
194     }
195     return sk;
196 }
197
198 void freeScannedKeys(RedisModuleCtx *ctx, ScannedKeys* sk)
199 {
200     if (sk) {
201         size_t j;
202         for (j = 0; j < sk->len; j++)
203             RedisModule_FreeString(ctx, sk->keys[j]);
204         RedisModule_Free(sk->keys);
205     }
206     RedisModule_Free(sk);
207 }
208
209 typedef struct _ScanSomeState {
210     RedisModuleString *key;
211     RedisModuleString *count;
212     long long cursor;
213 } ScanSomeState;
214
215 ScannedKeys *scanSome(RedisModuleCtx* ctx, ScanSomeState* state, ExstringsStatus* status)
216 {
217     RedisModuleString *scanargv[SCANARGC] = {NULL};
218
219     scanargv[0] = RedisModule_CreateStringFromLongLong(ctx, state->cursor);
220     scanargv[1] = match_str;
221     scanargv[2] = state->key;
222     scanargv[3] = count_str;
223     scanargv[4] = state->count;
224
225     RedisModuleCallReply *reply;
226     reply = RedisModule_Call(ctx, "SCAN", "v", scanargv, SCANARGC);
227     RedisModule_FreeString(ctx, scanargv[0]);
228     forwardIfError(ctx, reply, status);
229     if (*status == EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT)
230         return NULL;
231
232     state->cursor = callReplyLongLong(RedisModule_CallReplyArrayElement(reply, 0));
233     RedisModuleCallReply *cr_keys =
234         RedisModule_CallReplyArrayElement(reply, 1);
235
236     size_t scanned_keys_len = RedisModule_CallReplyLength(cr_keys);
237     if (scanned_keys_len == 0) {
238         RedisModule_FreeCallReply(reply);
239         *status = EXSTRINGS_STATUS_NO_ERRORS;
240         return NULL;
241     }
242
243     ScannedKeys *scanned_keys = allocScannedKeys(scanned_keys_len);
244     if (scanned_keys == NULL) {
245         RedisModule_FreeCallReply(reply);
246         RedisModule_ReplyWithError(ctx,"-ERR Out of memory");
247         *status = EXSTRINGS_STATUS_ERROR_AND_REPLY_SENT;
248         return NULL;
249     }
250
251     scanned_keys->len = scanned_keys_len;
252     size_t j;
253     for (j = 0; j < scanned_keys_len; j++) {
254         RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(cr_keys,j));
255         scanned_keys->keys[j] = rms;
256     }
257     RedisModule_FreeCallReply(reply);
258     *status = EXSTRINGS_STATUS_NO_ERRORS;
259     return scanned_keys;
260 }
261
262 inline void unlockThreadsafeContext(RedisModuleCtx *ctx, bool using_threadsafe_context)
263 {
264     if (using_threadsafe_context)
265         RedisModule_ThreadSafeContextUnlock(ctx);
266 }
267
268 inline void lockThreadsafeContext(RedisModuleCtx *ctx, bool using_threadsafe_context)
269 {
270     if (using_threadsafe_context)
271         RedisModule_ThreadSafeContextLock(ctx);
272 }
273
274 void multiPubCommand(RedisModuleCtx *ctx, PubParams* pubParams)
275 {
276     RedisModuleCallReply *reply = NULL;
277     for (unsigned int i = 0 ; i < pubParams->length ; i += 2) {
278         reply = RedisModule_Call(ctx, "PUBLISH", "v", pubParams->channel_msg_pairs + i, 2);
279         RedisModule_FreeCallReply(reply);
280     }
281 }
282
283 int setStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
284                                        int argc, const int flag)
285 {
286     RedisModuleString *oldvalstr = NULL;
287     RedisModuleCallReply *reply = NULL;
288
289     if (argc < 4)
290         return RedisModule_WrongArity(ctx);
291     else
292         oldvalstr = argv[3];
293
294     /*Check if key type is string*/
295     RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
296         REDISMODULE_READ);
297     int type = RedisModule_KeyType(key);
298     RedisModule_CloseKey(key);
299
300     if (type == REDISMODULE_KEYTYPE_EMPTY) {
301         if (flag == OBJ_OP_IE){
302             RedisModule_ReplyWithNull(ctx);
303             return REDISMODULE_OK;
304         }
305     } else if (type != REDISMODULE_KEYTYPE_STRING) {
306         return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE);
307     }
308
309     /*Get the value*/
310     reply = RedisModule_Call(ctx, "GET", "s", argv[1]);
311     ASSERT_NOERROR(reply)
312     size_t curlen=0, oldvallen=0;
313     const char *oldval = RedisModule_StringPtrLen(oldvalstr, &oldvallen);
314     const char *curval = RedisModule_CallReplyStringPtr(reply, &curlen);
315     if (((flag == OBJ_OP_IE) &&
316         (!curval || (oldvallen != curlen) || strncmp(oldval, curval, curlen)))
317         ||
318         ((flag == OBJ_OP_NE) && curval && (oldvallen == curlen) &&
319           !strncmp(oldval, curval, curlen))) {
320         RedisModule_FreeCallReply(reply);
321         return RedisModule_ReplyWithNull(ctx);
322     }
323     RedisModule_FreeCallReply(reply);
324
325     /* Prepare the arguments for the command. */
326     int i, j=0, cmdargc=argc-2;
327     RedisModuleString *cmdargv[cmdargc];
328     for (i = 1; i < argc; i++) {
329         if (i == 3)
330             continue;
331         cmdargv[j++] = argv[i];
332     }
333
334     /* Call the command and pass back the reply. */
335     reply = RedisModule_Call(ctx, "SET", "v!", cmdargv, cmdargc);
336     ASSERT_NOERROR(reply)
337     RedisModule_ReplyWithCallReply(ctx, reply);
338
339     RedisModule_FreeCallReply(reply);
340     return REDISMODULE_OK;
341 }
342
343 int SetIE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
344 {
345     RedisModule_AutoMemory(ctx);
346     return setStringGenericCommand(ctx, argv, argc, OBJ_OP_IE);
347 }
348
349 int SetNE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
350 {
351     RedisModule_AutoMemory(ctx);
352     return setStringGenericCommand(ctx, argv, argc, OBJ_OP_NE);
353 }
354
355 int delStringGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
356                                        int argc, const int flag)
357 {
358     RedisModuleString *oldvalstr = NULL;
359     RedisModuleCallReply *reply = NULL;
360
361     if (argc == 3)
362         oldvalstr = argv[2];
363     else
364         return RedisModule_WrongArity(ctx);
365
366     /*Check if key type is string*/
367     RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
368         REDISMODULE_READ);
369     int type = RedisModule_KeyType(key);
370     RedisModule_CloseKey(key);
371
372     if (type == REDISMODULE_KEYTYPE_EMPTY) {
373         return RedisModule_ReplyWithLongLong(ctx, 0);
374     } else if (type != REDISMODULE_KEYTYPE_STRING) {
375         return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE);
376     }
377
378     /*Get the value*/
379     reply = RedisModule_Call(ctx, "GET", "s", argv[1]);
380     ASSERT_NOERROR(reply)
381     size_t curlen = 0, oldvallen = 0;
382     const char *oldval = RedisModule_StringPtrLen(oldvalstr, &oldvallen);
383     const char *curval = RedisModule_CallReplyStringPtr(reply, &curlen);
384     if (((flag == OBJ_OP_IE) &&
385         (!curval || (oldvallen != curlen) || strncmp(oldval, curval, curlen)))
386         ||
387         ((flag == OBJ_OP_NE) && curval && (oldvallen == curlen) &&
388           !strncmp(oldval, curval, curlen))) {
389         RedisModule_FreeCallReply(reply);
390         return RedisModule_ReplyWithLongLong(ctx, 0);
391     }
392     RedisModule_FreeCallReply(reply);
393
394     /* Prepare the arguments for the command. */
395     int cmdargc=1;
396     RedisModuleString *cmdargv[1];
397     cmdargv[0] = argv[1];
398
399     /* Call the command and pass back the reply. */
400     reply = RedisModule_Call(ctx, "UNLINK", "v!", cmdargv, cmdargc);
401     ASSERT_NOERROR(reply)
402     RedisModule_ReplyWithCallReply(ctx, reply);
403
404     RedisModule_FreeCallReply(reply);
405     return REDISMODULE_OK;
406 }
407
408 int DelIE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
409 {
410     RedisModule_AutoMemory(ctx);
411     return delStringGenericCommand(ctx, argv, argc, OBJ_OP_IE);
412 }
413
414 int DelNE_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
415 {
416     RedisModule_AutoMemory(ctx);
417     return delStringGenericCommand(ctx, argv, argc, OBJ_OP_NE);
418 }
419 int setPubStringCommon(RedisModuleCtx *ctx, SetParams* setParamsPtr, PubParams* pubParamsPtr)
420 {
421     RedisModuleCallReply *setReply;
422     setReply = RedisModule_Call(ctx, "MSET", "v!", setParamsPtr->key_val_pairs, setParamsPtr->length);
423     ASSERT_NOERROR(setReply)
424     multiPubCommand(ctx, pubParamsPtr);
425     RedisModule_ReplyWithCallReply(ctx, setReply);
426     RedisModule_FreeCallReply(setReply);
427     return REDISMODULE_OK;
428 }
429
430 int SetPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
431 {
432     if (argc < 5 || (argc % 2) == 0)
433         return RedisModule_WrongArity(ctx);
434
435     RedisModule_AutoMemory(ctx);
436     SetParams setParams = {
437                            .key_val_pairs = argv + 1,
438                            .length = argc - 3
439                           };
440     PubParams pubParams = {
441                            .channel_msg_pairs = argv + 1 + setParams.length,
442                            .length = 2
443                           };
444
445     return setPubStringCommon(ctx, &setParams, &pubParams);
446 }
447
448 int SetMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
449 {
450     if (argc < 7 || (argc % 2) == 0)
451         return RedisModule_WrongArity(ctx);
452
453     RedisModule_AutoMemory(ctx);
454     long long setPairsCount, pubPairsCount;
455     RedisModule_StringToLongLong(argv[1], &setPairsCount);
456     RedisModule_StringToLongLong(argv[2], &pubPairsCount);
457     if (setPairsCount < 1 || pubPairsCount < 1)
458         return RedisModule_ReplyWithError(ctx, "ERR SET_PAIR_COUNT and PUB_PAIR_COUNT must be greater than zero");
459
460     long long setLen, pubLen;
461     setLen = 2*setPairsCount;
462     pubLen = 2*pubPairsCount;
463
464     if (setLen + pubLen + 3 != argc)
465         return RedisModule_ReplyWithError(ctx, "ERR SET_PAIR_COUNT or PUB_PAIR_COUNT do not match the total pair count");
466
467     SetParams setParams = {
468                            .key_val_pairs = argv + 3,
469                            .length = setLen
470                           };
471     PubParams pubParams = {
472                            .channel_msg_pairs = argv + 3 + setParams.length,
473                            .length = pubLen
474                           };
475
476     return setPubStringCommon(ctx, &setParams, &pubParams);
477 }
478
479 int setIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
480 {
481     SetParams setParams = {
482                            .key_val_pairs = argv + 1,
483                            .length = 2
484                           };
485     PubParams pubParams = {
486                            .channel_msg_pairs = argv + 4,
487                            .length = argc - 4
488                           };
489     RedisModuleString *key = setParams.key_val_pairs[0];
490     RedisModuleString *oldvalstr = argv[3];
491
492     int type = getKeyType(ctx, key);
493     if (flag == OBJ_OP_IE && type == REDISMODULE_KEYTYPE_EMPTY) {
494         return RedisModule_ReplyWithNull(ctx);
495     } else if (type != REDISMODULE_KEYTYPE_STRING && type != REDISMODULE_KEYTYPE_EMPTY) {
496         return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
497     }
498
499     RedisModuleCallReply *reply = RedisModule_Call(ctx, "GET", "s", key);
500     ASSERT_NOERROR(reply)
501     bool is_equal = replyContentsEqualString(reply, oldvalstr);
502     RedisModule_FreeCallReply(reply);
503     if ((flag == OBJ_OP_IE && !is_equal) ||
504         (flag == OBJ_OP_NE && is_equal)) {
505         return RedisModule_ReplyWithNull(ctx);
506     }
507
508     return setPubStringCommon(ctx, &setParams, &pubParams);
509 }
510
511 int SetIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
512 {
513     if (argc != 6)
514         return RedisModule_WrongArity(ctx);
515
516     RedisModule_AutoMemory(ctx);
517     return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
518 }
519
520 int SetIEMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
521 {
522     if (argc < 6 || (argc % 2) != 0)
523         return RedisModule_WrongArity(ctx);
524
525     RedisModule_AutoMemory(ctx);
526     return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
527 }
528
529 int SetNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
530 {
531     if (argc != 6)
532         return RedisModule_WrongArity(ctx);
533
534     RedisModule_AutoMemory(ctx);
535     return setIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE);
536 }
537
538 int setXXNXPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
539 {
540     SetParams setParams = {
541                            .key_val_pairs = argv + 1,
542                            .length = 2
543                           };
544     PubParams pubParams = {
545                            .channel_msg_pairs = argv + 3,
546                            .length = argc - 3
547                           };
548     RedisModuleString *key = setParams.key_val_pairs[0];
549
550     int type = getKeyType(ctx, key);
551     if ((flag == OBJ_OP_XX && type == REDISMODULE_KEYTYPE_EMPTY) ||
552         (flag == OBJ_OP_NX && type == REDISMODULE_KEYTYPE_STRING)) {
553         return RedisModule_ReplyWithNull(ctx);
554     } else if (type != REDISMODULE_KEYTYPE_STRING && type != REDISMODULE_KEYTYPE_EMPTY) {
555         RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
556         return REDISMODULE_OK;
557     }
558
559     return setPubStringCommon(ctx, &setParams, &pubParams);
560 }
561
562 int SetNXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
563 {
564     if (argc != 5)
565         return RedisModule_WrongArity(ctx);
566
567     RedisModule_AutoMemory(ctx);
568     return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_NX);
569 }
570
571 int SetNXMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
572 {
573     if (argc < 5 || (argc % 2) == 0)
574         return RedisModule_WrongArity(ctx);
575
576     RedisModule_AutoMemory(ctx);
577     return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_NX);
578 }
579
580 int SetXXPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
581 {
582     if (argc != 5)
583         return RedisModule_WrongArity(ctx);
584
585     RedisModule_AutoMemory(ctx);
586     return setXXNXPubStringCommon(ctx, argv, argc, OBJ_OP_XX);
587 }
588
589 int delPubStringCommon(RedisModuleCtx *ctx, DelParams *delParamsPtr, PubParams *pubParamsPtr)
590 {
591     RedisModuleCallReply *reply = RedisModule_Call(ctx, "UNLINK", "v!", delParamsPtr->keys, delParamsPtr->length);
592     ASSERT_NOERROR(reply)
593     int replytype = RedisModule_CallReplyType(reply);
594     if (replytype == REDISMODULE_REPLY_NULL) {
595         RedisModule_ReplyWithNull(ctx);
596     } else if (RedisModule_CallReplyInteger(reply) == 0) {
597         RedisModule_ReplyWithCallReply(ctx, reply);
598     } else {
599         RedisModule_ReplyWithCallReply(ctx, reply);
600         multiPubCommand(ctx, pubParamsPtr);
601     }
602     RedisModule_FreeCallReply(reply);
603     return REDISMODULE_OK;
604 }
605
606 int DelPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
607 {
608     if (argc < 4)
609         return RedisModule_WrongArity(ctx);
610
611     RedisModule_AutoMemory(ctx);
612     DelParams delParams = {
613                            .keys = argv + 1,
614                            .length = argc - 3
615                           };
616     PubParams pubParams = {
617                            .channel_msg_pairs = argv + 1 + delParams.length,
618                            .length = 2
619                           };
620
621     return delPubStringCommon(ctx, &delParams, &pubParams);
622 }
623
624 int DelMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
625 {
626     if (argc < 6)
627         return RedisModule_WrongArity(ctx);
628
629     RedisModule_AutoMemory(ctx);
630     long long delCount, pubPairsCount;
631     RedisModule_StringToLongLong(argv[1], &delCount);
632     RedisModule_StringToLongLong(argv[2], &pubPairsCount);
633     if (delCount < 1 || pubPairsCount < 1)
634         return RedisModule_ReplyWithError(ctx, "ERR DEL_COUNT and PUB_PAIR_COUNT must be greater than zero");
635
636     long long delLen, pubLen;
637     delLen = delCount;
638     pubLen = 2*pubPairsCount;
639     if (delLen + pubLen + 3 != argc)
640         return RedisModule_ReplyWithError(ctx, "ERR DEL_COUNT or PUB_PAIR_COUNT do not match the total pair count");
641
642     DelParams delParams = {
643                            .keys = argv + 3,
644                            .length = delLen
645                           };
646     PubParams pubParams = {
647                            .channel_msg_pairs = argv + 3 + delParams.length,
648                            .length = pubLen
649                           };
650
651     return delPubStringCommon(ctx, &delParams, &pubParams);
652 }
653
654 int delIENEPubStringCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int flag)
655 {
656     DelParams delParams = {
657                            .keys = argv + 1,
658                            .length = 1
659                           };
660     PubParams pubParams = {
661                            .channel_msg_pairs = argv + 3,
662                            .length = argc - 3
663                           };
664     RedisModuleString *key = argv[1];
665     RedisModuleString *oldvalstr = argv[2];
666
667     int type = getKeyType(ctx, key);
668     if (type == REDISMODULE_KEYTYPE_EMPTY) {
669         return RedisModule_ReplyWithLongLong(ctx, 0);
670     } else if (type != REDISMODULE_KEYTYPE_STRING) {
671         return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
672     }
673
674     RedisModuleCallReply *reply = RedisModule_Call(ctx, "GET", "s", key);
675     ASSERT_NOERROR(reply)
676     bool is_equal = replyContentsEqualString(reply, oldvalstr);
677     RedisModule_FreeCallReply(reply);
678     if ((flag == OBJ_OP_IE && !is_equal) ||
679         (flag == OBJ_OP_NE && is_equal)) {
680         return RedisModule_ReplyWithLongLong(ctx, 0);
681     }
682
683     return delPubStringCommon(ctx, &delParams, &pubParams);
684 }
685
686 int DelIEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
687 {
688     if (argc != 5)
689         return RedisModule_WrongArity(ctx);
690
691     RedisModule_AutoMemory(ctx);
692     return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
693 }
694
695 int DelIEMPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
696 {
697     if (argc < 5 || (argc % 2) == 0)
698         return RedisModule_WrongArity(ctx);
699
700     RedisModule_AutoMemory(ctx);
701     return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_IE);
702 }
703
704 int DelNEPub_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
705 {
706     if (argc != 5)
707         return RedisModule_WrongArity(ctx);
708
709     RedisModule_AutoMemory(ctx);
710     return delIENEPubStringCommon(ctx, argv, argc, OBJ_OP_NE);
711 }
712
713 int Nget_RedisCommand(RedisModuleCtx *ctx, NgetArgs* nget_args, bool using_threadsafe_context)
714 {
715     int ret = REDISMODULE_OK;
716     size_t replylen = 0;
717     RedisModuleCallReply *reply = NULL;
718     ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
719     ScanSomeState scan_state;
720     ScannedKeys *scanned_keys;
721
722     scan_state.key = nget_args->key;
723     scan_state.count = nget_args->count;
724     scan_state.cursor = 0;
725
726     RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
727     do {
728         lockThreadsafeContext(ctx, using_threadsafe_context);
729
730         status = EXSTRINGS_STATUS_NOT_SET;
731         scanned_keys = scanSome(ctx, &scan_state, &status);
732
733         if (status != EXSTRINGS_STATUS_NO_ERRORS) {
734             unlockThreadsafeContext(ctx, using_threadsafe_context);
735             ret = REDISMODULE_ERR;
736             break;
737         } else if (scanned_keys == NULL) {
738             unlockThreadsafeContext(ctx, using_threadsafe_context);
739             continue;
740         }
741
742         reply = RedisModule_Call(ctx, "MGET", "v", scanned_keys->keys, scanned_keys->len);
743
744         unlockThreadsafeContext(ctx, using_threadsafe_context);
745
746         status = EXSTRINGS_STATUS_NOT_SET;
747         forwardIfError(ctx, reply, &status);
748         if (status != EXSTRINGS_STATUS_NO_ERRORS) {
749             freeScannedKeys(ctx, scanned_keys);
750             ret = REDISMODULE_ERR;
751             break;
752         }
753
754         size_t i;
755         for (i = 0; i < scanned_keys->len; i++) {
756             RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(reply, i));
757             if (rms) {
758                 RedisModule_ReplyWithString(ctx, scanned_keys->keys[i]);
759                 RedisModule_ReplyWithString(ctx, rms);
760                 RedisModule_FreeString(ctx, rms);
761                 replylen += 2;
762             }
763         }
764         RedisModule_FreeCallReply(reply);
765         freeScannedKeys(ctx, scanned_keys);
766     } while (scan_state.cursor != 0);
767
768     RedisModule_ReplySetArrayLength(ctx,replylen);
769     return ret;
770 }
771
772 /* The thread entry point that actually executes the blocking part
773  * of the command nget.noatomic
774  */
775 void *NGet_NoAtomic_ThreadMain(void *arg)
776 {
777     RedisModuleBlockedClientArgs *bca = arg;
778     RedisModuleBlockedClient *bc = bca->bc;
779     RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
780
781     Nget_RedisCommand(ctx, &bca->nget_args, true);
782     RedisModule_FreeThreadSafeContext(ctx);
783     RedisModule_UnblockClient(bc, NULL);
784     RedisModule_Free(bca);
785     return NULL;
786 }
787
788 int NGet_NoAtomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
789 {
790     RedisModule_AutoMemory(ctx);
791     pthread_t tid;
792
793     InitStaticVariable();
794
795     RedisModuleBlockedClientArgs *bca = RedisModule_Alloc(sizeof(RedisModuleBlockedClientArgs));
796     if (bca == NULL) {
797         RedisModule_ReplyWithError(ctx,"-ERR Out of memory");
798         return REDISMODULE_ERR;
799     }
800
801     ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
802     readNgetArgs(ctx, argv, argc, &bca->nget_args, &status);
803     if (status != EXSTRINGS_STATUS_NO_ERRORS) {
804         RedisModule_Free(bca);
805         return REDISMODULE_ERR;
806     }
807
808     /* Note that when blocking the client we do not set any callback: no
809      * timeout is possible since we passed '0', nor we need a reply callback
810      * because we'll use the thread safe context to accumulate a reply. */
811     RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);
812
813     bca->bc = bc;
814
815     /* Now that we setup a blocking client, we need to pass the control
816      * to the thread. However we need to pass arguments to the thread:
817      * the reference to the blocked client handle. */
818     if (pthread_create(&tid,NULL,NGet_NoAtomic_ThreadMain,bca) != 0) {
819         RedisModule_AbortBlock(bc);
820         RedisModule_Free(bca);
821         return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
822     }
823
824     return REDISMODULE_OK;
825 }
826
827 int NGet_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
828 {
829     RedisModule_AutoMemory(ctx);
830     NgetArgs nget_args;
831     ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
832
833     InitStaticVariable();
834
835     readNgetArgs(ctx, argv, argc, &nget_args, &status);
836     if (status != EXSTRINGS_STATUS_NO_ERRORS) {
837         return REDISMODULE_ERR;
838     }
839
840     return Nget_RedisCommand(ctx, &nget_args, false);
841 }
842
843 int NDel_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
844 {
845     RedisModule_AutoMemory(ctx);
846     int ret = REDISMODULE_OK;
847     long long replylen = 0;
848     RedisModuleCallReply *reply = NULL;
849     ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
850     ScanSomeState scan_state;
851     ScannedKeys *scanned_keys = NULL;
852
853     InitStaticVariable();
854     if (argc != 2)
855         return RedisModule_WrongArity(ctx);
856
857     scan_state.key = argv[1];
858     scan_state.count = def_count_str;
859     scan_state.cursor = 0;
860
861     do {
862         status = EXSTRINGS_STATUS_NOT_SET;
863         scanned_keys = scanSome(ctx, &scan_state, &status);
864
865         if (status != EXSTRINGS_STATUS_NO_ERRORS) {
866             ret = REDISMODULE_ERR;
867             break;
868         } else if (scanned_keys == NULL) {
869             continue;
870         }
871
872         reply = RedisModule_Call(ctx, "UNLINK", "v!", scanned_keys->keys, scanned_keys->len);
873
874         status = EXSTRINGS_STATUS_NOT_SET;
875         forwardIfError(ctx, reply, &status);
876         if (status != EXSTRINGS_STATUS_NO_ERRORS) {
877             freeScannedKeys(ctx, scanned_keys);
878             ret = REDISMODULE_ERR;
879             break;
880         }
881
882         replylen += RedisModule_CallReplyInteger(reply);
883         RedisModule_FreeCallReply(reply);
884         freeScannedKeys(ctx, scanned_keys);
885     } while (scan_state.cursor != 0);
886
887     if (ret == REDISMODULE_OK) {
888         RedisModule_ReplyWithLongLong(ctx, replylen);
889     }
890
891     return ret;
892 }
893
894 /* This function must be present on each Redis module. It is used in order to
895  * register the commands into the Redis server. */
896 int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
897     REDISMODULE_NOT_USED(argv);
898     REDISMODULE_NOT_USED(argc);
899
900     if (RedisModule_Init(ctx,"exstrings",1,REDISMODULE_APIVER_1)
901         == REDISMODULE_ERR) return REDISMODULE_ERR;
902
903     if (RedisModule_CreateCommand(ctx,"setie",
904         SetIE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
905         return REDISMODULE_ERR;
906
907     if (RedisModule_CreateCommand(ctx,"setne",
908         SetNE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
909         return REDISMODULE_ERR;
910
911     if (RedisModule_CreateCommand(ctx,"delie",
912         DelIE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
913         return REDISMODULE_ERR;
914
915     if (RedisModule_CreateCommand(ctx,"delne",
916         DelNE_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
917         return REDISMODULE_ERR;
918
919     if (RedisModule_CreateCommand(ctx,"nget.atomic",
920         NGet_Atomic_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
921         return REDISMODULE_ERR;
922
923     if (RedisModule_CreateCommand(ctx,"nget.noatomic",
924         NGet_NoAtomic_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
925         return REDISMODULE_ERR;
926
927     if (RedisModule_CreateCommand(ctx,"ndel.atomic",
928         NDel_Atomic_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
929         return REDISMODULE_ERR;
930
931     if (RedisModule_CreateCommand(ctx,"msetpub",
932         SetPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
933         return REDISMODULE_ERR;
934
935     if (RedisModule_CreateCommand(ctx,"msetmpub",
936         SetMPub_RedisCommand,"write deny-oom pubsub",1,1,1) == REDISMODULE_ERR)
937         return REDISMODULE_ERR;
938
939     if (RedisModule_CreateCommand(ctx,"setiepub",
940         SetIEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
941         return REDISMODULE_ERR;
942
943     if (RedisModule_CreateCommand(ctx,"setiempub",
944         SetIEMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
945         return REDISMODULE_ERR;
946
947     if (RedisModule_CreateCommand(ctx,"setnepub",
948         SetNEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
949         return REDISMODULE_ERR;
950
951     if (RedisModule_CreateCommand(ctx,"setxxpub",
952         SetXXPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
953         return REDISMODULE_ERR;
954
955     if (RedisModule_CreateCommand(ctx,"setnxpub",
956         SetNXPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
957         return REDISMODULE_ERR;
958
959     if (RedisModule_CreateCommand(ctx,"setnxmpub",
960         SetNXMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
961         return REDISMODULE_ERR;
962
963     if (RedisModule_CreateCommand(ctx,"delpub",
964         DelPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
965         return REDISMODULE_ERR;
966
967     if (RedisModule_CreateCommand(ctx,"delmpub",
968         DelMPub_RedisCommand,"write deny-oom pubsub",1,1,1) == REDISMODULE_ERR)
969         return REDISMODULE_ERR;
970
971     if (RedisModule_CreateCommand(ctx,"deliepub",
972         DelIEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
973         return REDISMODULE_ERR;
974
975     if (RedisModule_CreateCommand(ctx,"deliempub",
976         DelIEMPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
977         return REDISMODULE_ERR;
978
979     if (RedisModule_CreateCommand(ctx,"delnepub",
980         DelNEPub_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
981         return REDISMODULE_ERR;
982
983     return REDISMODULE_OK;
984 }