Skip to content
2 changes: 2 additions & 0 deletions azure_functions_worker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

__version__ = '1.1.10'
25 changes: 23 additions & 2 deletions azure_functions_worker/bindings/tracecontext.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,44 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from typing import Dict


class TraceContext:
"""Check https://www.w3.org/TR/trace-context/ for more information"""

def __init__(self, trace_parent: str,
trace_state: str, attributes: dict) -> None:
trace_state: str, attributes: Dict[str, str]) -> None:
self.__trace_parent = trace_parent
self.__trace_state = trace_state
self.__attributes = attributes

@property
def Tracestate(self) -> str:
"""Get trace state from trace-context (deprecated)."""
return self.__trace_state

@property
def Traceparent(self) -> str:
"""Get trace parent from trace-context (deprecated)."""
return self.__trace_parent

@property
def Attributes(self) -> Dict[str, str]:
"""Get trace-context attributes (deprecated)."""
return self.__attributes

@property
def trace_state(self) -> str:
"""Get trace state from trace-context"""
return self.__trace_state

@property
def trace_parent(self) -> str:
"""Get trace parent from trace-context"""
return self.__trace_parent

@property
def Attributes(self) -> str:
def attributes(self) -> Dict[str, str]:
"""Get trace-context attributes"""
return self.__attributes
6 changes: 3 additions & 3 deletions azure_functions_worker/constants.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

# Prefixes
CONSOLE_LOG_PREFIX = "LanguageWorkerConsoleLog"

# Capabilities
RAW_HTTP_BODY_BYTES = "RawHttpBodyBytes"
TYPED_DATA_COLLECTION = "TypedDataCollection"
Expand All @@ -26,6 +23,7 @@
PYTHON_ROLLBACK_CWD_PATH = "PYTHON_ROLLBACK_CWD_PATH"
PYTHON_THREADPOOL_THREAD_COUNT = "PYTHON_THREADPOOL_THREAD_COUNT"
PYTHON_ISOLATE_WORKER_DEPENDENCIES = "PYTHON_ISOLATE_WORKER_DEPENDENCIES"
PYTHON_ENABLE_WORKER_EXTENSIONS = "PYTHON_ENABLE_WORKER_EXTENSIONS"
FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED = \
"FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED"
"""
Expand All @@ -40,6 +38,8 @@
PYTHON_THREADPOOL_THREAD_COUNT_MAX = 32
PYTHON_ISOLATE_WORKER_DEPENDENCIES_DEFAULT = False
PYTHON_ISOLATE_WORKER_DEPENDENCIES_DEFAULT_39 = True
PYTHON_ENABLE_WORKER_EXTENSIONS_DEFAULT = False
PYTHON_ENABLE_WORKER_EXTENSIONS_DEFAULT_39 = True

# External Site URLs
MODULE_NOT_FOUND_TS_URL = "https://aka.ms/functions-modulenotfound"
39 changes: 29 additions & 10 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@

import grpc

from . import __version__
from . import bindings
from . import constants
from . import functions
from . import loader
from . import protos
from .constants import (CONSOLE_LOG_PREFIX, PYTHON_THREADPOOL_THREAD_COUNT,
from .constants import (PYTHON_THREADPOOL_THREAD_COUNT,
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
PYTHON_THREADPOOL_THREAD_COUNT_MAX,
PYTHON_THREADPOOL_THREAD_COUNT_MIN)
from .logging import disable_console_logging, enable_console_logging
from .logging import error_logger, is_system_log_category, logger
from .logging import (logger, error_logger, is_system_log_category,
CONSOLE_LOG_PREFIX)
from .extension import ExtensionManager
from .utils.common import get_app_setting
from .utils.tracing import marshall_exception_trace
from .utils.dependency import DependencyManager
Expand Down Expand Up @@ -255,8 +258,9 @@ async def _dispatch_grpc_request(self, request):
self._grpc_resp_queue.put_nowait(resp)

async def _handle__worker_init_request(self, req):
logger.info('Received WorkerInitRequest, request ID %s',
self.request_id)
logger.info('Received WorkerInitRequest, '
'python version %s, worker version %s, request ID %s',
sys.version, __version__, self.request_id)

capabilities = {
constants.RAW_HTTP_BODY_BYTES: _TRUE,
Expand Down Expand Up @@ -304,6 +308,11 @@ async def _handle__function_load_request(self, req):
self._functions.add_function(
function_id, func, func_request.metadata)

ExtensionManager.function_load_extension(
function_name,
func_request.metadata.directory
)

logger.info('Successfully processed FunctionLoadRequest, '
f'request ID: {self.request_id}, '
f'function ID: {function_id},'
Expand Down Expand Up @@ -373,20 +382,24 @@ async def _handle__invocation_request(self, req):
pytype=pb_type_info.pytype,
shmem_mgr=self._shmem_mgr)

fi_context = bindings.Context(
fi.name, fi.directory, invocation_id, trace_context)
if fi.requires_context:
args['context'] = bindings.Context(
fi.name, fi.directory, invocation_id, trace_context)
args['context'] = fi_context

if fi.output_types:
for name in fi.output_types:
args[name] = bindings.Out()

if fi.is_async:
call_result = await fi.func(**args)
call_result = await self._run_async_func(
fi_context, fi.func, args
)
else:
call_result = await self._loop.run_in_executor(
self._sync_call_tp,
self.__run_sync_func, invocation_id, fi.func, args)
self._run_sync_func,
invocation_id, fi_context, fi.func, args)
if call_result is not None and not fi.has_return:
raise RuntimeError(f'function {fi.name!r} without a $return '
'binding returned a non-None value')
Expand Down Expand Up @@ -582,15 +595,21 @@ def _create_sync_call_tp(
max_workers=max_worker
)

def __run_sync_func(self, invocation_id, func, params):
def _run_sync_func(self, invocation_id, context, func, params):
# This helper exists because we need to access the current
# invocation_id from ThreadPoolExecutor's threads.
_invocation_id_local.v = invocation_id
try:
return func(**params)
return ExtensionManager.get_sync_invocation_wrapper(context,
func)(params)
finally:
_invocation_id_local.v = None

async def _run_async_func(self, context, func, params):
return await ExtensionManager.get_async_invocation_wrapper(
context, func, params
)

def __poll_grpc(self):
options = []
if self._grpc_max_msg_len:
Expand Down
Loading