@@ -1,5 +1,5 @@
 from concurrent.futures import ProcessPoolExecutor, wait
-from multiprocessing import Manager
+from multiprocessing import Manager, cpu_count
 import logging
 from threading import Lock
 
@@ -327,12 +327,14 @@ def split_into_chunks(list, chunk_size):
     return [list[i:i+chunk_size] for i in range(0, len(list), chunk_size)]
 
 # Update a single version
-def update_version(db, tag, pool, manager, chunk_size, dts_comp_support):
+def update_version(db, tag, pool, manager, dts_comp_support):
     state = build_partial_state(db, tag)
 
     # Collect blobs to process and split list of blobs into chunks
     idxes = [(idx, hash, filename) for (idx, (hash, filename)) in state.idx_to_hash_and_filename.items()]
-    chunks = split_into_chunks(idxes, chunk_size)
+    chunksize = int(len(idxes) / cpu_count())
+    chunksize = min(max(1, chunksize), 400)
+    chunks = split_into_chunks(idxes, chunksize)
 
     def after_all_defs_done():
         # NOTE: defs database cannot be written to from now on. This is very important - process pool is used,
@@ -425,7 +427,7 @@ def after_all_comps_done():
 
     if not db.vers.exists(tag):
        print("updating tag", tag)
-        update_version(db, tag, pool, manager, 1000, dts_comp_support)
+        update_version(db, tag, pool, manager, dts_comp_support)
     db.close()
     db = None
 
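For readers skimming the diff: the change replaces the fixed chunk_size=1000 argument with a value derived from the machine's core count. The sketch below restates that heuristic in isolation. It is a minimal, self-contained reading of the diff, not the project's code: pick_chunksize is a hypothetical helper name introduced here for illustration, and the rationale given for the clamp (at least 1 so small trees still form a chunk, at most 400 so large trees yield enough chunks to keep the pool's workers busy) is inferred from the numbers, not stated in the diff.

from multiprocessing import cpu_count

def split_into_chunks(items, chunk_size):
    # Same slicing helper as in the diff context, with the parameter
    # renamed from `list` here to avoid shadowing the builtin.
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

def pick_chunksize(n_items):
    # Hypothetical helper: start from roughly one chunk per CPU core...
    chunksize = int(n_items / cpu_count())
    # ...then clamp to [1, 400]: the floor keeps tiny inputs valid,
    # and the cap (inferred) keeps chunks small enough that a large
    # tree is split into many units of work for the process pool.
    return min(max(1, chunksize), 400)

# Example: with 60000 blobs on an 8-core machine, int(60000 / 8) = 7500
# is capped to 400, giving 150 chunks instead of 8 very large ones.
chunks = split_into_chunks(list(range(60000)), pick_chunksize(60000))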