Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 37 additions & 22 deletions workers/proxy_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,22 @@
from proxy_worker.version import VERSION
from .utils.dependency import DependencyManager

# Cache protobuf constants to avoid repeated lookups in hot paths
_RpcLog = protos.RpcLog
_LOG_LEVEL_CRITICAL = _RpcLog.Critical
_LOG_LEVEL_ERROR = _RpcLog.Error
_LOG_LEVEL_WARNING = _RpcLog.Warning
_LOG_LEVEL_INFO = _RpcLog.Information
_LOG_LEVEL_DEBUG = _RpcLog.Debug
_LOG_LEVEL_NONE = getattr(_RpcLog, 'None')

_RpcLogCategory = _RpcLog.RpcLogCategory
_LOG_CATEGORY_SYSTEM = _RpcLogCategory.Value('System')
_LOG_CATEGORY_USER = _RpcLogCategory.Value('User')

# Library worker import reloaded in init and reload request
_library_worker = None
_library_worker_has_cv = False

# Thread-local invocation ID registry for efficient lookup
_thread_invocation_registry: typing.Dict[int, str] = {}
Expand Down Expand Up @@ -99,9 +113,9 @@ def get_global_current_invocation_id() -> Optional[str]:


def get_current_invocation_id() -> Optional[Any]:
global _library_worker
global _library_worker, _library_worker_has_cv
# Check global current invocation first (most up-to-date)
if _library_worker and not hasattr(_library_worker, 'invocation_id_cv'):
if _library_worker and not _library_worker_has_cv:
global_invocation_id = get_global_current_invocation_id()
if global_invocation_id is not None:
return global_invocation_id
Expand All @@ -121,23 +135,22 @@ def get_current_invocation_id() -> Optional[Any]:
# No event loop running
pass

# Check contextvar from library worker
if _library_worker and _library_worker_has_cv:
try:
cv = _library_worker.invocation_id_cv
val = cv.get()
if val is not None:
return val
except (AttributeError, LookupError):
pass

# Check the thread-local invocation ID registry
current_thread_id = threading.get_ident()
thread_invocation_id = get_thread_invocation_id(current_thread_id)
if thread_invocation_id is not None:
return thread_invocation_id

# Check contextvar from library worker
if _library_worker:
try:
cv = getattr(_library_worker, 'invocation_id_cv', None)
if cv:
val = cv.get()
if val is not None:
return val
except (AttributeError, LookupError):
pass

return getattr(_invocation_id_local, 'invocation_id', None)


Expand Down Expand Up @@ -204,22 +217,22 @@ def __init__(self, loop: AbstractEventLoop, host: str, port: int,
def on_logging(self, record: logging.LogRecord,
formatted_msg: str) -> None:
if record.levelno >= logging.CRITICAL:
log_level = protos.RpcLog.Critical
log_level = _LOG_LEVEL_CRITICAL
elif record.levelno >= logging.ERROR:
log_level = protos.RpcLog.Error
log_level = _LOG_LEVEL_ERROR
elif record.levelno >= logging.WARNING:
log_level = protos.RpcLog.Warning
log_level = _LOG_LEVEL_WARNING
elif record.levelno >= logging.INFO:
log_level = protos.RpcLog.Information
log_level = _LOG_LEVEL_INFO
elif record.levelno >= logging.DEBUG:
log_level = protos.RpcLog.Debug
log_level = _LOG_LEVEL_DEBUG
else:
log_level = getattr(protos.RpcLog, 'None')
log_level = _LOG_LEVEL_NONE

if is_system_log_category(record.name):
log_category = protos.RpcLog.RpcLogCategory.Value('System')
log_category = _LOG_CATEGORY_SYSTEM
else: # customers using logging will yield 'root' in record.name
log_category = protos.RpcLog.RpcLogCategory.Value('User')
log_category = _LOG_CATEGORY_USER

log = dict(
level=log_level,
Expand Down Expand Up @@ -404,12 +417,13 @@ def stop(self) -> None:

@staticmethod
def reload_library_worker(directory: str):
global _library_worker
global _library_worker, _library_worker_has_cv
v2_scriptfile = os.path.join(directory, get_script_file_name())
if os.path.exists(v2_scriptfile):
try:
import azure_functions_runtime # NoQA
_library_worker = azure_functions_runtime
_library_worker_has_cv = hasattr(_library_worker, 'invocation_id_cv')
logger.debug("azure_functions_runtime import succeeded: %s",
_library_worker.__file__)
except ImportError:
Expand All @@ -419,6 +433,7 @@ def reload_library_worker(directory: str):
try:
import azure_functions_runtime_v1 # NoQA
_library_worker = azure_functions_runtime_v1
_library_worker_has_cv = hasattr(_library_worker, 'invocation_id_cv')
logger.debug("azure_functions_runtime_v1 import succeeded: %s",
_library_worker.__file__) # type: ignore[union-attr]
except ImportError:
Expand Down
Loading