Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions autosklearn/automl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
import json
import platform
import logging.handlers
import multiprocessing
import os
import sys
import time
from typing import Any, Dict, Optional, List, Union
import unittest.mock
import warnings
import tempfile
import threading

from ConfigSpace.read_and_write import json as cs_json
import dask.distributed
Expand Down Expand Up @@ -284,11 +284,12 @@ def _get_logger(self, name):
# under the above logging configuration setting
# We need to specify the logger_name so that received records
# are treated under the logger_name ROOT logger setting
self.stop_logging_server = threading.Event()
self.stop_logging_server = multiprocessing.Event()
self.logger_tcpserver = LogRecordSocketReceiver(logname=logger_name,
port=self._logger_port,
event=self.stop_logging_server)
self.logging_server = threading.Thread(target=self.logger_tcpserver.serve_until_stopped)
self.logging_server = multiprocessing.Process(
target=self.logger_tcpserver.serve_until_stopped)
self.logging_server.daemon = False
self.logging_server.start()
return get_logger(logger_name)
Expand All @@ -300,7 +301,15 @@ def _clean_logger(self):
# Clean up the logger
if self.logging_server.is_alive():
self.stop_logging_server.set()
self.logging_server.join()

# After setting the stop event we first attempt a graceful
# join with a timeout, giving the logging process a chance
# to shut down cleanly. If the join times out (e.g. the
# process hung or never observed the event), terminate()
# forcefully kills it.
self.logging_server.join(timeout=5)
self.logging_server.terminate()
del self.logger_tcpserver
del self.stop_logging_server

Expand Down
19 changes: 6 additions & 13 deletions autosklearn/ensemble_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def build_ensemble(self, dask_client: dask.distributed.Client) -> None:
# The second criteria is elapsed time
elapsed_time = time.time() - self.start_time

logger = EnsembleBuilder._get_ensemble_logger(self.logger_port)
logger = get_named_client_logger('EnsembleBuilder', port=self.logger_port)

# First test for termination conditions
if self.time_left_for_ensembles < elapsed_time:
Expand Down Expand Up @@ -307,6 +307,8 @@ def fit_and_return_ensemble(
because we do not know when dask schedules the job.
iteration: int
The current iteration
logger_port: int
The port on which the logging server is listening.

Returns
-------
Expand Down Expand Up @@ -442,7 +444,7 @@ def __init__(

# Setup the logger
self.logger_port = logger_port
self.logger = self._get_ensemble_logger(self.logger_port)
self.logger = get_named_client_logger('EnsembleBuilder', port=self.logger_port)

if ensemble_nbest == 1:
self.logger.debug("Behaviour depends on int/float: %s, %s (ensemble_nbest, type)" %
Expand Down Expand Up @@ -529,15 +531,6 @@ def __init__(
del datamanager
self.ensemble_history = []

@classmethod
def _get_ensemble_logger(self, port: int):
"""
Returns the logger of for the ensemble process.
A subprocess will require to set this up, for instance,
pynisher forks
"""
return get_named_client_logger('EnsembleBuilder', port=port)

def run(
self,
iteration: int,
Expand All @@ -552,7 +545,7 @@ def run(
elif time_left is not None and end_at is not None:
raise ValueError('Cannot provide both time_left and end_at.')

self.logger = self._get_ensemble_logger(self.logger_port)
self.logger = get_named_client_logger('EnsembleBuilder', port=self.logger_port)

process_start_time = time.time()
while True:
Expand Down Expand Up @@ -621,7 +614,7 @@ def main(self, time_left, iteration, return_predictions):
# Pynisher jobs inside dask 'forget'
# the logger configuration. So we have to set it up
# accordingly
self.logger = self._get_ensemble_logger(self.logger_port)
self.logger = get_named_client_logger('EnsembleBuilder', port=self.logger_port)

self.start_time = time.time()
train_pred, valid_pred, test_pred = None, None, None
Expand Down
1 change: 1 addition & 0 deletions autosklearn/util/logging_.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,4 @@ def serve_until_stopped(self) -> None:

if self.event is not None and self.event.is_set():
break
print("Finished the process")