
Commit f730304

Improve logging and remove deadlock (#1012)
* Improve logging
  1. Use the forkserver context to start processes within the pynisher. This should remove a deadlock caused by the combined use of logging, threading and multiprocessing. As a side effect, the minimal pynisher version is bumped to 0.6.2.
  2. Log messages from dask.distributed to a separate logfile.
  3. Change the severity of a few log messages or remove them altogether.
* Fix deadlock and tests
* Fixes
  1. Randomly try ports for the logging server inside a try/except. Previously, the port was determined before starting the server, so it could be taken by a different process in the meantime.
  2. Fix a bug in model deletion (triggered when memory usage is too high or the maximum number of models on disk is reached).
  3. Improve the output of metadata generation.
* Add output for online test
1 parent 168330e commit f730304

File tree

17 files changed: +115 -109 lines

autosklearn/automl.py

Lines changed: 36 additions & 25 deletions
@@ -43,7 +43,6 @@
 from autosklearn.util.stopwatch import StopWatch
 from autosklearn.util.logging_ import (
     get_logger,
-    is_port_in_use,
     LogRecordSocketReceiver,
     setup_logger,
 )
@@ -247,7 +246,11 @@ def _create_dask_client(self):
                 # file was deleted, so the client could not close
                 # the worker properly
                 local_directory=tempfile.gettempdir(),
-            )
+                # Memory is handled by the pynisher, not by the dask worker/nanny
+                memory_limit=0,
+            ),
+            # Heartbeat every 10s
+            heartbeat_interval=10000,
         )

     def _close_dask_client(self):
@@ -269,26 +272,35 @@ def _get_logger(self, name):
         # Setup the configuration for the logger
         # This is gonna be honored by the server
         # Which is created below
-        setup_logger(os.path.join(self._backend.temporary_directory,
-                                  '%s.log' % str(logger_name)),
-                     self.logging_config,
-                     )
-
-        # The desired port might be used, so check this
-        while is_port_in_use(self._logger_port):
-            self._logger_port += 1
+        setup_logger(
+            output_file=os.path.join(
+                self._backend.temporary_directory, '%s.log' % str(logger_name)
+            ),
+            logging_config=self.logging_config,
+            output_dir=self._backend.temporary_directory,
+        )

         # As Auto-sklearn works with distributed process,
         # we implement a logger server that can receive tcp
         # pickled messages. They are unpickled and processed locally
         # under the above logging configuration setting
         # We need to specify the logger_name so that received records
         # are treated under the logger_name ROOT logger setting
-        self.stop_logging_server = multiprocessing.Event()
-        self.logger_tcpserver = LogRecordSocketReceiver(logname=logger_name,
-                                                        port=self._logger_port,
-                                                        event=self.stop_logging_server)
-        self.logging_server = multiprocessing.Process(
+        context = multiprocessing.get_context('fork')
+        self.stop_logging_server = context.Event()
+
+        while True:
+            # Loop until we find a valid port
+            self._logger_port = np.random.randint(10000, 65535)
+            try:
+                self.logger_tcpserver = LogRecordSocketReceiver(logname=logger_name,
+                                                                port=self._logger_port,
+                                                                event=self.stop_logging_server)
+                break
+            except OSError:
+                continue
+
+        self.logging_server = context.Process(
             target=self.logger_tcpserver.serve_until_stopped)
         self.logging_server.daemon = False
         self.logging_server.start()
@@ -354,7 +366,6 @@ def _do_dummy_prediction(self, datamanager, num_run):
             autosklearn_seed=self._seed,
             resampling_strategy=self._resampling_strategy,
             initial_num_run=num_run,
-            logger=self._logger,
             stats=stats,
             metric=self._metric,
             memory_limit=memory_limit,
@@ -409,6 +420,9 @@ def fit(
         only_return_configuration_space: Optional[bool] = False,
         load_models: bool = True,
     ):
+        self._backend.save_start_time(self._seed)
+        self._stopwatch = StopWatch()
+
         # Make sure that input is valid
         # Performs Ordinal one hot encoding to the target
         # both for train and test data
@@ -434,6 +448,12 @@ def fit(
             raise ValueError('Metric must be instance of '
                              'autosklearn.metrics.Scorer.')

+        if dataset_name is None:
+            dataset_name = hash_array_or_matrix(X)
+        # By default try to use the TCP logging port or get a new port
+        self._logger_port = logging.handlers.DEFAULT_TCP_LOGGING_PORT
+        self._logger = self._get_logger(dataset_name)
+
         # If no dask client was provided, we create one, so that we can
         # start a ensemble process in parallel to smbo optimize
         if (
@@ -444,18 +464,9 @@ def fit(
         else:
             self._is_dask_client_internally_created = False

-        if dataset_name is None:
-            dataset_name = hash_array_or_matrix(X)
-
-        self._backend.save_start_time(self._seed)
-        self._stopwatch = StopWatch()
         self._dataset_name = dataset_name
         self._stopwatch.start_task(self._dataset_name)

-        # By default try to use the TCP logging port or get a new port
-        self._logger_port = logging.handlers.DEFAULT_TCP_LOGGING_PORT
-        self._logger = self._get_logger(dataset_name)
-
         if feat_type is not None and len(feat_type) != X.shape[1]:
             raise ValueError('Array feat_type does not have same number of '
                              'variables as X has features. %d vs %d.' %
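Note on the port-selection change above: instead of checking whether a fixed port is free and binding later (a race), the new code binds first and retries on OSError. Below is a minimal sketch of the same pattern using only the standard library; the names are illustrative and not auto-sklearn's LogRecordSocketReceiver API.

import random
import socketserver

class _NoopHandler(socketserver.StreamRequestHandler):
    # Placeholder handler; auto-sklearn's receiver unpickles log records here instead.
    def handle(self):
        pass

def bind_receiver_to_free_port(host='localhost'):
    # Binding is the check itself: if the random port is already taken,
    # the bind raises OSError and we simply draw another port.
    while True:
        port = random.randint(10000, 65535)
        try:
            server = socketserver.TCPServer((host, port), _NoopHandler)
            return server, port
        except OSError:
            continue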

autosklearn/ensemble_builder.py

Lines changed: 13 additions & 17 deletions
@@ -4,10 +4,12 @@
 import math
 import numbers
 import logging.handlers
+import multiprocessing
 import os
 import pickle
 import re
 import shutil
+import sys
 import time
 import traceback
 from typing import List, Optional, Tuple, Union
@@ -28,7 +30,7 @@
 from autosklearn.metrics import calculate_score, Scorer
 from autosklearn.ensembles.ensemble_selection import EnsembleSelection
 from autosklearn.ensembles.abstract_ensemble import AbstractEnsemble
-from autosklearn.util.logging_ import get_named_client_logger
+from autosklearn.util.logging_ import get_named_client_logger, get_logger

 Y_ENSEMBLE = 0
 Y_VALID = 1
@@ -153,7 +155,7 @@ def build_ensemble(self, dask_client: dask.distributed.Client) -> None:
         # The second criteria is elapsed time
         elapsed_time = time.time() - self.start_time

-        logger = get_named_client_logger('EnsembleBuilder', port=self.logger_port)
+        logger = get_logger('EnsembleBuilder')

         # First test for termination conditions
         if self.time_left_for_ensembles < elapsed_time:
@@ -562,10 +564,17 @@ def run(

             if time_left - time_buffer < 1:
                 break
+            context = multiprocessing.get_context('forkserver')
+            # Try to copy as many modules into the new context to reduce startup time
+            # http://www.bnikolic.co.uk/blog/python/parallelism/2019/11/13/python-forkserver-preload.html
+            # do not copy the logging module as it causes deadlocks!
+            preload_modules = list(filter(lambda key: 'logging' not in key, sys.modules.keys()))
+            context.set_forkserver_preload(preload_modules)
             safe_ensemble_script = pynisher.enforce_limits(
                 wall_time_in_s=int(time_left - time_buffer),
                 mem_in_mb=self.memory_limit,
-                logger=self.logger
+                logger=self.logger,
+                context=context,
             )(self.main)
             safe_ensemble_script(time_left, iteration, return_predictions)
             if safe_ensemble_script.exit_status is pynisher.MemorylimitException:
@@ -1385,24 +1394,11 @@ def _delete_excess_models(self, selected_keys: List[str]):

        """

-        # Obtain a list of sorted pred keys
-        sorted_keys = self._get_list_of_sorted_preds()
-        sorted_keys = list(map(lambda x: x[0], sorted_keys))
-
-        if len(sorted_keys) <= self.max_resident_models:
-            # Don't waste time if not enough models to delete
-            return
-
-        # The top self.max_resident_models models would be the candidates
-        # Any other low performance model will be deleted
-        # The list is in ascending order of score
-        candidates = sorted_keys[:self.max_resident_models]
-
         # Loop through the files currently in the directory
         for pred_path in self.y_ens_files:

             # Do not delete candidates
-            if pred_path in candidates:
+            if pred_path in selected_keys:
                 continue

             if pred_path in self._has_been_candidate:
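For readers unfamiliar with the forkserver start method used above, here is a small, self-contained sketch of the pattern (stdlib only, Unix only). The worker function is a made-up example; the "skip logging modules" filter mirrors the comment in the diff.

import multiprocessing
import sys

def get_forkserver_context():
    # The forkserver launches children from a clean helper process, so locks
    # held by logging/threading in the parent are never inherited mid-acquire.
    context = multiprocessing.get_context('forkserver')
    # Preload already-imported modules to cut per-process startup time, but
    # leave out anything logging-related, which can deadlock (see the diff above).
    preload = [name for name in sys.modules if 'logging' not in name]
    context.set_forkserver_preload(preload)
    return context

def _worker(queue):
    queue.put('hello from the forkserver child')

if __name__ == '__main__':
    ctx = get_forkserver_context()
    queue = ctx.Queue()
    proc = ctx.Process(target=_worker, args=(queue,))
    proc.start()
    print(queue.get())
    proc.join()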

autosklearn/estimators.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,10 @@ def __init__(
177177
dask_client : dask.distributed.Client, optional
178178
User-created dask client, can be used to start a dask cluster and then
179179
attach auto-sklearn to it.
180+
181+
Auto-sklearn can run into a deadlock if the dask client uses threads for
182+
parallelization, it is therefore highly recommended to use dask workers
183+
using a single process.
180184
181185
disable_evaluator_output: bool or list, optional (False)
182186
If True, disable model and prediction output. Cannot be used
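A hedged example of what this docstring recommends: a dask client whose workers are separate single-threaded processes. The cluster sizes below are illustrative, not values required by auto-sklearn.

import dask.distributed

if __name__ == '__main__':
    cluster = dask.distributed.LocalCluster(
        n_workers=4,            # number of worker processes
        processes=True,         # workers are processes, not threads
        threads_per_worker=1,   # no thread-level parallelism inside a worker
    )
    client = dask.distributed.Client(cluster)
    # ... then pass it in, e.g. AutoSklearnClassifier(dask_client=client)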

autosklearn/evaluation/__init__.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import math
55
import multiprocessing
66
from queue import Empty
7+
import sys
78
import time
89
import traceback
910
from typing import Dict, List, Optional, Tuple, Union
@@ -96,7 +97,7 @@ def _encode_exit_status(exit_status):
9697
class ExecuteTaFuncWithQueue(AbstractTAFunc):
9798

9899
def __init__(self, backend, autosklearn_seed, resampling_strategy, metric,
99-
logger, cost_for_crash, abort_on_first_run_crash,
100+
cost_for_crash, abort_on_first_run_crash,
100101
initial_num_run=1, stats=None,
101102
run_obj='quality', par_factor=1, all_scoring_functions=False,
102103
output_y_hat_optimization=True, include=None, exclude=None,
@@ -160,7 +161,6 @@ def __init__(self, backend, autosklearn_seed, resampling_strategy, metric,
160161
self.disable_file_output = disable_file_output
161162
self.init_params = init_params
162163
self.budget_type = budget_type
163-
self.logger = logger
164164

165165
if memory_limit is not None:
166166
memory_limit = int(math.ceil(memory_limit))
@@ -244,7 +244,13 @@ def run(
244244
instance_specific: Optional[str] = None,
245245
) -> Tuple[StatusType, float, float, Dict[str, Union[int, float, str, Dict, List, Tuple]]]:
246246

247-
queue = multiprocessing.Queue()
247+
context = multiprocessing.get_context('forkserver')
248+
# Try to copy as many modules into the new context to reduce startup time
249+
# http://www.bnikolic.co.uk/blog/python/parallelism/2019/11/13/python-forkserver-preload.html
250+
# do not copy the logging module as it causes deadlocks!
251+
preload_modules = list(filter(lambda key: 'logging' not in key, sys.modules.keys()))
252+
context.set_forkserver_preload(preload_modules)
253+
queue = context.Queue()
248254

249255
if not (instance_specific is None or instance_specific == '0'):
250256
raise ValueError(instance_specific)
@@ -257,6 +263,7 @@ def run(
257263
wall_time_in_s=cutoff,
258264
mem_in_mb=self.memory_limit,
259265
capture_output=True,
266+
context=context,
260267
)
261268

262269
if isinstance(config, int):
@@ -436,8 +443,4 @@ def run(
436443
runtime = float(obj.wall_clock_time)
437444

438445
autosklearn.evaluation.util.empty_queue(queue)
439-
self.logger.debug(
440-
'Finished function evaluation. Status: %s, Cost: %f, Runtime: %f, Additional %s',
441-
status, cost, runtime, additional_run_info,
442-
)
443446
return status, cost, runtime, additional_run_info
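The same forkserver context is handed to pynisher in this file. Below is a hedged sketch of wrapping a function with pynisher's limits, using only keywords and attributes that appear in the diffs of this commit (context= needs the pynisher >= 0.6.2 mentioned in the commit message; the target function and limit values are made up).

import multiprocessing
import pynisher

def expensive_function(n):
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    context = multiprocessing.get_context('forkserver')
    limited = pynisher.enforce_limits(
        wall_time_in_s=30,    # wall-clock limit enforced on the child process
        mem_in_mb=1024,       # memory limit enforced on the child process
        context=context,      # start the child via the forkserver
    )(expensive_function)
    limited(10_000_000)
    if limited.exit_status is pynisher.MemorylimitException:
        print('child hit the memory limit')
    else:
        print('finished in %.1fs' % limited.wall_clock_time)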

autosklearn/metalearning/input/aslib_simple.py

Lines changed: 3 additions & 2 deletions
@@ -57,8 +57,9 @@ def _find_files(self):
         for expected_file in optional:
             full_path = os.path.join(self.dir_, expected_file)
             if not os.path.isfile(full_path):
-                self.logger.warning(
-                    "Not found: %s (maybe you want to add it)" % (full_path))
+                # self.logger.warning(
+                #     "Not found: %s (maybe you want to add it)" % (full_path))
+                pass
             else:
                 self.found_files.append(full_path)


autosklearn/metalearning/mismbo.py

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@ def suggest_via_metalearning(

     task = TASK_TYPES_TO_STRING[task]

-    logger.warning(task)
+    logger.info(task)

     start = time.time()
     ml = MetaLearningOptimizer(

autosklearn/smbo.py

Lines changed: 0 additions & 1 deletion
@@ -430,7 +430,6 @@ def run_smbo(self):
             autosklearn_seed=seed,
             resampling_strategy=self.resampling_strategy,
             initial_num_run=num_run,
-            logger=self.logger,
             include=include,
             exclude=exclude,
             metric=self.metric,

autosklearn/util/logging.yaml

Lines changed: 10 additions & 1 deletion
@@ -18,6 +18,12 @@ handlers:
         formatter: simple
         filename: autosklearn.log

+    distributed_logfile:
+        class: logging.FileHandler
+        level: DEBUG
+        formatter: simple
+        filename: distributed.log
+
 root:
     level: DEBUG
     handlers: [console, file_handler]
@@ -26,7 +32,6 @@ loggers:
     autosklearn.metalearning:
         level: DEBUG
         handlers: [file_handler]
-        propagate: no

     autosklearn.util.backend:
         level: DEBUG
@@ -48,3 +53,7 @@ loggers:
     EnsembleBuilder:
         level: DEBUG
         propagate: no
+
+    distributed:
+        level: DEBUG
+        handlers: [distributed_logfile]
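For reference, here is a sketch of how a dictConfig-style YAML like the one above is typically loaded with the standard library, so that dask.distributed messages end up in distributed.log; the exact loading path inside auto-sklearn may differ.

import logging
import logging.config
import yaml  # PyYAML

with open('logging.yaml') as fh:
    logging.config.dictConfig(yaml.safe_load(fh))

# The 'distributed' logger now writes to distributed.log via the new handler,
# while the root and auto-sklearn loggers keep writing to autosklearn.log.
logging.getLogger('distributed').debug('goes to distributed.log')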
