Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmapi/cmapi_server/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ class ProgInfo(NamedTuple):
CMAPI_PORT = 8640 #TODO: use it in all places
CURRENT_NODE_CMAPI_URL = f'https://localhost:{CMAPI_PORT}'
REQUEST_TIMEOUT: float = 30.0
TRANSACTION_TIMEOUT: float = 300.0 # 5 minutes

DMLPROC_SHUTDOWN_TIMEOUT: float = 300.0 # 5 minutes, should be less than LONG_REQUEST_TIMEOUT
LONG_REQUEST_TIMEOUT: float = 400.0 # should be less than TRANSACTION_TIMEOUT
TRANSACTION_TIMEOUT: float = 600.0 # 10 minutes

# API version
_version = '0.4.0'
Expand Down
24 changes: 16 additions & 8 deletions cmapi/cmapi_server/controllers/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@
from mcs_node_control.models.node_config import NodeConfig
from mcs_node_control.models.node_status import NodeStatus
from cmapi_server.constants import (
CMAPI_PACKAGE_NAME, CMAPI_PORT, DEFAULT_MCS_CONF_PATH,
DEFAULT_SM_CONF_PATH, EM_PATH_SUFFIX, MCS_BRM_CURRENT_PATH, MCS_EM_PATH,
MDB_CS_PACKAGE_NAME, MDB_SERVER_PACKAGE_NAME, REQUEST_TIMEOUT,
S3_BRM_CURRENT_PATH, SECRET_KEY,
CMAPI_PACKAGE_NAME,
CMAPI_PORT,
DEFAULT_MCS_CONF_PATH,
DMLPROC_SHUTDOWN_TIMEOUT,
EM_PATH_SUFFIX,
MCS_BRM_CURRENT_PATH,
MCS_EM_PATH,
MDB_CS_PACKAGE_NAME,
MDB_SERVER_PACKAGE_NAME,
REQUEST_TIMEOUT,
S3_BRM_CURRENT_PATH,
SECRET_KEY,
)
from cmapi_server.controllers.api_clients import NodeControllerClient
from cmapi_server import helpers
Expand Down Expand Up @@ -728,7 +736,7 @@ def put_shutdown(self):
req = cherrypy.request
use_sudo = get_use_sudo(req.app.config)
request_body = cherrypy.request.json
timeout = request_body.get('timeout', 0)
timeout = request_body.get('timeout', DMLPROC_SHUTDOWN_TIMEOUT)
node_config = NodeConfig()
try:
MCSProcessManager.stop_node(
Expand Down Expand Up @@ -897,7 +905,7 @@ def put_shutdown(self):

request = cherrypy.request
request_body = request.json
timeout = request_body.get('timeout', None)
timeout = request_body.get('timeout', DMLPROC_SHUTDOWN_TIMEOUT)
force = request_body.get('force', False)
config = request_body.get('config', DEFAULT_MCS_CONF_PATH)
in_transaction = request_body.get('in_transaction', False)
Expand All @@ -907,7 +915,7 @@ def put_shutdown(self):
with TransactionManager():
response = ClusterHandler.shutdown(config, timeout)
else:
response = ClusterHandler.shutdown(config)
response = ClusterHandler.shutdown(config, timeout)
except CMAPIBasicError as err:
raise_422_error(module_logger, func_name, err.message)

Expand Down Expand Up @@ -1597,7 +1605,7 @@ def put_stop_dmlproc(self):

request = cherrypy.request
request_body = request.json
timeout = request_body.get('timeout', 10)
timeout = request_body.get('timeout', DMLPROC_SHUTDOWN_TIMEOUT)
force = request_body.get('force', False)

if force:
Expand Down
16 changes: 10 additions & 6 deletions cmapi/cmapi_server/handlers/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
from tracing.traced_session import get_traced_session

from cmapi_server.constants import (
CMAPI_CONF_PATH, CMAPI_PORT, DEFAULT_MCS_CONF_PATH, REQUEST_TIMEOUT,
CMAPI_CONF_PATH,
CMAPI_PORT,
DEFAULT_MCS_CONF_PATH,
DMLPROC_SHUTDOWN_TIMEOUT,
REQUEST_TIMEOUT,
)
from cmapi_server.exceptions import CMAPIBasicError, exc_to_cmapi_error
from cmapi_server.controllers.api_clients import NodeControllerClient
Expand Down Expand Up @@ -44,7 +48,7 @@ class ClusterAction(Enum):


def toggle_cluster_state(
action: ClusterAction, config: str) -> dict:
action: ClusterAction, config: str, timeout: int = DMLPROC_SHUTDOWN_TIMEOUT) -> dict:
"""Toggle the state of the cluster (start or stop).

:param action: The cluster action to perform.
Expand All @@ -64,7 +68,7 @@ def toggle_cluster_state(

switch_node_maintenance(maintainance_flag)
update_revision_and_manager()
broadcast_new_config(config, distribute_secrets=True)
broadcast_new_config(config, distribute_secrets=True, timeout=timeout)


class ClusterHandler:
Expand Down Expand Up @@ -161,15 +165,15 @@ def start(config: str = DEFAULT_MCS_CONF_PATH) -> dict:

@staticmethod
def shutdown(
config: str = DEFAULT_MCS_CONF_PATH, timeout: Optional[int] = None
config: str = DEFAULT_MCS_CONF_PATH, timeout: int = DMLPROC_SHUTDOWN_TIMEOUT,
) -> dict:
"""Method to stop the MCS Cluster.

