71 changes: 68 additions & 3 deletions autosklearn/automl.py
@@ -3,6 +3,8 @@
import io
import json
import platform
import logging.handlers
import multiprocessing
import os
import sys
import time
@@ -39,7 +41,12 @@
from autosklearn.metrics import calculate_score
from autosklearn.util.backend import Backend
from autosklearn.util.stopwatch import StopWatch
from autosklearn.util.logging_ import get_logger, setup_logger
from autosklearn.util.logging_ import (
get_logger,
is_port_in_use,
LogRecordSocketReceiver,
setup_logger,
)
from autosklearn.util import pipeline, RE_PATTERN
from autosklearn.ensemble_builder import EnsembleBuilderManager
from autosklearn.ensembles.singlebest_ensemble import SingleBest
@@ -258,12 +265,54 @@ def _close_dask_client(self):

def _get_logger(self, name):
logger_name = 'AutoML(%d):%s' % (self._seed, name)

# Set up the configuration for the logger.
# It will be honored by the logging server
# created below.
setup_logger(os.path.join(self._backend.temporary_directory,
'%s.log' % str(logger_name)),
self.logging_config,
)

# The desired port might already be in use, so probe for a free one
while is_port_in_use(self._logger_port):
self._logger_port += 1

# As Auto-sklearn works with distributed processes,
# we start a logging server that receives pickled
# log records over TCP. Records are unpickled and
# handled locally under the logging configuration
# set up above. We pass logger_name so that received
# records are treated under that logger's settings.
self.stop_logging_server = multiprocessing.Event()
self.logger_tcpserver = LogRecordSocketReceiver(logname=logger_name,
port=self._logger_port,
event=self.stop_logging_server)
self.logging_server = multiprocessing.Process(
target=self.logger_tcpserver.serve_until_stopped)
self.logging_server.daemon = False
self.logging_server.start()
return get_logger(logger_name)

def _clean_logger(self):
if not hasattr(self, 'stop_logging_server') or self.stop_logging_server is None:
return

# Clean up the logger
if self.logging_server.is_alive():
self.stop_logging_server.set()

# After setting the stop event, join the process so
# it can shut down gracefully. If the graceful
# shutdown fails and the join times out, terminate()
# force-kills the process.
self.logging_server.join(timeout=5)
self.logging_server.terminate()
del self.logger_tcpserver
del self.stop_logging_server

@staticmethod
def _start_task(watcher, task_name):
watcher.start_task(task_name)
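is_port_in_use is imported from autosklearn.util.logging_ but its body is not part of this diff. A minimal sketch of the usual way such a probe is written, assuming a TCP connectivity check against localhost (not necessarily the package's actual implementation):

import socket

def is_port_in_use(port: int) -> bool:
    # A successful connect means something is already listening on the port
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        return sock.connect_ex(('localhost', port)) == 0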
@@ -403,6 +452,8 @@ def fit(
self._dataset_name = dataset_name
self._stopwatch.start_task(self._dataset_name)

# Start from the default TCP logging port; _get_logger probes upwards for a free port if it is taken
self._logger_port = logging.handlers.DEFAULT_TCP_LOGGING_PORT
self._logger = self._get_logger(dataset_name)

if feat_type is not None and len(feat_type) != X.shape[1]:
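For reference, the stdlib default that _logger_port starts from is port 9020; _get_logger then increments past it while is_port_in_use reports it as taken:

>>> import logging.handlers
>>> logging.handlers.DEFAULT_TCP_LOGGING_PORT
9020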
@@ -589,8 +640,8 @@ def fit(
max_iterations=None,
read_at_most=np.inf,
ensemble_memory_limit=self._memory_limit,
logger_name=self._logger.name,
random_state=self._seed,
logger_port=self._logger_port,
)

self._stopwatch.stop_task(ensemble_task_name)
@@ -686,6 +737,12 @@ def fit(
# while the ensemble builder tries to access the data
if proc_ensemble is not None:
self.ensemble_performance_history = list(proc_ensemble.history)

# Save the ensemble performance history to a file
if len(self.ensemble_performance_history) > 0:
pd.DataFrame(self.ensemble_performance_history).to_json(
os.path.join(self._backend.internals_directory, 'ensemble_history.json'))

if len(proc_ensemble.futures) > 0:
future = proc_ensemble.futures.pop()
future.cancel()
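Because the history is written with DataFrame.to_json, it can be read back the same way. The path below is illustrative only; internals_directory is managed by the Backend:

import pandas as pd

# Hypothetical temporary directory; auto-sklearn places internals under '.auto-sklearn'
history = pd.read_json('/tmp/autosklearn_tmp/.auto-sklearn/ensemble_history.json')
print(history.head())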
@@ -694,6 +751,9 @@
self._load_models()
self._close_dask_client()

# Clean up the logger
self._clean_logger()

return self

def refit(self, X, y):
@@ -866,7 +926,7 @@ def fit_ensemble(self, y, task=None, precision=32,
read_at_most=np.inf,
ensemble_memory_limit=self._memory_limit,
random_state=self._seed,
logger_name=self._logger.name,
logger_port=self._logger_port,
)
manager.build_ensemble(self._dask_client)
future = manager.futures.pop()
@@ -1157,9 +1217,14 @@ def configuration_space_created_hook(self, datamanager, configuration_space):
def __getstate__(self) -> Dict[str, Any]:
# Cannot serialize a client!
self._dask_client = None
self.logging_server = None
self.stop_logging_server = None
return self.__dict__

def __del__(self):
# Clean up the logger
self._clean_logger()

self._close_dask_client()

# When a multiprocessing work is done, the
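LogRecordSocketReceiver, instantiated in _get_logger above, also lives in autosklearn.util.logging_ and is not shown in this diff. A sketch modeled on the socket receiver from the Python logging cookbook, extended with the stop event used here; the signatures are assumptions and the real class may differ:

import logging
import logging.handlers
import pickle
import select
import socketserver
import struct

class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    def handle(self):
        # Read length-prefixed, pickled LogRecords from one client
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            data = self.connection.recv(slen)
            while len(data) < slen:
                data += self.connection.recv(slen - len(data))
            record = logging.makeLogRecord(pickle.loads(data))
            # Route every record to the server's configured logger name
            logging.getLogger(self.server.logname).handle(record)

class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
    allow_reuse_address = True

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 logname=None, event=None):
        super().__init__((host, port), LogRecordStreamHandler)
        self.logname = logname
        self.event = event  # multiprocessing.Event signalling shutdown

    def serve_until_stopped(self):
        while True:
            # Poll with a timeout so the stop event is checked periodically
            rd, _, _ = select.select([self.socket.fileno()], [], [], 0.5)
            if rd:
                self.handle_request()
            if self.event is not None and self.event.is_set():
                break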
61 changes: 19 additions & 42 deletions autosklearn/ensemble_builder.py
@@ -3,6 +3,7 @@
import gzip
import math
import numbers
import logging.handlers
import os
import pickle
import re
@@ -27,7 +28,7 @@
from autosklearn.metrics import calculate_score, Scorer
from autosklearn.ensembles.ensemble_selection import EnsembleSelection
from autosklearn.ensembles.abstract_ensemble import AbstractEnsemble
from autosklearn.util.logging_ import get_logger, setup_logger
from autosklearn.util.logging_ import get_named_client_logger

Y_ENSEMBLE = 0
Y_VALID = 1
@@ -54,7 +55,7 @@ def __init__(
read_at_most: int,
ensemble_memory_limit: Optional[int],
random_state: int,
logger_name: str,
logger_port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT,
):
""" SMAC callback to handle ensemble building

@@ -100,8 +101,8 @@ def __init__(
memory limit in mb. If ``None``, no memory limit is enforced.
read_at_most: int
read at most n new prediction files in each iteration
logger_name: str
Name of the logger where we are gonna write information
logger_port: int
port on which the logging server receives records

Returns
-------
@@ -124,7 +125,7 @@ def __init__(
self.read_at_most = read_at_most
self.ensemble_memory_limit = ensemble_memory_limit
self.random_state = random_state
self.logger_name = logger_name
self.logger_port = logger_port

# Store something similar to SMAC's runhistory
self.history = []
@@ -152,11 +153,7 @@ def build_ensemble(self, dask_client: dask.distributed.Client) -> None:
# The second criteria is elapsed time
elapsed_time = time.time() - self.start_time

logger = EnsembleBuilder._get_ensemble_logger(
self.logger_name,
self.backend.temporary_directory,
False
)
logger = get_named_client_logger('EnsembleBuilder', port=self.logger_port)

# First test for termination conditions
if self.time_left_for_ensembles < elapsed_time:
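get_named_client_logger replaces the old _get_ensemble_logger throughout this file. Its implementation is not part of the diff; presumably it attaches a logging.handlers.SocketHandler so that records emitted inside dask workers and pynisher subprocesses travel over TCP to the server started in automl.py. A hedged sketch:

import logging
import logging.handlers

def get_named_client_logger(
    name: str,
    host: str = 'localhost',
    port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT,
) -> logging.Logger:
    # SocketHandler pickles each LogRecord and ships it over TCP;
    # formatting happens on the server side, so no formatter is set here
    logger = logging.getLogger(name)
    logger.addHandler(logging.handlers.SocketHandler(host, port))
    return logger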
@@ -216,11 +213,11 @@
memory_limit=self.ensemble_memory_limit,
read_at_most=self.read_at_most,
random_state=self.seed,
logger_name=self.logger_name,
end_at=self.start_time + self.time_left_for_ensembles,
iteration=self.iteration,
return_predictions=False,
priority=100,
logger_port=self.logger_port,
))

logger.info(
@@ -254,10 +251,10 @@ def fit_and_return_ensemble(
memory_limit: Optional[int],
read_at_most: int,
random_state: int,
logger_name: str,
end_at: float,
iteration: int,
return_predictions: bool,
logger_port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT,
) -> Tuple[
List[Tuple[int, float, float, float]],
int,
@@ -305,13 +302,13 @@
memory limit in mb. If ``None``, no memory limit is enforced.
read_at_most: int
read at most n new prediction files in each iteration
logger_name: str
Name of the logger where we are gonna write information
end_at: float
At what time the job must finish. Needs to be the endtime and not the time left
because we do not know when dask schedules the job.
iteration: int
The current iteration
logger_port: int
The port on which the logging server is listening.

Returns
-------
@@ -333,7 +330,7 @@
memory_limit=memory_limit,
read_at_most=read_at_most,
random_state=random_state,
logger_name=logger_name,
logger_port=logger_port,
).run(
end_at=end_at,
iteration=iteration,
@@ -358,7 +355,7 @@ def __init__(
memory_limit: Optional[int] = 1024,
read_at_most: int = 5,
random_state: Optional[Union[int, np.random.RandomState]] = None,
logger_name: str = 'ensemble_builder',
logger_port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT,
):
"""
Constructor
@@ -404,8 +401,8 @@ def __init__(
memory limit in mb. If ``None``, no memory limit is enforced.
read_at_most: int
read at most n new prediction files in each iteration
logger_name: str
Name of the logger where we are gonna write information
logger_port: int
port on which the logging server receives records
"""

super(EnsembleBuilder, self).__init__()
@@ -446,9 +443,8 @@ def __init__(
self.random_state = check_random_state(random_state)

# Setup the logger
self.logger_name = logger_name
self.logger = self._get_ensemble_logger(
self.logger_name, self.backend.temporary_directory, False)
self.logger_port = logger_port
self.logger = get_named_client_logger('EnsembleBuilder', port=self.logger_port)

if ensemble_nbest == 1:
self.logger.debug("Behaviour depends on int/float: %s, %s (ensemble_nbest, type)" %
@@ -535,23 +531,6 @@ def __init__(
del datamanager
self.ensemble_history = []

@classmethod
def _get_ensemble_logger(self, logger_name, dirname, setup):
"""
Returns the logger of for the ensemble process.
A subprocess will require to set this up, for instance,
pynisher forks
"""
if setup:
setup_logger(
os.path.join(
dirname,
'%s.log' % str(logger_name)
),
)

return get_logger('EnsembleBuilder')

def run(
self,
iteration: int,
@@ -566,8 +545,7 @@ def run(
elif time_left is not None and end_at is not None:
raise ValueError('Cannot provide both time_left and end_at.')

self.logger = self._get_ensemble_logger(
self.logger_name, self.backend.temporary_directory, True)
self.logger = get_named_client_logger('EnsembleBuilder', port=self.logger_port)

process_start_time = time.time()
while True:
@@ -636,8 +614,7 @@ def main(self, time_left, iteration, return_predictions):
# Pynisher jobs inside dask 'forget' the logger
# configuration, so we have to set it up again
# here
self.logger = self._get_ensemble_logger(
self.logger_name, self.backend.temporary_directory, False)
self.logger = get_named_client_logger('EnsembleBuilder', port=self.logger_port)

self.start_time = time.time()
train_pred, valid_pred, test_pred = None, None, None
4 changes: 4 additions & 0 deletions autosklearn/util/logging.yaml
@@ -44,3 +44,7 @@ loggers:
smac.optimizer.smbo.SMBO:
level: INFO
handlers: [file_handler, console]

EnsembleBuilder:
level: DEBUG
propagate: no
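Declaring the EnsembleBuilder logger with propagate: no keeps its records from also reaching the root logger's handlers, which would duplicate every message. A sketch of how a YAML config like this is typically applied (setup_logger's exact behavior may differ):

import logging.config
import yaml

with open('autosklearn/util/logging.yaml') as fh:
    logging.config.dictConfig(yaml.safe_load(fh))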