Skip to content

Commit 9624857

Browse files
committed
Refactor - Create a function that parses all run commands in command parser file. The parsing is split according to the original command, and at the end we always create run info and sent it to queues.
1 parent ae48edc commit 9624857

17 files changed

+284
-298
lines changed

src/CMakeLists.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ ADD_LIBRARY(redisai_obj OBJECT
66
util/dict.c
77
util/queue.c
88
redisai.c
9+
command_parser.c
910
run_info.c
1011
background_workers.c
1112
config.c
12-
dag.c
13-
dag_parser.c
13+
DAG/dag.c
14+
DAG/dag_parser.c
1415
modelRun_ctx.c
15-
async_llapi.c
1616
backends.c
1717
backends/util.c
1818
model.c

src/dag.c renamed to src/DAG/dag.c

Lines changed: 8 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -874,53 +874,21 @@ void DAG_ReplyAndUnblock(RedisAI_OnFinishCtx *ctx, void *private_data) {
874874
RedisModule_UnblockClient(rinfo->client, rinfo);
875875
}
876876

877-
int RedisAI_ProcessDagRunCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
878-
int dagMode) {
879-
880-
int flags = RedisModule_GetContextFlags(ctx);
881-
bool blocking_not_allowed = (flags & (REDISMODULE_CTX_FLAGS_MULTI | REDISMODULE_CTX_FLAGS_LUA));
882-
if (blocking_not_allowed)
883-
return RedisModule_ReplyWithError(
884-
ctx, "ERR Cannot run RedisAI command within a transaction or a LUA script");
885-
RedisAI_RunInfo *rinfo = NULL;
886-
if (RAI_InitRunInfo(&rinfo) == REDISMODULE_ERR) {
887-
RedisModule_ReplyWithError(
888-
ctx, "ERR Unable to allocate the memory and initialise the RedisAI_RunInfo structure");
889-
return REDISMODULE_ERR;
890-
}
891-
// Parse DAG string command and store the data in rinfo obj.
892-
int status = DAG_CommandParser(ctx, argv, argc, dagMode, &rinfo);
893-
if (status == REDISMODULE_ERR)
894-
return REDISMODULE_OK;
895-
// Block the client before adding rinfo to the run queues (sync call).
896-
rinfo->client = RedisModule_BlockClient(ctx, RedisAI_DagRun_Reply, NULL, RunInfo_FreeData, 0);
897-
RedisModule_SetDisconnectCallback(rinfo->client, RedisAI_Disconnected);
898-
rinfo->OnFinish = DAG_ReplyAndUnblock;
899-
return DAG_InsertDAGToQueue(rinfo);
900-
}
877+
int Dag_PopulateSingleModelRunOp(RedisAI_RunInfo *rinfo, RAI_ModelRunCtx *mctx,
878+
RedisModuleString **inkeys, RedisModuleString **outkeys,
879+
RedisModuleString *runkey, long long timeout) {
901880

902-
RedisAI_RunInfo *Dag_CreateFromSingleModelRunOp(RAI_ModelRunCtx *mctx, RAI_Error *error,
903-
RedisModuleString **inkeys,
904-
RedisModuleString **outkeys,
905-
RedisModuleString *runkey, long long timeout) {
906-
RedisAI_RunInfo *rinfo = NULL;
907-
if (RAI_InitRunInfo(&rinfo) == REDISMODULE_ERR) {
908-
RAI_SetError(
909-
error, RedisAI_ErrorCode_EMODELRUN,
910-
"ERR Unable to allocate the memory and initialise the RedisAI_RunInfo structure");
911-
return NULL;
912-
}
881+
assert(rinfo->single_op_dag);
913882
rinfo->single_device_dag = 1;
914-
rinfo->single_op_dag = 1;
915883
rinfo->dagOpCount = 1;
916884
rinfo->timeout = timeout;
917885

918886
RAI_DagOp *currentDagOp;
919887
if (RAI_InitDagOp(&currentDagOp) == REDISMODULE_ERR) {
920-
RAI_SetError(error, RedisAI_ErrorCode_EMODELRUN,
888+
RAI_SetError(rinfo->err, RedisAI_ErrorCode_EMODELRUN,
921889
"ERR Unable to allocate the memory and initialise the RAI_dagOp structure");
922-
return NULL;
923-
};
890+
return REDISMODULE_ERR;
891+
}
924892
rinfo->dagOps = array_append(rinfo->dagOps, currentDagOp);
925893

926894
RAI_DagOp *currentOp = rinfo->dagOps[0];
@@ -931,5 +899,5 @@ RedisAI_RunInfo *Dag_CreateFromSingleModelRunOp(RAI_ModelRunCtx *mctx, RAI_Error
931899
currentOp->outkeys = outkeys;
932900
currentOp->runkey = runkey;
933901

934-
return rinfo;
902+
return REDISMODULE_OK;
935903
}

src/dag.h renamed to src/DAG/dag.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,18 +162,17 @@ void RunInfo_FreeData(RedisModuleCtx *ctx, void *rinfo);
162162
void RedisAI_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
163163

164164
/**
165-
* @brief Create a DAG with a single operation of MODELRUN.
165+
* @brief Populate a DAG with a single operation of MODELRUN.
166+
* @param rinfo An existing DAG to populate.
166167
* @param mctx ModelRunCtx that represents the single MODELRUN op.
167-
* @param error RAI_Error object to store error if one occurs.
168168
* @param inkeys The DAG operation inkeys (the input tensors).
169169
* @param outkeys The DAG operation outkeys (the output tensors).
170170
* @param runkey The model key.
171171
* @param timeout The operation timeout (if given, otherwise it is zero).
172172
*/
173173

174-
RedisAI_RunInfo *Dag_CreateFromSingleModelRunOp(RAI_ModelRunCtx *mctx, RAI_Error *error,
175-
RedisModuleString **inkeys,
176-
RedisModuleString **outkeys,
177-
RedisModuleString *runkey, long long timeout);
174+
int Dag_PopulateSingleModelRunOp(RedisAI_RunInfo *rinfo, RAI_ModelRunCtx *mctx,
175+
RedisModuleString **inkeys, RedisModuleString **outkeys,
176+
RedisModuleString *runkey, long long timeout);
178177

179178
#endif /* SRC_DAG_H_ */

src/dag_parser.c renamed to src/DAG/dag_parser.c

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,21 @@
77

88
/**
99
* DAGRUN Building Block to parse [LOAD <nkeys> key1 key2... ]
10+
*
11+
* @param ctx Context in which Redis modules operate
12+
* @param argv Redis command arguments, as an array of strings
13+
* @param argc Redis command number of arguments
14+
* @param loadedContextDict local non-blocking hash table containing key names
15+
* loaded from the keyspace tensors
16+
* @param localContextDict local non-blocking hash table containing DAG's
17+
* tensors
18+
* @param chaining_operator operator used to split operations. Any command
19+
* argument after the chaining operator is not considered
20+
* @return processed number of arguments on success, or -1 if the parsing failed
1021
*/
11-
int RAI_parseDAGLoadArgs(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
12-
AI_dict **loadedContextDict, AI_dict **localContextDict,
13-
const char *chaining_operator) {
22+
static int DAG_ParseLoadArgs(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
23+
AI_dict **loadedContextDict, AI_dict **localContextDict,
24+
const char *chaining_operator) {
1425
if (argc < 3) {
1526
RedisModule_WrongArity(ctx);
1627
return -1;
@@ -60,9 +71,18 @@ int RAI_parseDAGLoadArgs(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
6071

6172
/**
6273
* DAGRUN Building Block to parse [PERSIST <nkeys> key1 key2... ]
74+
*
75+
* @param ctx Context in which Redis modules operate
76+
* @param argv Redis command arguments, as an array of strings
77+
* @param argc Redis command number of arguments
78+
* @param localContextDict local non-blocking hash table containing DAG's
79+
* keynames marked as persistent
80+
* @param chaining_operator operator used to split operations. Any command
81+
* argument after the chaining operator is not considered
82+
* @return processed number of arguments on success, or -1 if the parsing failed
6383
*/
64-
int RAI_parseDAGPersistArgs(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
65-
AI_dict **persistContextDict, const char *chaining_operator) {
84+
static int DAG_ParsePersistArgs(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
85+
AI_dict **persistContextDict, const char *chaining_operator) {
6686
if (argc < 3) {
6787
RedisModule_WrongArity(ctx);
6888
return -1;
@@ -129,9 +149,9 @@ int DAG_CommandParser(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, i
129149
if (!strcasecmp(arg_string, "LOAD") && !load_complete) {
130150
/* Load the required tensors from key space and store them in both
131151
dagTensorsLoadedContext and dagTensorsContext dicts. */
132-
const int parse_result = RAI_parseDAGLoadArgs(ctx, &argv[arg_pos], argc - arg_pos,
133-
&(rinfo->dagTensorsLoadedContext),
134-
&(rinfo->dagTensorsContext), "|>");
152+
const int parse_result = DAG_ParseLoadArgs(ctx, &argv[arg_pos], argc - arg_pos,
153+
&(rinfo->dagTensorsLoadedContext),
154+
&(rinfo->dagTensorsContext), "|>");
135155
if (parse_result > 0) {
136156
arg_pos += parse_result - 1;
137157
load_complete = true;
@@ -148,7 +168,7 @@ int DAG_CommandParser(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, i
148168
}
149169
/* Store the keys to persist in dagTensorsPersistedContext dict.
150170
These keys will be populated late on with actual tensors. */
151-
const int parse_result = RAI_parseDAGPersistArgs(
171+
const int parse_result = DAG_ParsePersistArgs(
152172
ctx, &argv[arg_pos], argc - arg_pos, &(rinfo->dagTensorsPersistedContext), "|>");
153173
if (parse_result > 0) {
154174
arg_pos += parse_result - 1;

src/DAG/dag_parser.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#ifndef REDISAI_DAG_PARSER_H
2+
#define REDISAI_DAG_PARSER_H
3+
4+
#include "run_info.h"
5+
6+
int DAG_CommandParser(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int dagMode,
7+
RedisAI_RunInfo **rinfo_ptr);
8+
9+
#endif // REDISAI_DAG_PARSER_H

src/async_llapi.c

Lines changed: 0 additions & 20 deletions
This file was deleted.

src/async_llapi.h

Lines changed: 0 additions & 19 deletions
This file was deleted.

src/background_workers.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
*/
1010

1111
#include "background_workers.h"
12-
#include "dag.h"
12+
#include "DAG/dag.h"
1313
#include "model.h"
1414
#include "redisai.h"
1515
#include "rmutil/alloc.h"

src/background_workers.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
#include <pthread.h>
2020

2121
#include "config.h"
22-
#include "dag.h"
22+
#include "DAG/dag.h"
2323
#include "model.h"
2424
#include "redisai.h"
2525
#include "rmutil/alloc.h"

0 commit comments

Comments
 (0)