@@ -35,18 +35,15 @@ static void model_callback(int status,
3535 opal_value_t * val ;
3636
3737 /* we can ignore our own callback as we obviously
38- * know that we are MPI */
38+ * know that we are OpenMP */
3939 if (NULL != info ) {
4040 OPAL_LIST_FOREACH (val , info , opal_value_t ) {
41+ if (0 == strcmp (val -> key , OPAL_PMIX_PROGRAMMING_MODEL ) &&
42+ 0 == strcmp (val -> data .string , "OpenMP" )) {
43+ goto cback ;
44+ }
4145 if (OPAL_STRING == val -> type ) {
42- #if 1
4346 opal_output (0 , "Thread Model Callback Key: %s Val %s" , val -> key , val -> data .string );
44- #else
45- if (0 == strcmp (val -> key , OPAL_PMIX_MODEL_LIBRARY_NAME ) &&
46- 0 == strcmp (val -> data .string , "OpenMPI" )) {
47- goto cback ;
48- }
49- #endif
5047 }
5148 }
5249 }
@@ -62,12 +59,39 @@ static void model_callback(int status,
6259 OPAL_PMIX_WAKEUP_THREAD (& thread_complete );
6360}
6461
62+ static void opcbfunc (int status , void * cbdata )
63+ {
64+ opal_pmix_lock_t * lock = (opal_pmix_lock_t * )cbdata ;
65+ OPAL_PMIX_WAKEUP_THREAD (lock );
66+ }
67+
68+ static void infocb (int status ,
69+ opal_list_t * info ,
70+ void * cbdata ,
71+ opal_pmix_release_cbfunc_t release_fn ,
72+ void * release_cbdata )
73+ {
74+ opal_pmix_lock_t * lock = (opal_pmix_lock_t * )cbdata ;
75+ opal_value_t * kv ;
76+
77+ OPAL_LIST_FOREACH (kv , info , opal_value_t ) {
78+ opal_output (0 , "QUERY DATA KEY: %s VALUE %s" , kv -> key , kv -> data .string );
79+ }
80+ if (NULL != release_fn ) {
81+ release_fn (release_cbdata );
82+ }
83+ OPAL_PMIX_WAKEUP_THREAD (lock );
84+ }
85+
6586static void * mylib (void * ptr )
6687{
6788 opal_list_t info , directives ;
6889 opal_value_t * kv ;
6990 int ret ;
7091 opal_pmix_lock_t lock ;
92+ bool init = false;
93+ opal_pmix_query_t * query ;
94+ opal_pmix_pdata_t * pdata ;
7195
7296 OPAL_PMIX_CONSTRUCT_LOCK (& thread_complete );
7397
@@ -94,9 +118,31 @@ static void *mylib(void *ptr)
94118 kv -> data .string = strdup ("PTHREAD" );
95119 opal_list_append (& info , & kv -> super );
96120
97- /* call pmix to initialize these values */
98- ret = opal_pmix .init (& info );
99- OPAL_LIST_DESTRUCT (& info );
121+ /* see if pmix is already initialized */
122+ if (opal_pmix .initialized ()) {
123+ /* mark that this isn't to go to any default event handler - pmix_init
124+ * takes care of that for us, but we have to explicitly do it here */
125+ kv = OBJ_NEW (opal_value_t );
126+ kv -> key = strdup (OPAL_PMIX_EVENT_NON_DEFAULT );
127+ kv -> type = OPAL_BOOL ;
128+ kv -> data .flag = true;
129+ opal_list_append (& info , & kv -> super );
130+ /* it is, so let's just use the event notification
131+ * API to let everyone know we are here */
132+ OPAL_PMIX_CONSTRUCT_LOCK (& lock );
133+ ret = opal_pmix .notify_event (OPAL_ERR_MODEL_DECLARED ,
134+ & orte_process_info .my_name ,
135+ OPAL_PMIX_RANGE_PROC_LOCAL , & info ,
136+ opcbfunc , & lock );
137+ OPAL_PMIX_WAIT_THREAD (& lock );
138+ OPAL_PMIX_DESTRUCT_LOCK (& lock );
139+ OPAL_LIST_DESTRUCT (& info );
140+ } else {
141+ /* call pmix to initialize these values */
142+ ret = opal_pmix .init (& info );
143+ OPAL_LIST_DESTRUCT (& info );
144+ init = true;
145+ }
100146
101147 /* register to receive model callbacks */
102148
@@ -124,12 +170,61 @@ static void *mylib(void *ptr)
124170 (void * )& lock );
125171 OPAL_PMIX_WAIT_THREAD (& lock );
126172 OPAL_PMIX_DESTRUCT_LOCK (& lock );
173+ OPAL_LIST_DESTRUCT (& info );
174+ OPAL_LIST_DESTRUCT (& directives );
127175
128176 /* wait for the model callback */
129177 OPAL_PMIX_WAIT_THREAD (& thread_complete );
130178
131- /* finalize */
132- opal_pmix .finalize ();
179+ /* let's do a couple of operations just to verify we can,
180+ * starting with a query */
181+ OBJ_CONSTRUCT (& info , opal_list_t );
182+ query = OBJ_NEW (opal_pmix_query_t );
183+ opal_argv_append_nosize (& query -> keys , OPAL_PMIX_QUERY_NAMESPACES );
184+ opal_list_append (& info , & query -> super );
185+ OPAL_PMIX_CONSTRUCT_LOCK (& lock );
186+ opal_pmix .query (& info , infocb , & lock );
187+ OPAL_PMIX_WAIT_THREAD (& lock );
188+ OPAL_PMIX_DESTRUCT_LOCK (& lock );
189+ OPAL_LIST_DESTRUCT (& info );
190+
191+ /* Get something */
192+ opal_pmix .get (& orte_process_info .my_name ,
193+ "WASSUP" , NULL , & kv );
194+ if (NULL == kv ) {
195+ fprintf (stderr , "ERROR GETTING WASSUP\n" );
196+ } else {
197+ fprintf (stderr , "THREAD WASSUP: %s\n" , kv -> data .string );
198+ OBJ_RELEASE (kv );
199+ }
200+
201+ /* lookup something published by the main thread */
202+ OBJ_CONSTRUCT (& info , opal_list_t );
203+ pdata = OBJ_NEW (opal_pmix_pdata_t );
204+ pdata -> proc = orte_process_info .my_name ;
205+ pdata -> value .key = strdup ("SOMETHING" );
206+ opal_list_append (& info , & pdata -> super );
207+ /* tell the call to wait for the data to be published */
208+ OBJ_CONSTRUCT (& directives , opal_list_t );
209+ kv = OBJ_NEW (opal_value_t );
210+ kv -> key = strdup (OPAL_PMIX_WAIT );
211+ kv -> type = OPAL_INT ;
212+ kv -> data .integer = 0 ; // wait for all
213+ opal_list_append (& directives , & kv -> super );
214+
215+ if (OPAL_SUCCESS != opal_pmix .lookup (& info , & directives )) {
216+ fprintf (stderr , "LOOKUP FAILED\n" );
217+ } else {
218+ pdata = (opal_pmix_pdata_t * )opal_list_get_first (& info );
219+ fprintf (stderr , "LOOKUP RETURNED %s\n" , pdata -> value .data .string );
220+ }
221+ OPAL_LIST_DESTRUCT (& info );
222+ OPAL_LIST_DESTRUCT (& directives );
223+
224+ if (init ) {
225+ /* need to finalize to maintain refcount */
226+ opal_pmix .finalize ();
227+ }
133228
134229 /* done */
135230 return NULL ;
@@ -142,12 +237,23 @@ int main(int argc, char* argv[])
142237 char * bindings = NULL ;
143238 pid_t pid ;
144239 pthread_t mythread ;
240+ opal_value_t kv , * kptr ;
241+ opal_list_t list ;
145242
146243 MPI_Init (& argc , & argv );
147244 MPI_Comm_rank (MPI_COMM_WORLD , & rank );
148245 MPI_Comm_size (MPI_COMM_WORLD , & size );
149246 pid = getpid ();
150247
248+ /* push something the thread can recognize */
249+ OBJ_CONSTRUCT (& kv , opal_value_t );
250+ kv .key = strdup ("WASSUP" );
251+ kv .type = OPAL_STRING ;
252+ kv .data .string = strdup ("nothing" );
253+ opal_pmix .put (OPAL_PMIX_LOCAL , & kv );
254+ OBJ_DESTRUCT (& kv );
255+ /* no need to commit it as this is strictly within ourselves */
256+
151257 /* spin up a thread */
152258 if (pthread_create (& mythread , NULL , mylib , NULL )) {
153259 fprintf (stderr , "Error creating thread\n" );
@@ -166,6 +272,16 @@ int main(int argc, char* argv[])
166272 rank , size , orte_process_info .num_local_peers , rc ,
167273 (NULL == bindings ) ? "NULL" : bindings );
168274
275+ /* publish something */
276+ OBJ_CONSTRUCT (& list , opal_list_t );
277+ kptr = OBJ_NEW (opal_value_t );
278+ kptr -> key = strdup ("SOMETHING" );
279+ kptr -> type = OPAL_STRING ;
280+ kptr -> data .string = strdup ("SILLY-THING" );
281+ opal_list_append (& list , & kptr -> super );
282+ opal_pmix .publish (& list );
283+ OPAL_LIST_DESTRUCT (& list );
284+
169285 /* wait for the thread to finish */
170286 if (pthread_join (mythread , NULL )) {
171287 fprintf (stderr , "Error joining thread\n" );
0 commit comments