-
Notifications
You must be signed in to change notification settings - Fork 106
Multi-device parallel dag #424
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report
@@ Coverage Diff @@
## master #424 +/- ##
==========================================
+ Coverage 73.26% 74.67% +1.40%
==========================================
Files 21 21
Lines 4579 4849 +270
==========================================
+ Hits 3355 3621 +266
- Misses 1224 1228 +4
Continue to review full report at Codecov.
|
c343883
to
d62fb4b
Compare
test/tests_tensorflow.py
Outdated
exception = e | ||
env.assertEqual(type(exception), redis.exceptions.ResponseError) | ||
env.assertEqual("tensor key is empty", exception.__str__()) | ||
env.assertEqual("Number of names given as OUTPUTS during MODELSET and keys given as INPUTS here do not match", exception.__str__()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
env.assertEqual("Number of names given as OUTPUTS during MODELSET and keys given as INPUTS here do not match", exception.__str__()) | |
env.assertEqual("Number of names given as OUTPUTS during MODELSET and keys given as OUTPUTS here do not match", exception.__str__()) |
test/tests_dag.py
Outdated
|
||
ret = con.execute_command( | ||
'AI.DAGRUN_RO', '|>', | ||
'AI.DAGRUN_RO', # '|>', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#
?
test/tests_dag.py
Outdated
exception = e | ||
env.assertEqual(type(exception), redis.exceptions.ResponseError) | ||
env.assertEqual("ERR unsupported command within DAG",exception.__str__()) | ||
env.assertEqual("INPUTS not specified",exception.__str__()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why INPUTS
not specified?
src/background_workers.c
Outdated
for (long long i=0; i<strlen(output); i++) { | ||
output[i] = toupper(output[i]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get the string length outside the for loop or change into while
loop
long long i = 0;
while(output[i]) {
output[i]=toupper(output[i]);
i++
}
src/background_workers.c
Outdated
if (use_local_context == 1) { | ||
pthread_mutex_lock(evicted_rinfo->dagMutex); | ||
dagError = *evicted_rinfo->dagError; | ||
pthread_mutex_unlock(evicted_rinfo->dagMutex); | ||
} | ||
if (use_local_context == 1 && dag_complete == 0 && !dagError) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all you care about here is RunInfo
with use_local_context == 1
right?
- can you collect them from the evicted items without locking the
run_queue_mutex
- you can avoid the second
if
statement since the condition above is not valid
I guess reintroducing items to the queue needs to be synched, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The thing is that dagError
is a pointer that is shared between worker threads (there are copies of rinfo in every pertinent queue that share some of the items, like dagMutex
, dagError
and the dagOps
- see https://github.com/RedisAI/RedisAI/blob/multidevice_dag/src/run_info.c#L147), so we indeed need to acquire the lock. The rest of the condition needs to happen only if no errors were generated in the step.
Would you simplify it further
src/dag.c
Outdated
RedisModule_UnblockClient(rinfo->client, rinfo); | ||
} | ||
pthread_mutex_unlock(rinfo->dagMutex); | ||
return NULL; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change function signature to return void
src/dag.c
Outdated
int dag_error = 0; | ||
char *detail_oneline; | ||
|
||
for (size_t i = 0; i < array_len(rinfo->dagOps); i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use array_len(rinfo->dagOps)
as variable
src/dag.c
Outdated
array_free(rinfo_copies); | ||
|
||
return REDISMODULE_OK; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I'm not sure I get this one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I get it now :-)
"ERR PERSIST key cannot be found in DAG"); | ||
} | ||
int *instance = AI_dictGetVal(mangled_entry); | ||
RedisModuleString *mangled_key = RedisModule_CreateStringPrintf(ctx, "%s%04d", key, *instance); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
possible leak?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch, now that we mangle we need to free the inkeys
and outkeys
strings when freeing the ops.
src/dag.c
Outdated
|
||
const char* master_device = rinfo->dagOps[array_len(rinfo->dagOps)-1]->devicestr; | ||
|
||
RedisAI_RunInfo **rinfo_copies = array_new(RedisAI_RunInfo*, 10); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
allocate with capacity as the number of devices
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lantiga lets push together increased testing on the uncovered parts added, and also include the documentation to the added methods ( you already describe the changes in the PR so we just need to add the relevant description to the code as well )
src/run_info.h
Outdated
int dagReplyLength; | ||
int dagNumberCommands; | ||
int *dagError; | ||
pthread_mutex_t* dagMutex; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
explain the necessity of this addition
src/dag.c
Outdated
currentOp->result = REDISMODULE_ERR; | ||
break; | ||
} | ||
void *RedisAI_DagRunSession_TensorSet_Step(RedisAI_RunInfo *rinfo, RAI_DagOp *currentOp, int *progress) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
document me
src/dag.c
Outdated
return NULL; | ||
} | ||
|
||
void *RedisAI_DagRunSession_TensorGet_Step(RedisAI_RunInfo *rinfo, RAI_DagOp *currentOp, int *progress) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
document me
src/dag.c
Outdated
return NULL; | ||
} | ||
|
||
void *RedisAI_DagRunSession_ModelRun_Step(RedisAI_RunInfo *rinfo, RAI_DagOp *currentOp, int *progress) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
document me
src/dag.c
Outdated
return NULL; | ||
} | ||
|
||
void *RedisAI_DagRunSession_ScriptRun_Step(RedisAI_RunInfo *rinfo, RAI_DagOp *currentOp, int *progress) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
document me
RAI_Tensor *inputTensor; | ||
const int get_result = RAI_getTensorFromLocalContext( | ||
NULL, rinfo->dagTensorsContext, RedisModule_StringPtrLen(currentOp->inkeys[i], NULL), &inputTensor, currentOp->err); | ||
if (get_result == REDISMODULE_ERR) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add testcase for it. coverage report pointing this line as not covered
src/dag.c
Outdated
} | ||
|
||
for (int i=0; i<array_len(currentOp->outkeys); i++) { | ||
if (!RAI_ScriptRunCtxAddOutput(currentOp->sctx)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add testcase for it. coverage report pointing this line as not covered
src/dag.c
Outdated
|
||
if (*rinfo->dagError == 1 && rinfo->client != NULL) { | ||
RedisModule_UnblockClient(rinfo->client, rinfo); | ||
pthread_mutex_unlock(rinfo->dagMutex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add testcase for it. coverage report pointing this line as not covered
struct RedisAI_RunStats *rstats = NULL; | ||
const char *runkey = RedisModule_StringPtrLen(currentOp->runkey, NULL); | ||
RAI_GetRunStats(runkey,&rstats); | ||
if (currentOp->result == REDISMODULE_ERR) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add testcase for it. coverage report pointing this line as not covered
src/background_workers.c
Outdated
queueUnpop(run_queue_info->run_queue, evicted_rinfo); | ||
} | ||
else { | ||
if (queueLength(run_queue_info->run_queue) > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
testcase for it. codecov report showcasing we're not covering this
During coverage increase I unearthed a race condition that can lead to a crash. There's a portion of a test in What happens is the following:
The script errors out and unblocks, but model2 is still running. We should indeed only unblock when all threads have done processing. This already happens under normal circumstances, but when errors are generated in-between with certain DAG topologies this is not explicitly checked for. We need to take care of this prior to merging the PR. |
The edge case has now been fixed. Unblocking is now controlled through ref counting. |
263f47c
to
a51b1f5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Lets merge it and address the pending issue with this already in-placed
This PR addresses #370, it brings
The way DAGs are handled has been refactored. This opens the door for #328 (enabling batching for individual DAG commands).
This is a WIP, a few items are still TODO:The way we allow DAG operations to run on different devices in parallel (when possible) is the following: instead of running the whole DAG in one swoop as in the previous implementation, the DAG run info is created on one queue/device and shallow copied (appropriately) across other queues/devices as indicated by the DAG specification. A DAG mutex is shared across all copies.
The DAG run info is placed on the queue for each device and evicted for execution. Execution happens one DAG op at a time: once the individual op has executed, it is marked as such and the DAG run info is placed back on the queue.
The current un-executed op is checked for its inputs. If all inputs are found in the tensor context, then the DAG op can be executed. If not, the execution quits and control is given back to the worker. If there are other items in the queue the op is placed after the next item.
When all ops for a device have been executed, the DAG is not placed back on the queue. When all ops in a DAG have been executed or an error occurs, the client is unblocked.