Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions ibm_cloud_sdk_core/base_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import logging
import json as json_import
from http.cookiejar import CookieJar
from http.client import HTTPConnection
from http import client
from os.path import basename
from typing import Dict, List, Optional, Tuple, Union
from urllib3.util.retry import Retry
Expand All @@ -43,7 +43,10 @@
GzipStream,
)
from .private_helpers import _build_user_agent
from .logger import get_logger
from .logger import (
get_logger,
LoggingFilter,
)

logger = get_logger()

Expand Down Expand Up @@ -89,7 +92,7 @@ def __init__(
self,
*,
service_url: str = None,
authenticator: Authenticator = None,
authenticator: Optional[Authenticator] = None,
disable_ssl_verification: bool = False,
enable_gzip_compression: bool = False,
) -> None:
Expand All @@ -113,7 +116,10 @@ def __init__(
self.http_client.mount('https://', self.http_adapter)
# If debug logging is requested, then trigger HTTP message logging as well.
if logger.isEnabledFor(logging.DEBUG):
HTTPConnection.debuglevel = 1
client.HTTPConnection.debuglevel = 1
# Replace the `print` function in the HTTPClient module to
# use the debug logger instead of the bare Python print.
client.print = lambda *args: logger.debug(LoggingFilter.filter_message(" ".join(args)))

def enable_retries(self, max_retries: int = 4, retry_interval: float = 30.0) -> None:
"""Enable automatic retries on the underlying http client used by the BaseService instance.
Expand Down
62 changes: 61 additions & 1 deletion ibm_cloud_sdk_core/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,71 @@
# limitations under the License.

import logging
import re


# This is the name of the primary logger used by the library.
LOGGER_NAME = 'ibm-cloud-sdk-core'
# Keywords that are redacted.
REDACTED_KEYWORDS = [
"apikey",
"api_key",
"passcode",
"password",
"token",
"aadClientId",
"aadClientSecret",
"auth",
"auth_provider_x509_cert_url",
"auth_uri",
"client_email",
"client_id",
"client_x509_cert_url",
"key",
"project_id",
"secret",
"subscriptionId",
"tenantId",
"thumbprint",
"token_uri",
]


class LoggingFilter:
"""Functions used to filter messages before they are logged."""

redacted_tokens = "|".join(REDACTED_KEYWORDS)
auth_header_pattern = re.compile(r"(?m)(Authorization|X-Auth\S*): ((.*?)(\r\n.*)|(.*))")
property_settings_pattern = re.compile(r"(?i)(" + redacted_tokens + r")=[^&]*(&|$)")
json_field_pattern = re.compile(r'(?i)"([^"]*(' + redacted_tokens + r')[^"_]*)":\s*"[^\,]*"')

@classmethod
def redact_secrets(cls, text: str) -> str:
"""Replaces values of potential secret keywords with a placeholder value.
Args:
text (str): the string to check and process

Returns:
str: the safe, redacted string with all secrets masked out
"""

placeholder = "[redacted]"
redacted = cls.auth_header_pattern.sub(r"\1: " + placeholder + r"\4", text)
redacted = cls.property_settings_pattern.sub(r"\1=" + placeholder + r"\2", redacted)
redacted = cls.json_field_pattern.sub(r'"\1":"' + placeholder + r'"', redacted)
return redacted

@classmethod
def filter_message(cls, s: str) -> str:
"""Filters 's' prior to logging it as a debug message"""
# Redact secrets
s = LoggingFilter.redact_secrets(s)

# Replace CRLF characters with an actual newline to make the message more readable.
s = s.replace('\\r\\n', '\n')
return s


def get_logger():
def get_logger() -> logging.Logger:
"""Returns the primary logger object instance used by the library."""
return logging.getLogger(LOGGER_NAME)
1 change: 0 additions & 1 deletion test/test_base_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,6 @@ def test_reserved_keys(caplog):
assert response.get_result().request.url == 'https://gateway.watsonplatform.net/test/api'
assert response.get_result().request.method == 'GET'
assert response.get_result().request.hooks == {'response': []}
print('caplog.record_tuples:', caplog.record_tuples)
assert caplog.record_tuples[0][2] == '"method" has been removed from the request'
assert caplog.record_tuples[1][2] == '"url" has been removed from the request'
assert caplog.record_tuples[2][2] == '"cookies" has been removed from the request'
Expand Down
46 changes: 5 additions & 41 deletions test/test_http_adapter.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
# pylint: disable=missing-docstring
import logging
import os
import threading
import warnings
from http.server import HTTPServer, SimpleHTTPRequestHandler
from ssl import SSLContext, PROTOCOL_TLSv1_1, PROTOCOL_TLSv1_2
from typing import Callable
from ssl import PROTOCOL_TLSv1_1, PROTOCOL_TLSv1_2

import pytest
import urllib3
from requests import exceptions

from ibm_cloud_sdk_core.base_service import BaseService
from ibm_cloud_sdk_core.authenticators import NoAuthAuthenticator
from .utils.logger_utils import setup_test_logger
from .utils.http_utils import local_server


setup_test_logger(logging.ERROR)

Expand All @@ -33,40 +30,7 @@
key = os.path.join(os.path.dirname(__file__), '../resources/test_ssl.key')


def _local_server(tls_version: int, port: int) -> Callable:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this to a separate file, so that it's easier to use it in other tests too.

def decorator(test_function: Callable) -> Callable:
def inner():
# Disable warnings caused by the self-signed certificate.
urllib3.disable_warnings()

# Build the SSL context for the server.
ssl_context = SSLContext(tls_version)
ssl_context.load_cert_chain(certfile=cert, keyfile=key)

# Create and start the server on a separate thread.
server = HTTPServer(('localhost', port), SimpleHTTPRequestHandler)
server.socket = ssl_context.wrap_socket(server.socket, server_side=True)
t = threading.Thread(target=server.serve_forever)
t.start()

# We run everything in a big try-except-finally block to make sure we always
# shutdown the HTTP server gracefully.
try:
test_function()
except Exception: # pylint: disable=try-except-raise
raise
finally:
server.shutdown()
t.join()
# Re-enable warnings.
warnings.resetwarnings()

return inner

return decorator


@_local_server(PROTOCOL_TLSv1_1, 3333)
@local_server(3333, PROTOCOL_TLSv1_1, cert, key)
def test_tls_v1_1():
service = BaseService(service_url='https://localhost:3333', authenticator=NoAuthAuthenticator())
prepped = service.prepare_request('GET', url='/')
Expand All @@ -78,7 +42,7 @@ def test_tls_v1_1():
assert exception.type is exceptions.SSLError or exception.type is exceptions.ConnectionError


@_local_server(PROTOCOL_TLSv1_2, 3334)
@local_server(3334, PROTOCOL_TLSv1_2, cert, key)
def test_tls_v1_2():
service = BaseService(service_url='https://localhost:3334', authenticator=NoAuthAuthenticator())

Expand Down
94 changes: 94 additions & 0 deletions test/test_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# coding: utf-8

# Copyright 2024 IBM All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=missing-docstring

import logging

from ibm_cloud_sdk_core import base_service
from ibm_cloud_sdk_core.authenticators import NoAuthAuthenticator
from ibm_cloud_sdk_core.logger import LoggingFilter
from .utils.http_utils import local_server


def test_redact_secrets():
redact_secrets = LoggingFilter.redact_secrets

assert "secret" not in redact_secrets("Authorization: Bearer secret")
assert "secret" not in redact_secrets("Authorization: Basic secret")
assert "secret" not in redact_secrets("X-Authorization: secret")
assert "secret" not in redact_secrets("ApIKey=secret")
assert "secret" not in redact_secrets("ApI_Key=secret")
assert "secret" not in redact_secrets("passCode=secret")
assert "secret" not in redact_secrets("PASSword=secret")
assert "secret" not in redact_secrets("toKen=secret")
assert "secret" not in redact_secrets("client_id=secret")
assert "secret" not in redact_secrets("client_x509_cert_url=secret")
assert "secret" not in redact_secrets("client_id=secret")
assert "secret" not in redact_secrets("key=secret")
assert "secret" not in redact_secrets("project_id=secret")
assert "DaSecret" not in redact_secrets("secret=DaSecret")
assert "secret" not in redact_secrets("subscriptionId=secret")
assert "secret" not in redact_secrets("tenantId=secret")
assert "secret" not in redact_secrets("thumbprint=secret")
assert "secret" not in redact_secrets("token_uri=secret")
assert "secret" not in redact_secrets('xxx "apIKEy": "secret",xxx')
assert "secret" not in redact_secrets('xxx "apI_KEy": "secret",xxx')
assert "secret" not in redact_secrets('xxx "pAsSCoDe": "secret",xxx')
assert "secret" not in redact_secrets('xxx "passWORD": "secret",xxx')
assert "secret" not in redact_secrets('xxx {"token": "secret"},xxx')
assert "secret" not in redact_secrets('xxx "aadClientId": "secret",xxx')
assert "secret" not in redact_secrets('xxx "aadClientSecret": "secret",xxx')
assert "secret" not in redact_secrets('xxx "auth": "secret",xxx')
assert "secret" not in redact_secrets('xxx "auth_provider_x509_cert_url": "secret",xxx')
assert "secret" not in redact_secrets('xxx "auth_uri": "secret",xxx')
assert "secret" not in redact_secrets('xxx "client_email": "secret",xxx')
# Now use a real-world example, to validate the correct behavior.
assert (
redact_secrets(
'GET / HTTP/1.1\r\nHost: localhost:3335\r\n'
+ 'User-Agent: ibm-python-sdk-core-3.20.6 os.name=Darwin os.version=23.6.0 python.version=3.11.10\r\n'
+ 'Accept-Encoding: gzip, deflate\r\nAccept: */*\r\nAuthorization: token-foo-bar-123\r\n'
+ 'Connection: keep-alive\r\n\r\n'
)
== 'GET / HTTP/1.1\r\nHost: localhost:3335\r\n'
+ 'User-Agent: ibm-python-sdk-core-3.20.6 os.name=Darwin os.version=23.6.0 python.version=3.11.10\r\n'
+ 'Accept-Encoding: gzip, deflate\r\nAccept: */*\r\nAuthorization: [redacted]\r\nConnection: keep-alive\r\n\r\n'
)


# Simulate a real-world scenario.
@local_server(3335)
def test_redact_secrets_log(caplog):
# Since we use a real BaseService here, we need to set the logging level
# to DEBUG in its module, to simulate the real behavior.
original_logging_level = base_service.logger.level
base_service.logger.setLevel(logging.DEBUG)

try:
service = base_service.BaseService(service_url="http://localhost:3335", authenticator=NoAuthAuthenticator())
prepped = service.prepare_request('GET', url='/', headers={'Authorization': 'token-foo-bar-123'})
res = service.send(prepped)
except Exception as ex:
raise ex
finally:
# And now we restore the logger's level to the original value.
base_service.logger.setLevel(original_logging_level)

assert res is not None
# Make sure the request has been logged and the token is redacted.
assert "Authorization" in caplog.text
assert "token" not in caplog.text
66 changes: 66 additions & 0 deletions test/utils/http_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# coding: utf-8

# Copyright 2024 IBM All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import threading
import warnings
from typing import Callable, Optional
from http.server import HTTPServer, SimpleHTTPRequestHandler
from ssl import SSLContext

import urllib3


def local_server(
port: int, tls_version: Optional[int] = None, cert: Optional[str] = None, key: Optional[str] = None
) -> Callable:
"""local_server helps setting up and running an HTTP(S) server for testing purposes."""

def decorator(test_function: Callable) -> Callable:
@functools.wraps(test_function)
def inner(*args, **kwargs):
is_https = tls_version and cert and key
# Disable warnings caused by the self-signed certificate.
urllib3.disable_warnings()

if is_https:
# Build the SSL context for the server.
ssl_context = SSLContext(tls_version)
ssl_context.load_cert_chain(certfile=cert, keyfile=key)

# Create and start the server on a separate thread.
server = HTTPServer(('localhost', port), SimpleHTTPRequestHandler)
if is_https:
server.socket = ssl_context.wrap_socket(server.socket, server_side=True)

t = threading.Thread(target=server.serve_forever)
t.start()

# We run everything in a big try-except-finally block to make sure we always
# shutdown the HTTP server gracefully.
try:
test_function(*args, **kwargs)
except Exception: # pylint: disable=try-except-raise
raise
finally:
server.shutdown()
t.join()
# Re-enable warnings.
warnings.resetwarnings()

return inner

return decorator
10 changes: 7 additions & 3 deletions test/utils/logger_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
# limitations under the License.

import logging
from http.client import HTTPConnection
from ibm_cloud_sdk_core.logger import get_logger
from http import client
from ibm_cloud_sdk_core.logger import (
get_logger,
LoggingFilter,
)


def setup_test_logger(level: int):
Expand All @@ -27,4 +30,5 @@ def setup_test_logger(level: int):

# If debug logging is requested, then trigger HTTP message logging as well.
if logger.isEnabledFor(logging.DEBUG):
HTTPConnection.debuglevel = 1
client.HTTPConnection.debuglevel = 1
client.print = lambda *args: logger.debug(LoggingFilter.filter_message(" ".join(args)))