:param config: columnstore xml config file path,
defaults to DEFAULT_MCS_CONF_PATH
:type config: str, optional
:param timeout: timeout in seconds to gracefully stop DMLProc,
defaults to None
defaults to DMLPROC_SHUTDOWN_TIMEOUT
:type timeout: int, optional
:raises CMAPIBasicError: if no nodes in the cluster
:return: start timestamp
Expand All @@ -180,7 +184,7 @@ def shutdown(
'Cluster shutdown command called. Shutting down the cluster.'
)
operation_start_time = str(datetime.now())
toggle_cluster_state(ClusterAction.STOP, config)
toggle_cluster_state(ClusterAction.STOP, config, timeout=timeout)
logger.debug('Successfully finished shutting down the cluster.')
return {'timestamp': operation_start_time}

Expand Down
24 changes: 15 additions & 9 deletions cmapi/cmapi_server/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,15 @@
requests.packages.urllib3.disable_warnings() # pylint: disable=no-member

from cmapi_server.constants import (
CMAPI_CONF_PATH, CMAPI_DEFAULT_CONF_PATH, DEFAULT_MCS_CONF_PATH,
DEFAULT_SM_CONF_PATH, LOCALHOSTS, _version
CMAPI_CONF_PATH,
CMAPI_DEFAULT_CONF_PATH,
DEFAULT_MCS_CONF_PATH,
DEFAULT_SM_CONF_PATH,
DMLPROC_SHUTDOWN_TIMEOUT,
LOCALHOSTS,
LONG_REQUEST_TIMEOUT,
TRANSACTION_TIMEOUT,
_version
)
from cmapi_server.handlers.cej import CEJPasswordHandler
from cmapi_server.managers.process import MCSProcessManager
Expand Down Expand Up @@ -63,7 +70,7 @@ def start_transaction(
remove_nodes: Optional[list] = None,
optional_nodes: Optional[list] = None,
txn_id: Optional[int] = None,
timeout: float = 300.0
timeout: float = TRANSACTION_TIMEOUT
):
"""Start internal CMAPI transaction.

Expand All @@ -87,7 +94,7 @@ def start_transaction(
:param txn_id: id for transaction to start, defaults to None
:type txn_id: Optional[int], optional
:param timeout: time in seconds for cmapi transaction lock before it ends
automatically, defaults to 300
automatically, defaults to TRANSACTION_TIMEOUT
:type timeout: float, optional
:return: (success, txn_id, nodes)
:rtype: tuple[bool, int, list[str]]
Expand Down Expand Up @@ -324,8 +331,7 @@ def broadcast_new_config(
defaults to DEFAULT_SM_CONF_PATH
:param test_mode: for test purposes, defaults to False TODO: remove
:param nodes: nodes list for config put, defaults to None
:param timeout: timeout passing to gracefully stop DMLProc TODO: for next
releases. Could affect all logic of broadcacting new config
:param timeout: timeout in seconds passed on to gracefully stop the DMLProc process
:param distribute_secrets: flag to distribute secrets to nodes
:param stateful_config_dict: stateful config update dict to distribute to nodes
:raises CMAPIBasicError: If Broadcasting config to nodes failed with errors
Expand All @@ -341,7 +347,7 @@ def broadcast_new_config(
headers = {'x-api-key': key}
if stateful_config_dict:
body = {
'timeout': 300,
'timeout': DMLPROC_SHUTDOWN_TIMEOUT if timeout is None else timeout,
'stateful_config_dict': stateful_config_dict,
'only_stateful_config': True,
}
Expand All @@ -357,7 +363,7 @@ def broadcast_new_config(
body = {
'manager': root.find('./ClusterManager').text,
'revision': root.find('./ConfigRevision').text,
'timeout': 300,
'timeout': DMLPROC_SHUTDOWN_TIMEOUT if timeout is None else timeout,
'config': config_text,
'mcs_config_filename': cs_config_filename,
'sm_config_filename': sm_config_filename,
Expand Down Expand Up @@ -395,7 +401,7 @@ async def update_config(node: str, headers: dict, body: dict) -> None:
async with create_traced_async_session() as session:
try:
async with session.put(
url, headers=headers, json=body, ssl=False, timeout=120
url, headers=headers, json=body, ssl=False, timeout=LONG_REQUEST_TIMEOUT
) as response:
resp_json = await response.json(encoding='utf-8')
response.raise_for_status()
Expand Down
61 changes: 40 additions & 21 deletions cmapi/cmapi_server/managers/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@
import logging
import os.path
import socket
import time
from time import sleep

import psutil

from cmapi_server.constants import ALL_MCS_PROGS, MCS_INSTALL_BIN, MCSProgs, ProgInfo
from cmapi_server.constants import (
ALL_MCS_PROGS,
DMLPROC_SHUTDOWN_TIMEOUT,
MCS_INSTALL_BIN,
MCSProgs,
ProgInfo,
)
from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.process_dispatchers.base import BaseDispatcher
from cmapi_server.process_dispatchers.container import ContainerDispatcher
Expand Down Expand Up @@ -238,32 +245,44 @@ def _wait_for_controllernode(cls) -> bool:
return True

@classmethod
def _wait_for_DMLProc_stop(cls, timeout: int = 10) -> bool:
def _wait_for_DMLProc_stop(cls, timeout: int = DMLPROC_SHUTDOWN_TIMEOUT) -> bool:
"""Waiting DMLProc process to stop.

