@@ -65,13 +65,33 @@ static void opcbfunc(int status, void *cbdata)
6565 OPAL_PMIX_WAKEUP_THREAD (lock );
6666}
6767
68+ static void infocb (int status ,
69+ opal_list_t * info ,
70+ void * cbdata ,
71+ opal_pmix_release_cbfunc_t release_fn ,
72+ void * release_cbdata )
73+ {
74+ opal_pmix_lock_t * lock = (opal_pmix_lock_t * )cbdata ;
75+ opal_value_t * kv ;
76+
77+ OPAL_LIST_FOREACH (kv , info , opal_value_t ) {
78+ opal_output (0 , "QUERY DATA KEY: %s VALUE %s" , kv -> key , kv -> data .string );
79+ }
80+ if (NULL != release_fn ) {
81+ release_fn (release_cbdata );
82+ }
83+ OPAL_PMIX_WAKEUP_THREAD (lock );
84+ }
85+
6886static void * mylib (void * ptr )
6987{
7088 opal_list_t info , directives ;
7189 opal_value_t * kv ;
7290 int ret ;
7391 opal_pmix_lock_t lock ;
7492 bool init = false;
93+ opal_pmix_query_t * query ;
94+ opal_pmix_pdata_t * pdata ;
7595
7696 OPAL_PMIX_CONSTRUCT_LOCK (& thread_complete );
7797
@@ -150,10 +170,57 @@ static void *mylib(void *ptr)
150170 (void * )& lock );
151171 OPAL_PMIX_WAIT_THREAD (& lock );
152172 OPAL_PMIX_DESTRUCT_LOCK (& lock );
173+ OPAL_LIST_DESTRUCT (& info );
174+ OPAL_LIST_DESTRUCT (& directives );
153175
154176 /* wait for the model callback */
155177 OPAL_PMIX_WAIT_THREAD (& thread_complete );
156178
179+ /* let's do a couple of operations just to verify we can,
180+ * starting with a query */
181+ OBJ_CONSTRUCT (& info , opal_list_t );
182+ query = OBJ_NEW (opal_pmix_query_t );
183+ opal_argv_append_nosize (& query -> keys , OPAL_PMIX_QUERY_NAMESPACES );
184+ opal_list_append (& info , & query -> super );
185+ OPAL_PMIX_CONSTRUCT_LOCK (& lock );
186+ opal_pmix .query (& info , infocb , & lock );
187+ OPAL_PMIX_WAIT_THREAD (& lock );
188+ OPAL_PMIX_DESTRUCT_LOCK (& lock );
189+ OPAL_LIST_DESTRUCT (& info );
190+
191+ /* Get something */
192+ opal_pmix .get (& orte_process_info .my_name ,
193+ "WASSUP" , NULL , & kv );
194+ if (NULL == kv ) {
195+ fprintf (stderr , "ERROR GETTING WASSUP\n" );
196+ } else {
197+ fprintf (stderr , "THREAD WASSUP: %s\n" , kv -> data .string );
198+ OBJ_RELEASE (kv );
199+ }
200+
201+ /* lookup something published by the main thread */
202+ OBJ_CONSTRUCT (& info , opal_list_t );
203+ pdata = OBJ_NEW (opal_pmix_pdata_t );
204+ pdata -> proc = orte_process_info .my_name ;
205+ pdata -> value .key = strdup ("SOMETHING" );
206+ opal_list_append (& info , & pdata -> super );
207+ /* tell the call to wait for the data to be published */
208+ OBJ_CONSTRUCT (& directives , opal_list_t );
209+ kv = OBJ_NEW (opal_value_t );
210+ kv -> key = strdup (OPAL_PMIX_WAIT );
211+ kv -> type = OPAL_INT ;
212+ kv -> data .integer = 0 ; // wait for all
213+ opal_list_append (& directives , & kv -> super );
214+
215+ if (OPAL_SUCCESS != opal_pmix .lookup (& info , & directives )) {
216+ fprintf (stderr , "LOOKUP FAILED\n" );
217+ } else {
218+ pdata = (opal_pmix_pdata_t * )opal_list_get_first (& info );
219+ fprintf (stderr , "LOOKUP RETURNED %s\n" , pdata -> value .data .string );
220+ }
221+ OPAL_LIST_DESTRUCT (& info );
222+ OPAL_LIST_DESTRUCT (& directives );
223+
157224 if (init ) {
158225 /* need to finalize to maintain refcount */
159226 opal_pmix .finalize ();
@@ -170,12 +237,23 @@ int main(int argc, char* argv[])
170237 char * bindings = NULL ;
171238 pid_t pid ;
172239 pthread_t mythread ;
240+ opal_value_t kv , * kptr ;
241+ opal_list_t list ;
173242
174243 MPI_Init (& argc , & argv );
175244 MPI_Comm_rank (MPI_COMM_WORLD , & rank );
176245 MPI_Comm_size (MPI_COMM_WORLD , & size );
177246 pid = getpid ();
178247
248+ /* push something the thread can recognize */
249+ OBJ_CONSTRUCT (& kv , opal_value_t );
250+ kv .key = strdup ("WASSUP" );
251+ kv .type = OPAL_STRING ;
252+ kv .data .string = strdup ("nothing" );
253+ opal_pmix .put (OPAL_PMIX_LOCAL , & kv );
254+ OBJ_DESTRUCT (& kv );
255+ /* no need to commit it as this is strictly within ourselves */
256+
179257 /* spin up a thread */
180258 if (pthread_create (& mythread , NULL , mylib , NULL )) {
181259 fprintf (stderr , "Error creating thread\n" );
@@ -194,6 +272,16 @@ int main(int argc, char* argv[])
194272 rank , size , orte_process_info .num_local_peers , rc ,
195273 (NULL == bindings ) ? "NULL" : bindings );
196274
275+ /* publish something */
276+ OBJ_CONSTRUCT (& list , opal_list_t );
277+ kptr = OBJ_NEW (opal_value_t );
278+ kptr -> key = strdup ("SOMETHING" );
279+ kptr -> type = OPAL_STRING ;
280+ kptr -> data .string = strdup ("SILLY-THING" );
281+ opal_list_append (& list , & kptr -> super );
282+ opal_pmix .publish (& list );
283+ OPAL_LIST_DESTRUCT (& list );
284+
197285 /* wait for the thread to finish */
198286 if (pthread_join (mythread , NULL )) {
199287 fprintf (stderr , "Error joining thread\n" );
0 commit comments