99 * University of Stuttgart. All rights reserved.
1010 * Copyright (c) 2004-2005 The Regents of the University of California.
1111 * All rights reserved.
12- * Copyright (c) 2008-2015 University of Houston. All rights reserved.
12+ * Copyright (c) 2008-2021 University of Houston. All rights reserved.
1313 * Copyright (c) 2017-2018 Research Organization for Information Science
1414 * and Technology (RIST). All rights reserved.
1515 * Copyright (c) 2017 IBM Corporation. All rights reserved.
@@ -56,7 +56,6 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
5656 struct ompi_datatype_t * datatype ,
5757 ompi_status_public_t * status )
5858{
59- MPI_Aint position = 0 ;
6059 MPI_Aint total_bytes = 0 ; /* total bytes to be read */
6160 MPI_Aint bytes_to_read_in_cycle = 0 ; /* left to be read in a cycle*/
6261 MPI_Aint bytes_per_cycle = 0 ; /* total read in each cycle by each process*/
@@ -75,7 +74,6 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
7574 int iov_index = 0 ;
7675 size_t current_position = 0 ;
7776 struct iovec * local_iov_array = NULL , * global_iov_array = NULL ;
78- char * receive_buf = NULL ;
7977 MPI_Aint * memory_displacements = NULL ;
8078 /* global iovec at the readers that contain the iovecs created from
8179 file_set_view */
@@ -96,12 +94,12 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
9694 size_t max_data = 0 ;
9795 MPI_Aint * total_bytes_per_process = NULL ;
9896 ompi_datatype_t * * sendtype = NULL ;
99- MPI_Request * send_req = NULL , recv_req = NULL ;
97+ MPI_Request * send_req = NULL ;
98+ MPI_Request recv_req = MPI_REQUEST_NULL ;
10099 int my_aggregator = -1 ;
101- bool recvbuf_is_contiguous = false;
102- size_t ftype_size ;
103- ptrdiff_t ftype_extent , lb ;
104100
101+ int * blocklength_proc = NULL ;
102+ ptrdiff_t * displs_proc = NULL ;
105103
106104#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
107105 double read_time = 0.0 , start_read_time = 0.0 , end_read_time = 0.0 ;
@@ -113,32 +111,16 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
113111 /**************************************************************************
114112 ** 1. In case the data is not contigous in memory, decode it into an iovec
115113 **************************************************************************/
116-
117- opal_datatype_type_size ( & datatype -> super , & ftype_size );
118- opal_datatype_get_extent ( & datatype -> super , & lb , & ftype_extent );
119-
120- if ( (ftype_extent == (ptrdiff_t ) ftype_size ) &&
121- opal_datatype_is_contiguous_memory_layout (& datatype -> super ,1 ) &&
122- 0 == lb ) {
123- recvbuf_is_contiguous = true;
124- }
125-
126-
127- if (! recvbuf_is_contiguous ) {
128- ret = mca_common_ompio_decode_datatype ((struct ompio_file_t * )fh ,
129- datatype ,
130- count ,
131- buf ,
132- & max_data ,
133- fh -> f_mem_convertor ,
134- & decoded_iov ,
135- & iov_count );
136- if (OMPI_SUCCESS != ret ){
137- goto exit ;
138- }
139- }
140- else {
141- max_data = count * datatype -> super .size ;
114+ ret = mca_common_ompio_decode_datatype ((struct ompio_file_t * )fh ,
115+ datatype ,
116+ count ,
117+ buf ,
118+ & max_data ,
119+ fh -> f_mem_convertor ,
120+ & decoded_iov ,
121+ & iov_count );
122+ if (OMPI_SUCCESS != ret ){
123+ goto exit ;
142124 }
143125
144126 if ( MPI_STATUS_IGNORE != status ) {
@@ -743,6 +725,7 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
743725 start_rcomm_time = MPI_Wtime ();
744726#endif
745727 for (i = 0 ;i < fh -> f_procs_per_group ;i ++ ){
728+ size_t datatype_size ;
746729 send_req [i ] = MPI_REQUEST_NULL ;
747730 if ( 0 < disp_index [i ] ) {
748731 ompi_datatype_create_hindexed (disp_index [i ],
@@ -751,16 +734,20 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
751734 MPI_BYTE ,
752735 & sendtype [i ]);
753736 ompi_datatype_commit (& sendtype [i ]);
754- ret = MCA_PML_CALL (isend (global_buf ,
755- 1 ,
756- sendtype [i ],
757- fh -> f_procs_in_group [i ],
758- 123 ,
759- MCA_PML_BASE_SEND_STANDARD ,
760- fh -> f_comm ,
761- & send_req [i ]));
762- if (OMPI_SUCCESS != ret ){
763- goto exit ;
737+ opal_datatype_type_size (& sendtype [i ]-> super , & datatype_size );
738+
739+ if (datatype_size ) {
740+ ret = MCA_PML_CALL (isend (global_buf ,
741+ 1 ,
742+ sendtype [i ],
743+ fh -> f_procs_in_group [i ],
744+ FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG ,
745+ MCA_PML_BASE_SEND_STANDARD ,
746+ fh -> f_comm ,
747+ & send_req [i ]));
748+ if (OMPI_SUCCESS != ret ){
749+ goto exit ;
750+ }
764751 }
765752 }
766753 }
@@ -773,35 +760,80 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
773760 /**********************************************************
774761 *** 7f. Scatter the Data from the readers
775762 *********************************************************/
776- if ( recvbuf_is_contiguous ) {
777- receive_buf = & ((char * )buf )[position ];
778- }
779- else if (bytes_received ) {
780- /* allocate a receive buffer and copy the data that needs
781- to be received into it in case the data is non-contigous
782- in memory */
783- receive_buf = malloc (bytes_received );
784- if (NULL == receive_buf ) {
763+ if (bytes_received ) {
764+ size_t remaining = bytes_received ;
765+ int block_index = -1 ;
766+ int blocklength_size = INIT_LEN ;
767+
768+ ptrdiff_t recv_mem_address = 0 ;
769+ ompi_datatype_t * newType = MPI_DATATYPE_NULL ;
770+
771+ blocklength_proc = (int * ) calloc (blocklength_size , sizeof (int ));
772+ displs_proc = (ptrdiff_t * ) calloc (blocklength_size , sizeof (ptrdiff_t ));
773+
774+ if (NULL == blocklength_proc || NULL == displs_proc ) {
785775 opal_output (1 , "OUT OF MEMORY\n" );
786776 ret = OMPI_ERR_OUT_OF_RESOURCE ;
787777 goto exit ;
788778 }
789- }
779+
780+ while (remaining ) {
781+ block_index ++ ;
782+
783+ if (0 == block_index ) {
784+ recv_mem_address = (ptrdiff_t ) (decoded_iov [iov_index ].iov_base ) + current_position ;
785+ }
786+ else {
787+ // Reallocate more memory if blocklength_size is not enough
788+ if (0 == block_index % INIT_LEN ) {
789+ blocklength_size += INIT_LEN ;
790+ blocklength_proc = (int * ) realloc (blocklength_proc , blocklength_size * sizeof (int ));
791+ displs_proc = (ptrdiff_t * ) realloc (displs_proc , blocklength_size * sizeof (ptrdiff_t ));
792+ }
793+ displs_proc [block_index ] = (ptrdiff_t ) (decoded_iov [iov_index ].iov_base ) +
794+ current_position - recv_mem_address ;
795+ }
796+
797+ if (remaining >= (decoded_iov [iov_index ].iov_len - current_position )) {
798+ blocklength_proc [block_index ] = decoded_iov [iov_index ].iov_len - current_position ;
799+
800+ remaining = remaining - blocklength_proc [block_index ];
801+ iov_index = iov_index + 1 ;
802+ current_position = 0 ;
803+ }
804+ else {
805+ blocklength_proc [block_index ] = remaining ;
806+ current_position += remaining ;
807+ remaining = 0 ;
808+ }
809+ }
810+
811+ ompi_datatype_create_hindexed (block_index + 1 ,
812+ blocklength_proc ,
813+ displs_proc ,
814+ MPI_BYTE ,
815+ & newType );
816+ ompi_datatype_commit (& newType );
790817
791818#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
792- start_rcomm_time = MPI_Wtime ();
819+ start_rcomm_time = MPI_Wtime ();
793820#endif
794- ret = MCA_PML_CALL (irecv (receive_buf ,
795- bytes_received ,
796- MPI_BYTE ,
797- my_aggregator ,
798- 123 ,
799- fh -> f_comm ,
800- & recv_req ));
801- if (OMPI_SUCCESS != ret ){
802- goto exit ;
803- }
821+ ret = MCA_PML_CALL (irecv ((char * )recv_mem_address ,
822+ 1 ,
823+ newType ,
824+ my_aggregator ,
825+ FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG ,
826+ fh -> f_comm ,
827+ & recv_req ));
828+
829+ if ( MPI_DATATYPE_NULL != newType ) {
830+ ompi_datatype_destroy (& newType );
831+ }
804832
833+ if (OMPI_SUCCESS != ret ){
834+ goto exit ;
835+ }
836+ }
805837
806838 if (my_aggregator == fh -> f_rank ){
807839 ret = ompi_request_wait_all (fh -> f_procs_per_group ,
@@ -816,50 +848,12 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
816848 if (OMPI_SUCCESS != ret ){
817849 goto exit ;
818850 }
819- position += bytes_received ;
820-
821- /* If data is not contigous in memory, copy the data from the
822- receive buffer into the buffer passed in */
823- if (!recvbuf_is_contiguous ) {
824- ptrdiff_t mem_address ;
825- size_t remaining = 0 ;
826- size_t temp_position = 0 ;
827851
828- remaining = bytes_received ;
829-
830- while (remaining ) {
831- mem_address = (ptrdiff_t )
832- (decoded_iov [iov_index ].iov_base ) + current_position ;
833-
834- if (remaining >=
835- (decoded_iov [iov_index ].iov_len - current_position )) {
836- memcpy ((IOVBASE_TYPE * ) mem_address ,
837- receive_buf + temp_position ,
838- decoded_iov [iov_index ].iov_len - current_position );
839- remaining = remaining -
840- (decoded_iov [iov_index ].iov_len - current_position );
841- temp_position = temp_position +
842- (decoded_iov [iov_index ].iov_len - current_position );
843- iov_index = iov_index + 1 ;
844- current_position = 0 ;
845- }
846- else {
847- memcpy ((IOVBASE_TYPE * ) mem_address ,
848- receive_buf + temp_position ,
849- remaining );
850- current_position = current_position + remaining ;
851- remaining = 0 ;
852- }
853- }
854-
855- if (NULL != receive_buf ) {
856- free (receive_buf );
857- receive_buf = NULL ;
858- }
859- }
860852#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
861- end_rcomm_time = MPI_Wtime ();
862- rcomm_time += end_rcomm_time - start_rcomm_time ;
853+ if (bytes_received ) {
854+ end_rcomm_time = MPI_Wtime ();
855+ rcomm_time += end_rcomm_time - start_rcomm_time ;
856+ }
863857#endif
864858 } /* end for (index=0; index < cycles; index ++) */
865859
@@ -881,12 +875,6 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
881875#endif
882876
883877exit :
884- if (!recvbuf_is_contiguous ) {
885- if (NULL != receive_buf ) {
886- free (receive_buf );
887- receive_buf = NULL ;
888- }
889- }
890878 if (NULL != global_buf ) {
891879 free (global_buf );
892880 global_buf = NULL ;
@@ -916,6 +904,17 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
916904 free (displs );
917905 displs = NULL ;
918906 }
907+
908+ if (NULL != blocklength_proc ) {
909+ free (blocklength_proc );
910+ blocklength_proc = NULL ;
911+ }
912+
913+ if (NULL != displs_proc ) {
914+ free (displs_proc );
915+ displs_proc = NULL ;
916+ }
917+
919918 if (my_aggregator == fh -> f_rank ) {
920919
921920 if (NULL != sorted_file_offsets ){
0 commit comments