:param timeout: timeout to wait, defaults to 10
:param timeout: timeout to wait in seconds, defaults to DMLPROC_SHUTDOWN_TIMEOUT
:type timeout: int, optional
:return: True on success
:rtype: bool
"""
logging.info(f'Waiting for DMLProc to stop in {timeout} seconds')
dmlproc_stopped = False
while timeout > 0:
logging.info(
f'Waiting for DMLProc to stop. Seconds left {timeout}.'
)
deadline = time.monotonic() + max(1, int(timeout))
# Log at most every 5 seconds while polling every ~1s for responsiveness
LOG_INTERVAL_SEC = 5.0
next_log_at = time.monotonic() # log immediately on first iteration

while True:
now = time.monotonic()
remaining = deadline - now
if remaining <= 0:
break

if not Process.check_process_alive('DMLProc'):
logging.info('DMLProc gracefully stopped by DBRM command.')
dmlproc_stopped = True
break
sleep(1)
timeout -= 1
else:
logging.error(
f'DMLProc did not stopped gracefully by DBRM command within '
f'{timeout} seconds. Will be stopped directly.'
)
return dmlproc_stopped
return True

if now >= next_log_at:
logging.info(
f'Waiting for DMLProc to stop. Seconds left ~{int(remaining)}.'
)
next_log_at = now + LOG_INTERVAL_SEC

# Sleep in small increments to minimize over-wait after process exit
sleep(min(1.0, max(0.1, remaining)))

logging.error(
"DMLProc didn't stop gracefully by DBRM command within "
f"{int(timeout)} seconds. Will be stopped directly."
)
return False

@classmethod
def noop(cls, *args, **kwargs):
Expand Down Expand Up @@ -324,7 +343,7 @@ def start(cls, name: str, is_primary: bool, use_sudo: bool) -> bool:

@classmethod
def stop(
cls, name: str, is_primary: bool, use_sudo: bool, timeout: int = 10
cls, name: str, is_primary: bool, use_sudo: bool, timeout: int = DMLPROC_SHUTDOWN_TIMEOUT
) -> bool:
"""Stop mcs process.

