Skip to content

Commit f6eadf0

Browse files
authored
Merge pull request #4577 from shivammonaka/Threading_Callback
Introduced callback to Pthread, Win32 and OpenMP backend
2 parents 61214fc + ddcd7d6 commit f6eadf0

File tree

8 files changed

+345
-213
lines changed

8 files changed

+345
-213
lines changed

cblas.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ char* openblas_get_config(void);
2626
/*Get the CPU corename on runtime.*/
2727
char* openblas_get_corename(void);
2828

29+
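/* Install a custom threading backend: once a callback is set, OpenBLAS hands its parallel jobs to that callback instead of running them on its own thread pool. */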
30+
typedef void (*openblas_dojob_callback)(int thread_num, void *jobdata, int dojob_data);
31+
typedef void (*openblas_threads_callback)(int sync, openblas_dojob_callback dojob, int numjobs, size_t jobdata_elsize, void *jobdata, int dojob_data);
32+
void openblas_set_threads_callback_function(openblas_threads_callback callback);
33+
2934
#ifdef OPENBLAS_OS_LINUX
3035
/* Sets thread affinity for OpenBLAS threads. `thread_idx` is in [0, openblas_get_num_threads()-1]. */
3136
int openblas_setaffinity(int thread_idx, size_t cpusetsize, cpu_set_t* cpu_set);

common_interface.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ int BLASFUNC(xerbla)(char *, blasint *info, blasint);
4747

4848
void openblas_set_num_threads_(int *);
4949

50+
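/* Custom threading backend hook: when this pointer is non-NULL, exec_blas() hands its job queue to the callback instead of the built-in thread pool. */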
51+
typedef void (*openblas_dojob_callback)(int thread_num, void *jobdata, int dojob_data);
52+
typedef void (*openblas_threads_callback)(int sync, openblas_dojob_callback dojob, int numjobs, size_t jobdata_elsize, void *jobdata, int dojob_data);
53+
extern openblas_threads_callback openblas_threads_callback_;
54+
5055
FLOATRET BLASFUNC(sdot) (blasint *, float *, blasint *, float *, blasint *);
5156
FLOATRET BLASFUNC(sdsdot)(blasint *, float *, float *, blasint *, float *, blasint *);
5257

driver/others/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ if (USE_THREAD)
2525
${BLAS_SERVER}
2626
divtable.c # TODO: Makefile has -UDOUBLE
2727
blas_l1_thread.c
28+
blas_server_callback.c
2829
)
2930

3031
if (NOT NO_AFFINITY)

driver/others/Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ COMMONOBJS = memory.$(SUFFIX) xerbla.$(SUFFIX) c_abs.$(SUFFIX) z_abs.$(SUFFIX)
66
#COMMONOBJS += slamch.$(SUFFIX) slamc3.$(SUFFIX) dlamch.$(SUFFIX) dlamc3.$(SUFFIX)
77

88
ifdef SMP
9-
COMMONOBJS += blas_server.$(SUFFIX) divtable.$(SUFFIX) blasL1thread.$(SUFFIX)
9+
COMMONOBJS += blas_server.$(SUFFIX) divtable.$(SUFFIX) blasL1thread.$(SUFFIX) blas_server_callback.$(SUFFIX)
1010
ifneq ($(NO_AFFINITY), 1)
1111
COMMONOBJS += init.$(SUFFIX)
1212
endif
@@ -140,6 +140,9 @@ memory.$(SUFFIX) : $(MEMORY) ../../common.h ../../param.h
140140
blas_server.$(SUFFIX) : $(BLAS_SERVER) ../../common.h ../../common_thread.h ../../param.h
141141
$(CC) $(CFLAGS) -c $< -o $(@F)
142142

143+
blas_server_callback.$(SUFFIX) : blas_server_callback.c ../../common.h
144+
$(CC) $(CFLAGS) -c $< -o $(@F)
145+
143146
openblas_set_num_threads.$(SUFFIX) : openblas_set_num_threads.c
144147
$(CC) $(CFLAGS) -c $< -o $(@F)
145148

driver/others/blas_server.c

Lines changed: 162 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ int blas_server_avail __attribute__((aligned(ATTRIBUTE_SIZE))) = 0;
115115

116116
int blas_omp_threads_local = 1;
117117

118+
static void * blas_thread_buffer[MAX_CPU_NUMBER];
119+
118120
/* Local Variables */
119121
#if defined(USE_PTHREAD_LOCK)
120122
static pthread_mutex_t server_lock = PTHREAD_MUTEX_INITIALIZER;
@@ -190,6 +192,10 @@ static int main_status[MAX_CPU_NUMBER];
190192
BLASLONG exit_time[MAX_CPU_NUMBER];
191193
#endif
192194

195+
//Prototypes
196+
static void exec_threads(int , blas_queue_t *, int);
197+
static void adjust_thread_buffers();
198+
193199
static void legacy_exec(void *func, int mode, blas_arg_t *args, void *sb){
194200

195201
if (!(mode & BLAS_COMPLEX)){
@@ -375,7 +381,6 @@ static void* blas_thread_server(void *arg){
375381
/* Thread identifier */
376382
BLASLONG cpu = (BLASLONG)arg;
377383
unsigned int last_tick;
378-
void *buffer, *sa, *sb;
379384
blas_queue_t *queue;
380385

381386
blas_queue_t *tscq;
@@ -395,8 +400,6 @@ blas_queue_t *tscq;
395400
main_status[cpu] = MAIN_ENTER;
396401
#endif
397402

398-
buffer = blas_memory_alloc(2);
399-
400403
#ifdef SMP_DEBUG
401404
fprintf(STDERR, "Server[%2ld] Thread has just been spawned!\n", cpu);
402405
#endif
@@ -456,117 +459,9 @@ blas_queue_t *tscq;
456459
start = rpcc();
457460
#endif
458461

459-
if (queue) {
460-
int (*routine)(blas_arg_t *, void *, void *, void *, void *, BLASLONG) = (int (*)(blas_arg_t *, void *, void *, void *, void *, BLASLONG))queue -> routine;
461-
462-
atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)1);
463-
464-
sa = queue -> sa;
465-
sb = queue -> sb;
466-
467-
#ifdef SMP_DEBUG
468-
if (queue -> args) {
469-
fprintf(STDERR, "Server[%2ld] Calculation started. Mode = 0x%03x M = %3ld N=%3ld K=%3ld\n",
470-
cpu, queue->mode, queue-> args ->m, queue->args->n, queue->args->k);
471-
}
472-
#endif
473-
474-
#ifdef CONSISTENT_FPCSR
475-
#ifdef __aarch64__
476-
__asm__ __volatile__ ("msr fpcr, %0" : : "r" (queue -> sse_mode));
477-
#else
478-
__asm__ __volatile__ ("ldmxcsr %0" : : "m" (queue -> sse_mode));
479-
__asm__ __volatile__ ("fldcw %0" : : "m" (queue -> x87_mode));
480-
#endif
481-
#endif
482-
483-
#ifdef MONITOR
484-
main_status[cpu] = MAIN_RUNNING1;
485-
#endif
486-
487-
//For Loongson servers, like the 3C5000 (featuring 16 cores), applying an
488-
//offset to the buffer is essential for minimizing cache conflicts and optimizing performance.
489-
#if defined(LOONGSON3R5) && !defined(NO_AFFINITY)
490-
char model_name[128];
491-
get_cpu_model(model_name);
492-
if ((strstr(model_name, "3C5000") != NULL) || (strstr(model_name, "3D5000") != NULL))
493-
if (sa == NULL) sa = (void *)((BLASLONG)buffer + (WhereAmI() & 0xf) * GEMM_OFFSET_A);
494-
#endif
495-
if (sa == NULL) sa = (void *)((BLASLONG)buffer + GEMM_OFFSET_A);
496-
497-
if (sb == NULL) {
498-
if (!(queue -> mode & BLAS_COMPLEX)){
499-
#ifdef EXPRECISION
500-
if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE){
501-
sb = (void *)(((BLASLONG)sa + ((QGEMM_P * QGEMM_Q * sizeof(xdouble)
502-
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
503-
} else
504-
#endif
505-
if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE) {
506-
#ifdef BUILD_DOUBLE
507-
sb = (void *)(((BLASLONG)sa + ((DGEMM_P * DGEMM_Q * sizeof(double)
508-
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
509-
#endif
510-
} else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) {
511-
#ifdef BUILD_SINGLE
512-
sb = (void *)(((BLASLONG)sa + ((SGEMM_P * SGEMM_Q * sizeof(float)
513-
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
514-
#endif
515-
} else {
516-
/* Other types in future */
517-
}
518-
} else {
519-
#ifdef EXPRECISION
520-
if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE){
521-
sb = (void *)(((BLASLONG)sa + ((XGEMM_P * XGEMM_Q * 2 * sizeof(xdouble)
522-
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
523-
} else
524-
#endif
525-
if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE){
526-
#ifdef BUILD_COMPLEX16
527-
sb = (void *)(((BLASLONG)sa + ((ZGEMM_P * ZGEMM_Q * 2 * sizeof(double)
528-
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
529-
#endif
530-
} else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) {
531-
#ifdef BUILD_COMPLEX
532-
sb = (void *)(((BLASLONG)sa + ((CGEMM_P * CGEMM_Q * 2 * sizeof(float)
533-
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
534-
#endif
535-
} else {
536-
/* Other types in future */
537-
}
538-
}
539-
queue->sb=sb;
540-
}
541-
542-
#ifdef MONITOR
543-
main_status[cpu] = MAIN_RUNNING2;
544-
#endif
545-
546-
if (queue -> mode & BLAS_LEGACY) {
547-
legacy_exec(routine, queue -> mode, queue -> args, sb);
548-
} else
549-
if (queue -> mode & BLAS_PTHREAD) {
550-
void (*pthreadcompat)(void *) = (void(*)(void*))queue -> routine;
551-
(pthreadcompat)(queue -> args);
552-
} else
553-
(routine)(queue -> args, queue -> range_m, queue -> range_n, sa, sb, queue -> position);
554-
555-
#ifdef SMP_DEBUG
556-
fprintf(STDERR, "Server[%2ld] Calculation finished!\n", cpu);
557-
#endif
558-
559-
#ifdef MONITOR
560-
main_status[cpu] = MAIN_FINISH;
561-
#endif
562-
563-
// arm: make sure all results are written out _before_
564-
// thread is marked as done and other threads use them
565-
MB;
566-
atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)0);
567-
568-
569-
}
462+
if(queue) {
463+
exec_threads(cpu, queue, 0);
464+
}
570465

571466
#ifdef MONITOR
572467
main_status[cpu] = MAIN_DONE;
@@ -588,8 +483,6 @@ blas_queue_t *tscq;
588483
fprintf(STDERR, "Server[%2ld] Shutdown!\n", cpu);
589484
#endif
590485

591-
blas_memory_free(buffer);
592-
593486
//pthread_exit(NULL);
594487

595488
return NULL;
@@ -671,6 +564,9 @@ int blas_thread_init(void){
671564

672565
LOCK_COMMAND(&server_lock);
673566

567+
// Adjust thread buffers
568+
adjust_thread_buffers();
569+
674570
if (!blas_server_avail){
675571

676572
thread_timeout_env=openblas_thread_timeout();
@@ -901,6 +797,18 @@ int exec_blas(BLASLONG num, blas_queue_t *queue){
901797
fprintf(STDERR, "Exec_blas is called. Number of executing threads : %ld\n", num);
902798
#endif
903799

800+
//Redirect to caller's callback routine
801+
if (openblas_threads_callback_) {
802+
int buf_index = 0, i = 0;
803+
#ifndef USE_SIMPLE_THREADED_LEVEL3
804+
for (i = 0; i < num; i ++)
805+
queue[i].position = i;
806+
#endif
807+
openblas_threads_callback_(1, (openblas_dojob_callback) exec_threads, num, sizeof(blas_queue_t), (void*) queue, buf_index);
808+
return 0;
809+
}
810+
811+
904812
#ifdef __ELF__
905813
if (omp_in_parallel && (num > 1)) {
906814
if (omp_in_parallel() > 0) {
@@ -1074,6 +982,14 @@ int BLASFUNC(blas_thread_shutdown)(void){
1074982

1075983
LOCK_COMMAND(&server_lock);
1076984

985+
//Free buffers allocated for threads
986+
for(i=0; i<MAX_CPU_NUMBER; i++){
987+
if(blas_thread_buffer[i]!=NULL){
988+
blas_memory_free(blas_thread_buffer[i]);
989+
blas_thread_buffer[i]=NULL;
990+
}
991+
}
992+
1077993
if (blas_server_avail) {
1078994

1079995
for (i = 0; i < blas_num_threads - 1; i++) {
@@ -1110,5 +1026,135 @@ int BLASFUNC(blas_thread_shutdown)(void){
11101026
return 0;
11111027
}
11121028

1029+
static void adjust_thread_buffers() {
1030+
1031+
int i=0;
1032+
1033+
//adjust buffer for each thread
1034+
for(i=0; i < blas_cpu_number; i++){
1035+
if(blas_thread_buffer[i] == NULL){
1036+
blas_thread_buffer[i] = blas_memory_alloc(2);
1037+
}
1038+
}
1039+
for(; i < MAX_CPU_NUMBER; i++){
1040+
if(blas_thread_buffer[i] != NULL){
1041+
blas_memory_free(blas_thread_buffer[i]);
1042+
blas_thread_buffer[i] = NULL;
1043+
}
1044+
}
1045+
}
1046+
1047+
/*
 * Execute a single queued BLAS job on behalf of worker slot `cpu`.
 *
 *   cpu        index into thread_status[] and blas_thread_buffer[] (and into
 *              main_status[] when MONITOR is defined).
 *   queue      job descriptor; it is dereferenced unconditionally, so it must
 *              not be NULL.
 *   buf_index  not used by this function.
 *
 * thread_status[cpu].queue is set to 1 while the job runs and back to 0 once
 * it has finished.  If the job does not supply its own `sa` / `sb` work areas
 * they are carved out of blas_thread_buffer[cpu]; a computed `sb` is written
 * back to queue->sb, while a computed `sa` stays local.
 *
 * NOTE(review): exec_blas() passes this function to the user callback after
 * casting it to openblas_dojob_callback, whose second parameter is `void *`
 * rather than `blas_queue_t *` -- confirm that this is acceptable on every
 * supported ABI.
 * NOTE(review): nothing here checks that blas_thread_buffer[cpu] is non-NULL;
 * presumably callers only pass slots below blas_cpu_number -- verify.
 */
static void exec_threads(int cpu, blas_queue_t *queue, int buf_index) {

  int (*routine)(blas_arg_t *, void *, void *, void *, void *, BLASLONG) = (int (*)(blas_arg_t *, void *, void *, void *, void *, BLASLONG))queue -> routine;

  /* Mark this slot as busy before touching the job. */
  atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)1);

  void *buffer = blas_thread_buffer[cpu];
  void *sa = queue -> sa;
  void *sb = queue -> sb;

#ifdef SMP_DEBUG
  if (queue -> args) {
    /* cpu is an int here; widen it explicitly so it matches the %ld conversion. */
    fprintf(STDERR, "Server[%2ld] Calculation started. Mode = 0x%03x M = %3ld N=%3ld K=%3ld\n",
	    (long)cpu, queue->mode, queue-> args ->m, queue->args->n, queue->args->k);
  }
#endif

#ifdef CONSISTENT_FPCSR
  /* Load the floating-point control state recorded in the job descriptor. */
#ifdef __aarch64__
  __asm__ __volatile__ ("msr fpcr, %0" : : "r" (queue -> sse_mode));
#else
  __asm__ __volatile__ ("ldmxcsr %0" : : "m" (queue -> sse_mode));
  __asm__ __volatile__ ("fldcw %0"   : : "m" (queue -> x87_mode));
#endif
#endif

#ifdef MONITOR
  main_status[cpu] = MAIN_RUNNING1;
#endif

//For Loongson servers, like the 3C5000 (featuring 16 cores), applying an
//offset to the buffer is essential for minimizing cache conflicts and optimizing performance.
#if defined(LOONGSON3R5) && !defined(NO_AFFINITY)
  char model_name[128];
  get_cpu_model(model_name);
  if ((strstr(model_name, "3C5000") != NULL) || (strstr(model_name, "3D5000") != NULL)) {
    if (sa == NULL) sa = (void *)((BLASLONG)buffer + (WhereAmI() & 0xf) * GEMM_OFFSET_A);
  }
#endif
  /* Default placement of the A work area inside this slot's buffer. */
  if (sa == NULL) sa = (void *)((BLASLONG)buffer + GEMM_OFFSET_A);

  /*
   * Default placement of the B work area: directly after an A panel of
   * P * Q elements (times two for complex types), rounded with GEMM_ALIGN,
   * plus GEMM_OFFSET_B.  The element type is chosen from the precision bits
   * of queue->mode.  If no branch below applies (unknown precision, or the
   * matching BUILD_* macro is not defined), sb stays NULL.
   */
  if (sb == NULL) {
    if (!(queue -> mode & BLAS_COMPLEX)){
#ifdef EXPRECISION
      if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE){
	sb = (void *)(((BLASLONG)sa + ((QGEMM_P * QGEMM_Q * sizeof(xdouble)
					+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
      } else
#endif
      if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE) {
#ifdef BUILD_DOUBLE
	sb = (void *)(((BLASLONG)sa + ((DGEMM_P * DGEMM_Q * sizeof(double)
					+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
      } else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) {
#ifdef BUILD_SINGLE
	sb = (void *)(((BLASLONG)sa + ((SGEMM_P * SGEMM_Q * sizeof(float)
					+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
      } else {
	/* Other types in future */
      }
    } else {
#ifdef EXPRECISION
      if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE){
	sb = (void *)(((BLASLONG)sa + ((XGEMM_P * XGEMM_Q * 2 * sizeof(xdouble)
					+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
      } else
#endif
      if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE){
#ifdef BUILD_COMPLEX16
	sb = (void *)(((BLASLONG)sa + ((ZGEMM_P * ZGEMM_Q * 2 * sizeof(double)
					+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
      } else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) {
#ifdef BUILD_COMPLEX
	sb = (void *)(((BLASLONG)sa + ((CGEMM_P * CGEMM_Q * 2 * sizeof(float)
					+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
      } else {
	/* Other types in future */
      }
    }
    /* Record the chosen B area in the job descriptor. */
    queue->sb=sb;
  }

#ifdef MONITOR
  main_status[cpu] = MAIN_RUNNING2;
#endif

  /* Dispatch according to the calling convention encoded in queue->mode. */
  if (queue -> mode & BLAS_LEGACY) {
    legacy_exec(routine, queue -> mode, queue -> args, sb);
  } else
    if (queue -> mode & BLAS_PTHREAD) {
      /* pthread-style job: the routine takes only the argument pointer. */
      void (*pthreadcompat)(void *) = (void(*)(void*))queue -> routine;
      (pthreadcompat)(queue -> args);
    } else
      (routine)(queue -> args, queue -> range_m, queue -> range_n, sa, sb, queue -> position);

#ifdef SMP_DEBUG
  fprintf(STDERR, "Server[%2ld] Calculation finished!\n", (long)cpu);
#endif

#ifdef MONITOR
  main_status[cpu] = MAIN_FINISH;
#endif

  // arm: make sure all results are written out _before_
  // thread is marked as done and other threads use them
  MB;
  atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)0);

}
11141159

1160+
#endif

0 commit comments

Comments
 (0)