"""
.. _ref_incremental:

Incremental
===========
"""

from ansys.dpf import core

from typing import Dict, Any


class IncrementalHelper:
    """Provides an API to transform an existing workflow into an incrementally evaluating one.

    It works by plugging operators into an incomplete workflow.

    Example
    -------
    >>> from ansys.dpf import core as dpf
    >>> from ansys.dpf.core import examples
    >>> path = examples.find_msup_transient()
    >>> ds = dpf.DataSources(path)
    >>> scoping = dpf.time_freq_scoping_factory.scoping_on_all_time_freqs(ds)
    >>>
    >>> result_op = dpf.operators.result.displacement(data_sources=ds, time_scoping=scoping)
    >>> minmax_op = dpf.operators.min_max.min_max_fc_inc(result_op)
    >>>
    >>> new_op = dpf.split_workflow_in_chunks(result_op, minmax_op, scoping, chunk_size=5)
    >>> min_field = new_op.get_output(0, dpf.types.field)
    >>> max_field = new_op.get_output(1, dpf.types.field)
    """

    def __init__(
        self,
        start_op: core.Operator,
        end_op: core.Operator,
        scoping: core.Scoping,
        scoping_pin: int = None,
    ):
        """Constructs an IncrementalHelper object from the first and last operators of a
        workflow and from the scoping used to chunk the data.

        This class simplifies the use of incremental operators and enables a workflow to be
        evaluated incrementally, under the constraint that end_op supports incremental
        evaluation.

        Parameters
        ----------
        start_op : Operator
            First operator in the workflow to convert
        end_op : Operator
            Last operator in the workflow to convert (Operator providing the meaningful output)
        scoping : Scoping
            Scoping used to chunk the data
        scoping_pin : int, optional
            Pin number of the scoping on the first operator; if not given, it is deduced
            from the input types
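
        Example
        -------
        A minimal construction sketch, reusing the same transient result as the class-level
        example (the file and operators are illustrative):

        >>> from ansys.dpf import core as dpf
        >>> from ansys.dpf.core import examples
        >>> ds = dpf.DataSources(examples.find_msup_transient())
        >>> scoping = dpf.time_freq_scoping_factory.scoping_on_all_time_freqs(ds)
        >>> result_op = dpf.operators.result.displacement(data_sources=ds, time_scoping=scoping)
        >>> minmax_op = dpf.operators.min_max.min_max_fc_inc(result_op)
        >>> helper = IncrementalHelper(result_op, minmax_op, scoping)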
        """
        # The input operator should accept a scoping.
        # The last operator should support incremental evaluation,
        # but as there is no consistent method to check this,
        # the check is permissive in case the specification isn't up to date.
        self._start_op = start_op
        self._end_op = self._map_to_incremental(end_op)

        self._scoping = scoping
        self._scoping_pin = self._find_scoping_pin(scoping_pin)

    def estimate_size(self, max_bytes: int, _dict_inputs: Dict[int, Any] = None) -> int:
        """Estimates the chunk size from the number of bytes expected to be output in one iteration.

        The estimation is based on the size of the output for one ID of the given scoping,
        so the first operator is run for only one iteration.

        Only Field and FieldsContainer outputs are supported.
        For other types, specify the chunk_size argument of the split() method instead.

        Parameters
        ----------
        max_bytes : int
            Max allowed size of an output from the first operator, for one iteration (in bytes).
        _dict_inputs : dict[int, Any], optional
            Dictionary mapping pin numbers to inputs, for evaluating the output of one iteration.
        """
        if _dict_inputs is None:
            _dict_inputs = {}

        # Evaluate for the first scoping ID to estimate memory consumption
        # The estimate works best when the scoping holds many IDs
        first_id = self._scoping.ids[0]
        srv = self._scoping._server
        loc = self._scoping.location
        _dict_inputs[self._scoping_pin] = core.Scoping(server=srv, ids=[first_id], location=loc)

        outputs = self._prerun(_dict_inputs)

        _outputs = outputs._outputs
        data = map(lambda o: o.get_data(), _outputs)
        # Sizes of all outputs for one iteration
        sizes = map(lambda obj: self._compute_size(obj), data)

        # Total size for one ID in the scoping
        size_for_one = sum(sizes)
        # total_size = size_for_one * self._scoping.size

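        # Number of scoping IDs whose combined output fits within max_bytes.
        # Illustrative figures: if one ID produces about 8 MiB of output and max_bytes is
        # 1 GiB, then int(1024**3 / (8 * 1024**2)) = 128 IDs are processed per chunk,
        # clamped below to 1 and above to the scoping size.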
        num_iter = int(max_bytes / size_for_one)
        num_iter = min(max(num_iter, 1), self._scoping.size)  # clamp(num_iter, 1, scoping size)
        return num_iter  # math.gcd(num_iter, self._scoping.size)

    def _compute_size(self, obj):
        if isinstance(obj, core.FieldsContainer):
            fc = obj
            return self._compute_size(fc[0])
        elif isinstance(obj, core.Field):
            field = obj
            # Double = 8 bytes assumption
            return field.size * 8

        raise NotImplementedError()

    def _prerun(self, _dict_inputs: Dict[int, Any]):
        """Runs the first operator once with the given inputs and returns its outputs."""
        for pin_idx, val in _dict_inputs.items():
            self._start_op.connect(pin_idx, val)
        self._start_op.run()
        return self._start_op.outputs

    # Transforms a user workflow:
    #
    #            +----------+    +---------------+    +---------+
    # scoping -> | start_op | -> | middle ops... | -> | end_op  | ->
    #            +----------+    +---------------+    +---------+
    #
    # Into a new workflow like this:
    #
    #            +----------+    +---------------+    +---------+
    # scoping -> | start_op | -> | middle ops... | -> | end_op  |
    #   \        +----------+    +---------------+    +---------+
    #    \         \                                        |
    #     \         \    +------------------+               | (pins remaps)
    #      \         \>  |                  |           +----------+    +-----------+
    #       \ scop_pin ->|     chunk_in     | +-------->|          | -> |  forward  | -> final
    #        +---------->|  for_each_range  | iterables |          |    | (new end) |    outputs
    # chunk_size ->      |                  | --------->| for_each |    +-----------+
    #                    +------------------+           |          |
    #                       end_input_pin-------------->|          |
    #                                                    +----------+
    def split(
        self, chunk_size: int, end_input_pin: int = 0, rescope: bool = False
    ) -> core.Operator:
        """Integrates the given operators into a new workflow enabling incremental evaluation.

        Given a chunk size (the number of scoping IDs processed per iteration), it provides a
        new operator to retrieve the outputs from, enabling incremental evaluation and notably
        reducing peak memory usage.

        Parameters
        ----------
        chunk_size : int
            Number of scoping IDs to process per iteration
        end_input_pin : int, optional
            Pin number of the output to use from the first operator (default = 0)
        rescope : bool, optional
            Rescope all the outputs based on the given scoping (default = False)
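
        Example
        -------
        A direct-use sketch, with ``helper`` constructed as in the constructor example above
        (the output pins mirror the class-level example):

        >>> new_op = helper.split(chunk_size=5)
        >>> min_field = new_op.get_output(0, dpf.types.field)
        >>> max_field = new_op.get_output(1, dpf.types.field)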
        """
        # Enables incremental evaluation
        # using for_each, chunk_in_for_each_range and the incremental version of the last
        # operator, and returns a forward operator whose pins remap the outputs of end_op.

        _server = self._start_op._server

        for_each = core.Operator("for_each", server=_server)
        split_in_range = core.Operator("chunk_in_for_each_range", server=_server)
        forward = core.Operator("forward", server=_server)

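        # Configure the range splitter: the operator to re-run for each chunk, the index of
        # its scoping pin, the scoping to split, and the number of scoping IDs per chunk.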
        split_in_range.connect_operator_as_input(1, self._start_op)
        split_in_range.connect(2, self._scoping_pin)
        split_in_range.connect(3, self._scoping)
        split_in_range.connect(4, chunk_size)

        for_each.connect(0, split_in_range, 0)
        for_each.connect(2, end_input_pin)

        # Retrieve the output specification of the end operator to remap its outputs
        dict_outputs = core.Operator.operator_specification(
            op_name=self._end_op.name, server=_server
        ).outputs
        if not dict_outputs:
            # temporary patch for incremental:: operators
            dict_outputs = {0: None}

        fe_pin_idx = 3  # see doc of for_each
        for pin_idx in dict_outputs.keys():
            # connect end_op to for_each
            for_each.connect(fe_pin_idx, self._end_op, pin_idx)
            # remap
            forward.connect(pin_idx, for_each, fe_pin_idx)
            fe_pin_idx += 1

        output = forward

        if rescope:
            new_forward = core.Operator("forward", server=_server)
            for pin_idx in dict_outputs.keys():
                rescope_op = core.Operator("Rescope", server=_server)
                rescope_op.connect(0, forward, pin_idx)
                rescope_op.connect(1, self._scoping)
                new_forward.connect(pin_idx, rescope_op, 0)

            output = new_forward
        return output

    def _map_to_incremental(self, end_op: core.Operator):
        # The goal of this function is to validate that the given operator supports incremental
        # evaluation. If an operator is found not to support it, the check must not be strict:
        # it should only output warnings, because this function, by design, may be outdated.
        inc_operators = [
            "accumulate_level_over_label_fc",
            "accumulate_min_over_label_fc",
            "accumulate_over_label_fc",
            "average_over_label_fc",
            "min_max_inc",
            "min_max_fc_inc",
            "max_over_time_by_entity",
            "min_max_over_time_by_entity",
            "min_max_by_time",
            "min_over_time_by_entity",
            "time_of_max_by_entity",
            "time_of_min_by_entity",
            "incremental::merge::property_field",
            "incremental::merge::mesh",
            "incremental::merge::field",
            "incremental::merge::fields_container",
        ]

        map_to_inc = {"min_max": "min_max_inc", "min_max_fc": "min_max_fc_inc"}

        if end_op.name not in inc_operators:
            print(f"WARNING: Operator named {end_op.name} may not support incremental evaluation")
            if end_op.name in map_to_inc.keys():
                print(
                    f"An operator named {map_to_inc[end_op.name]} supports incremental evaluation"
                )

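        # If the operator exposes an "incremental" configuration option, enable it.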
        if "incremental" in end_op.config.available_config_options:
            end_op.config.set_config_option("incremental", True)

        return end_op

    def _find_scoping_pin(self, pin_idx):
        dict_inputs = self._start_op.inputs._dict_inputs
        # validate the given pin_idx
        if pin_idx is not None and pin_idx in dict_inputs:
            pin_spec = dict_inputs[pin_idx]
            if "scoping" in pin_spec.type_names:
                return pin_idx

        # look for a scoping pin
        for pin_idx, spec in dict_inputs.items():
            if "scoping" in spec.type_names:
                return pin_idx

        raise Exception(
            f"Scoping pin could not be found in start_op with name '{self._start_op.name}'"
        )


def split_workflow_in_chunks(
    start_op: core.Operator,
    end_op: core.Operator,
    scoping: core.Scoping,
    rescope: bool = False,
    max_bytes: int = 1024**3,
    dict_inputs: Dict[int, Any] = None,
    chunk_size: int = None,
    scoping_pin: int = None,
    end_input_pin: int = 0,
):
    """Transforms a workflow into an incrementally evaluating one.

    It wraps in one method the functionality of the IncrementalHelper class as well
    as the estimation of the chunk size.

    If no chunk_size is specified, the function will attempt to estimate the value
    by calling IncrementalHelper.estimate_size(max_bytes, dict_inputs).

    If no scoping_pin is specified, the function will attempt to deduce the correct pin,
    which would be the first input pin matching a scoping type.

    Parameters
    ----------
    start_op : Operator
        Initial operator of the workflow to convert
    end_op : Operator
        Last operator of the workflow to convert
    scoping : Scoping
        Scoping to split across multiple evaluations
    rescope : bool, optional
        If enabled, will rescope final outputs with the given scoping (default = False)
    max_bytes : int, optional
        Max allowed size for the output from the first operator (default = 1024**3)
    dict_inputs : dict[int, Any], optional
        Inputs to pass to the first operator, used only for the estimation run (default = None)
    chunk_size : int, optional
        Maximum number of scoping elements to process in an iteration (default = None)
    scoping_pin : int, optional
        The pin number on the first operator to bind the scoping (default = None)
    end_input_pin : int, optional
        Pin number of the output to use from the first operator (default = 0)
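
    Example
    -------
    A variation on the class-level example, letting the chunk size be estimated from a
    memory budget instead of passing it explicitly (the budget value is illustrative):

    >>> budget = 512 * 1024**2  # 512 MiB
    >>> new_op = dpf.split_workflow_in_chunks(result_op, minmax_op, scoping, max_bytes=budget)
    >>> min_field = new_op.get_output(0, dpf.types.field)
    >>> max_field = new_op.get_output(1, dpf.types.field)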
    """
    splitter = IncrementalHelper(start_op, end_op, scoping, scoping_pin)

    if chunk_size is None:
        print(f"Estimating chunk_size with max_bytes: {max_bytes}")
        chunk_size = splitter.estimate_size(max_bytes, dict_inputs)
        print(f"Done. chunk_size set to {chunk_size} (scoping size: {scoping.size})")

    return splitter.split(chunk_size, end_input_pin, rescope)