+int Nget_RedisCommand(RedisModuleCtx *ctx, NgetArgs* nget_args, bool using_threadsafe_context)
+{
+ int ret = REDISMODULE_OK;
+ size_t replylen = 0;
+ RedisModuleCallReply *reply = NULL;
+ ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
+ ScanSomeState scan_state;
+ ScannedKeys *scanned_keys;
+
+ scan_state.key = nget_args->key;
+ scan_state.count = nget_args->count;
+ scan_state.cursor = 0;
+
+ RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
+ do {
+ lockThreadsafeContext(ctx, using_threadsafe_context);
+
+ status = EXSTRINGS_STATUS_NOT_SET;
+ scanned_keys = scanSome(ctx, &scan_state, &status);
+
+ if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+ unlockThreadsafeContext(ctx, using_threadsafe_context);
+ ret = REDISMODULE_ERR;
+ break;
+ } else if (scanned_keys == NULL) {
+ unlockThreadsafeContext(ctx, using_threadsafe_context);
+ continue;
+ }
+
+ reply = RedisModule_Call(ctx, "MGET", "v", scanned_keys->keys, scanned_keys->len);
+
+ unlockThreadsafeContext(ctx, using_threadsafe_context);
+
+ status = EXSTRINGS_STATUS_NOT_SET;
+ forwardIfError(ctx, reply, &status);
+ if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+ freeScannedKeys(ctx, scanned_keys);
+ ret = REDISMODULE_ERR;
+ break;
+ }
+
+ size_t i;
+ for (i = 0; i < scanned_keys->len; i++) {
+ RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(reply, i));
+ if (rms) {
+ RedisModule_ReplyWithString(ctx, scanned_keys->keys[i]);
+ RedisModule_ReplyWithString(ctx, rms);
+ RedisModule_FreeString(ctx, rms);
+ replylen += 2;
+ }
+ }
+ RedisModule_FreeCallReply(reply);
+ freeScannedKeys(ctx, scanned_keys);
+ } while (scan_state.cursor != 0);
+
+ RedisModule_ReplySetArrayLength(ctx,replylen);
+ return ret;
+}
+
+/* The thread entry point that actually executes the blocking part
+ * of the command nget.noatomic
+ */
+void *NGet_NoAtomic_ThreadMain(void *arg)
+{
+ pthread_detach(pthread_self());
+
+ RedisModuleBlockedClientArgs *bca = arg;
+ RedisModuleBlockedClient *bc = bca->bc;
+ RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
+
+ Nget_RedisCommand(ctx, &bca->nget_args, true);
+ RedisModule_FreeThreadSafeContext(ctx);
+ RedisModule_UnblockClient(bc, NULL);
+ RedisModule_Free(bca);
+ return NULL;
+}
+
+int NGet_NoAtomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ RedisModule_AutoMemory(ctx);
+ pthread_t tid;
+
+ InitStaticVariable();
+
+ RedisModuleBlockedClientArgs *bca = RedisModule_Alloc(sizeof(RedisModuleBlockedClientArgs));
+ if (bca == NULL) {
+ RedisModule_ReplyWithError(ctx,"-ERR Out of memory");
+ return REDISMODULE_ERR;
+ }
+
+ ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
+ readNgetArgs(ctx, argv, argc, &bca->nget_args, &status);
+ if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+ RedisModule_Free(bca);
+ return REDISMODULE_ERR;
+ }
+
+ /* Note that when blocking the client we do not set any callback: no
+ * timeout is possible since we passed '0', nor we need a reply callback
+ * because we'll use the thread safe context to accumulate a reply. */
+ RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);
+
+ bca->bc = bc;
+
+ /* Now that we setup a blocking client, we need to pass the control
+ * to the thread. However we need to pass arguments to the thread:
+ * the reference to the blocked client handle. */
+ if (pthread_create(&tid,NULL,NGet_NoAtomic_ThreadMain,bca) != 0) {
+ RedisModule_AbortBlock(bc);
+ RedisModule_Free(bca);
+ return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
+ }
+
+ return REDISMODULE_OK;
+}
+
+int NGet_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ RedisModule_AutoMemory(ctx);
+ NgetArgs nget_args;
+ ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
+
+ InitStaticVariable();
+
+ readNgetArgs(ctx, argv, argc, &nget_args, &status);
+ if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+ return REDISMODULE_ERR;
+ }
+
+ return Nget_RedisCommand(ctx, &nget_args, false);
+}
+
+int NDel_Atomic_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ RedisModule_AutoMemory(ctx);
+ int ret = REDISMODULE_OK;
+ long long replylen = 0;
+ RedisModuleCallReply *reply = NULL;
+ ExstringsStatus status = EXSTRINGS_STATUS_NOT_SET;
+ ScanSomeState scan_state;
+ ScannedKeys *scanned_keys = NULL;
+
+ InitStaticVariable();
+ if (argc != 2)
+ return RedisModule_WrongArity(ctx);
+
+ scan_state.key = argv[1];
+ scan_state.count = def_count_str;
+ scan_state.cursor = 0;
+
+ do {
+ status = EXSTRINGS_STATUS_NOT_SET;
+ scanned_keys = scanSome(ctx, &scan_state, &status);
+
+ if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+ ret = REDISMODULE_ERR;
+ break;
+ } else if (scanned_keys == NULL) {
+ continue;
+ }
+
+ reply = RedisModule_Call(ctx, "UNLINK", "v!", scanned_keys->keys, scanned_keys->len);
+
+ status = EXSTRINGS_STATUS_NOT_SET;
+ forwardIfError(ctx, reply, &status);
+ if (status != EXSTRINGS_STATUS_NO_ERRORS) {
+ freeScannedKeys(ctx, scanned_keys);
+ ret = REDISMODULE_ERR;
+ break;
+ }
+
+ replylen += RedisModule_CallReplyInteger(reply);
+ RedisModule_FreeCallReply(reply);
+ freeScannedKeys(ctx, scanned_keys);
+ } while (scan_state.cursor != 0);
+
+ if (ret == REDISMODULE_OK) {
+ RedisModule_ReplyWithLongLong(ctx, replylen);
+ }
+
+ return ret;
+}
+