Expand Down Expand Up @@ -455,7 +474,7 @@ def stop_node(
cls,
is_primary: bool,
use_sudo: bool = True,
timeout: int = 10,
timeout: int = DMLPROC_SHUTDOWN_TIMEOUT,
):
"""Stop mcs node processes.

Expand All @@ -472,7 +491,7 @@ def stop_node(
# undefined behaviour when primary gone and then recovers (failover
# triggered 2 times).
for prog_name in cls._get_sorted_progs(is_primary=True, reverse=True):
if not cls.stop(prog_name, is_primary, use_sudo):
if not cls.stop(prog_name, is_primary, use_sudo, timeout=timeout):
logging.error(f'Process "{prog_name}" not stopped properly.')
raise CMAPIBasicError(f'Error while stopping "{prog_name}"')

Expand Down
3 changes: 1 addition & 2 deletions cmapi/cmapi_server/managers/upgrade/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ def get_latest_tested_mdb_version(cls) -> str:
:raises CMAPIBasicError: no latest version matched with latest tested
:raises CMAPIBasicError: if request error
:return: latest MDB version matched with latest tested major
:rtype: str
"""
try:
# Download the keyring file
Expand All @@ -174,7 +173,7 @@ def get_latest_tested_mdb_version(cls) -> str:
)
latest_version_num = sorted(latest_version_nums, reverse=True)[0]
logging.debug(
'Succesfully got latest MBD version number: '
'Succesfully got latest MDB version number: '
f'{latest_version_num}'
)
except requests.RequestException as exc:
Expand Down
1 change: 1 addition & 0 deletions cmapi/mcs_cluster_tool/cluster_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def stop(
# could affect put_config (helpers.py broadcast_config) operation
timeout = 0

#TODO: bypass timeout here
resp = client.shutdown_cluster({'in_transaction': True})
return {'timestamp': start_time}

Expand Down
8 changes: 6 additions & 2 deletions cmapi/mcs_cluster_tool/tools_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@


from cmapi_server.constants import (
MCS_DATA_PATH, MCS_SECRETS_FILENAME, REQUEST_TIMEOUT, TRANSACTION_TIMEOUT,
CMAPI_CONF_PATH, CMAPI_PORT,
CMAPI_CONF_PATH,
CMAPI_PORT,
MCS_DATA_PATH,
MCS_SECRETS_FILENAME,
REQUEST_TIMEOUT,
TRANSACTION_TIMEOUT,
)
from cmapi_server.controllers.api_clients import (
AppControllerClient, ClusterControllerClient, NodeControllerClient
Expand Down