From 571841f4a733caaabe4e46f4f0959b5ade3bc1e9 Mon Sep 17 00:00:00 2001 From: ali Date: Wed, 2 Jul 2025 12:28:10 +0530 Subject: [PATCH 1/4] UN-2579 [FIX] Fixed MIME type validation to auto-detect file types instead of checking content-type header MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../workflow_manager/endpoint_v2/source.py | 66 ++++++++++++++----- 1 file changed, 50 insertions(+), 16 deletions(-) diff --git a/backend/workflow_manager/endpoint_v2/source.py b/backend/workflow_manager/endpoint_v2/source.py index 8df0d0953c..1880ce7d42 100644 --- a/backend/workflow_manager/endpoint_v2/source.py +++ b/backend/workflow_manager/endpoint_v2/source.py @@ -66,6 +66,25 @@ class SourceConnector(BaseConnector): READ_CHUNK_SIZE = 4194304 # Chunk size for reading files + @classmethod + def _detect_mime_type(cls, chunk: bytes, file_name: str) -> str: + """Detect MIME type from file chunk using Python Magic. 
+ + Args: + chunk (bytes): File chunk to analyze + file_name (str): Name of the file being processed + + Returns: + str: Detected MIME type (may be unsupported) + """ + # Primary MIME type detection using Python Magic + mime_type = magic.from_buffer(chunk, mime=True) + logger.info( + f"Detected MIME type using Python Magic: {mime_type} for file {file_name}" + ) + + return mime_type + def __init__( self, workflow: Workflow, @@ -919,22 +938,45 @@ def add_input_file_to_api_storage( workflow_id=workflow_id, execution_id=execution_id ) workflow: Workflow = Workflow.objects.get(id=workflow_id) - file_hashes: dict[str, FileHash] = {} + file_hashes_objects: dict[str, FileHash] = {} unique_file_hashes: set[str] = set() connection_type = WorkflowEndpoint.ConnectionType.API for file in file_objs: file_name = file.name destination_path = os.path.join(api_storage_dir, file_name) + file_system = FileSystem(FileStorageType.API_EXECUTION) + file_storage = file_system.get_file_storage() + file_hash = sha256() + first_iteration = True mime_type = file.content_type - logger.info(f"Detected MIME type: {mime_type} for file {file_name}") - if not AllowedFileTypes.is_allowed(mime_type): + logger.info(f"Detected MIME type from content type header: {mime_type} for file {file_name}") + + # Ensure file is at beginning + file.seek(0) + + try: + for chunk in file.chunks(chunk_size=cls.READ_CHUNK_SIZE): + if first_iteration: + mime_type = cls._detect_mime_type(chunk, file_name) + # Validate MIME type + if not AllowedFileTypes.is_allowed(mime_type): + raise UnsupportedMimeTypeError( + f"Unsupported MIME type '{mime_type}' for file '{file_name}'" + ) + first_iteration = False + + # Process each chunk + file_hash.update(chunk) + file_storage.write(path=destination_path, mode="ab", data=chunk) + file_hash = file_hash.hexdigest() + except UnsupportedMimeTypeError: log_message = f"Skipping file '{file_name}' to stage due to unsupported MIME type '{mime_type}'" workflow_log.log_info(logger=logger, 
message=log_message) # Generate a clearly marked temporary hash to avoid reading the file content # Helps to prevent duplicate entries in file executions fake_hash = f"temp-hash-{uuid.uuid4().hex}" - file_hash = FileHash( + file_hash_object = FileHash( file_path=destination_path, source_connection_type=connection_type, file_name=file_name, @@ -943,17 +985,9 @@ def add_input_file_to_api_storage( file_size=file.size, mime_type=mime_type, ) - file_hashes.update({file_name: file_hash}) + file_hashes_objects.update({file_name: file_hash_object}) continue - file_system = FileSystem(FileStorageType.API_EXECUTION) - file_storage = file_system.get_file_storage() - file_hash = sha256() - for chunk in file.chunks(chunk_size=cls.READ_CHUNK_SIZE): - file_hash.update(chunk) - file_storage.write(path=destination_path, mode="ab", data=chunk) - file_hash = file_hash.hexdigest() - # Skip duplicate files if file_hash in unique_file_hashes: log_message = f"Skipping file '{file_name}' — duplicate detected within the current request. Already staged for processing." 
@@ -967,7 +1001,7 @@ def add_input_file_to_api_storage( workflow=workflow, cache_key=file_hash ) is_executed = True if file_history and file_history.is_completed() else False - file_hash = FileHash( + file_hash_object = FileHash( file_path=destination_path, source_connection_type=connection_type, file_name=file_name, @@ -976,8 +1010,8 @@ def add_input_file_to_api_storage( file_size=file.size, mime_type=mime_type, ) - file_hashes.update({file_name: file_hash}) - return file_hashes + file_hashes_objects.update({file_name: file_hash_object}) + return file_hashes_objects @classmethod def create_endpoint_for_workflow( From 2d0c5c0c8d63b6610b655a6ff823dfba90ce4ccc Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 2 Jul 2025 07:01:48 +0000 Subject: [PATCH 2/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- backend/workflow_manager/endpoint_v2/source.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/backend/workflow_manager/endpoint_v2/source.py b/backend/workflow_manager/endpoint_v2/source.py index 1880ce7d42..01e1d7dcb5 100644 --- a/backend/workflow_manager/endpoint_v2/source.py +++ b/backend/workflow_manager/endpoint_v2/source.py @@ -69,11 +69,11 @@ class SourceConnector(BaseConnector): @classmethod def _detect_mime_type(cls, chunk: bytes, file_name: str) -> str: """Detect MIME type from file chunk using Python Magic. 
- + Args: chunk (bytes): File chunk to analyze file_name (str): Name of the file being processed - + Returns: str: Detected MIME type (may be unsupported) """ @@ -82,7 +82,7 @@ def _detect_mime_type(cls, chunk: bytes, file_name: str) -> str: logger.info( f"Detected MIME type using Python Magic: {mime_type} for file {file_name}" ) - + return mime_type def __init__( @@ -950,11 +950,13 @@ def add_input_file_to_api_storage( file_hash = sha256() first_iteration = True mime_type = file.content_type - logger.info(f"Detected MIME type from content type header: {mime_type} for file {file_name}") + logger.info( + f"Detected MIME type from content type header: {mime_type} for file {file_name}" + ) # Ensure file is at beginning file.seek(0) - + try: for chunk in file.chunks(chunk_size=cls.READ_CHUNK_SIZE): if first_iteration: @@ -965,7 +967,7 @@ def add_input_file_to_api_storage( f"Unsupported MIME type '{mime_type}' for file '{file_name}'" ) first_iteration = False - + # Process each chunk file_hash.update(chunk) file_storage.write(path=destination_path, mode="ab", data=chunk) From fe0e963afd85c26e41c1cb434b786bfea0a97049 Mon Sep 17 00:00:00 2001 From: ali Date: Wed, 2 Jul 2025 12:43:07 +0530 Subject: [PATCH 3/4] reduced complexity of method add_input_file_to_api_storage --- .../workflow_manager/endpoint_v2/source.py | 210 +++++++++++++----- 1 file changed, 151 insertions(+), 59 deletions(-) diff --git a/backend/workflow_manager/endpoint_v2/source.py b/backend/workflow_manager/endpoint_v2/source.py index 1880ce7d42..9d3feab86f 100644 --- a/backend/workflow_manager/endpoint_v2/source.py +++ b/backend/workflow_manager/endpoint_v2/source.py @@ -69,11 +69,11 @@ class SourceConnector(BaseConnector): @classmethod def _detect_mime_type(cls, chunk: bytes, file_name: str) -> str: """Detect MIME type from file chunk using Python Magic. 
- + Args: chunk (bytes): File chunk to analyze file_name (str): Name of the file being processed - + Returns: str: Detected MIME type (may be unsupported) """ @@ -82,7 +82,7 @@ def _detect_mime_type(cls, chunk: bytes, file_name: str) -> str: logger.info( f"Detected MIME type using Python Magic: {mime_type} for file {file_name}" ) - + return mime_type def __init__( @@ -904,6 +904,117 @@ def load_file(self, input_file_path: str) -> tuple[str, BytesIO]: return os.path.basename(input_file_path), file_stream + @classmethod + def _process_file_chunks( + cls, file: UploadedFile, file_storage, destination_path: str + ) -> tuple[str, str, bool]: + """Process file chunks and detect MIME type. + + Args: + file: The uploaded file to process + file_storage: File storage instance + destination_path: Path where file should be stored + + Returns: + tuple: (file_hash, mime_type, success) where success indicates if processing completed + """ + file_hash = sha256() + first_iteration = True + mime_type = file.content_type + + file.seek(0) + + try: + for chunk in file.chunks(chunk_size=cls.READ_CHUNK_SIZE): + if first_iteration: + mime_type = cls._detect_mime_type(chunk, file.name) + if not AllowedFileTypes.is_allowed(mime_type): + raise UnsupportedMimeTypeError( + f"Unsupported MIME type '{mime_type}' for file '{file.name}'" + ) + first_iteration = False + + file_hash.update(chunk) + file_storage.write(path=destination_path, mode="ab", data=chunk) + + return file_hash.hexdigest(), mime_type, True + + except UnsupportedMimeTypeError: + return "", mime_type, False + + @classmethod + def _create_file_hash_object( + cls, + file_path: str, + connection_type, + file_name: str, + file_hash: str, + is_executed: bool, + file_size: int, + mime_type: str, + ) -> FileHash: + """Create FileHash object with provided parameters.""" + return FileHash( + file_path=file_path, + source_connection_type=connection_type, + file_name=file_name, + file_hash=file_hash, + is_executed=is_executed, + 
file_size=file_size, + mime_type=mime_type, + ) + + @classmethod + def _handle_unsupported_file( + cls, + file_name: str, + mime_type: str, + destination_path: str, + connection_type, + file_size: int, + workflow_log, + ) -> FileHash: + """Handle files with unsupported MIME types.""" + log_message = f"Skipping file '{file_name}' to stage due to unsupported MIME type '{mime_type}'" + workflow_log.log_info(logger=logger, message=log_message) + + fake_hash = f"temp-hash-{uuid.uuid4().hex}" + return cls._create_file_hash_object( + file_path=destination_path, + connection_type=connection_type, + file_name=file_name, + file_hash=fake_hash, + is_executed=True, + file_size=file_size, + mime_type=mime_type, + ) + + @classmethod + def _check_duplicate_file( + cls, file_hash: str, unique_file_hashes: set[str], file_name: str, workflow_log + ) -> bool: + """Check if file is duplicate and log if needed.""" + if file_hash in unique_file_hashes: + log_message = f"Skipping file '{file_name}' — duplicate detected within the current request. Already staged for processing." 
+ workflow_log.log_info(logger=logger, message=log_message) + return True + + unique_file_hashes.add(file_hash) + return False + + @classmethod + def _get_execution_status( + cls, use_file_history: bool, workflow, file_hash: str + ) -> bool: + """Get execution status for file based on history.""" + if not use_file_history: + return False + + file_history = FileHistoryHelper.get_file_history( + workflow=workflow, cache_key=file_hash + ) + return True if file_history and file_history.is_completed() else False + @classmethod def add_input_file_to_api_storage( cls, @@ -941,76 +1052,57 @@ def add_input_file_to_api_storage( file_hashes_objects: dict[str, FileHash] = {} unique_file_hashes: set[str] = set() connection_type = WorkflowEndpoint.ConnectionType.API + for file in file_objs: file_name = file.name destination_path = os.path.join(api_storage_dir, file_name) + logger.info( + f"Detected MIME type from content type header: {file.content_type} for file {file_name}" + ) + file_system = FileSystem(FileStorageType.API_EXECUTION) file_storage = file_system.get_file_storage() - file_hash = sha256() - first_iteration = True - mime_type = file.content_type - logger.info(f"Detected MIME type from content type header: {mime_type} for file {file_name}") - - # Ensure file is at beginning - file.seek(0) - - try: - for chunk in file.chunks(chunk_size=cls.READ_CHUNK_SIZE): - if first_iteration: - mime_type = cls._detect_mime_type(chunk, file_name) - # Validate MIME type - if not AllowedFileTypes.is_allowed(mime_type): - raise UnsupportedMimeTypeError( - f"Unsupported MIME type '{mime_type}' for file '{file_name}'" - ) - first_iteration = False - - # Process each chunk - file_hash.update(chunk) - file_storage.write(path=destination_path, mode="ab", data=chunk) - file_hash = file_hash.hexdigest() - except UnsupportedMimeTypeError: - log_message = f"Skipping file '{file_name}' to stage due to unsupported MIME type '{mime_type}'" - workflow_log.log_info(logger=logger, 
message=log_message) - # Generate a clearly marked temporary hash to avoid reading the file content - # Helps to prevent duplicate entries in file executions - fake_hash = f"temp-hash-{uuid.uuid4().hex}" - file_hash_object = FileHash( - file_path=destination_path, - source_connection_type=connection_type, - file_name=file_name, - file_hash=fake_hash, - is_executed=True, - file_size=file.size, - mime_type=mime_type, + + # Process file chunks and detect MIME type + file_hash, mime_type, success = cls._process_file_chunks( + file, file_storage, destination_path + ) + + # Handle unsupported files + if not success: + file_hash_object = cls._handle_unsupported_file( + file_name, + mime_type, + destination_path, + connection_type, + file.size, + workflow_log, ) file_hashes_objects.update({file_name: file_hash_object}) continue - # Skip duplicate files - if file_hash in unique_file_hashes: - log_message = f"Skipping file '{file_name}' — duplicate detected within the current request. Already staged for processing." 
- workflow_log.log_info(logger=logger, message=log_message) + # Check for duplicates + if cls._check_duplicate_file( + file_hash, unique_file_hashes, file_name, workflow_log + ): continue - unique_file_hashes.add(file_hash) - file_history = None - if use_file_history: - file_history = FileHistoryHelper.get_file_history( - workflow=workflow, cache_key=file_hash - ) - is_executed = True if file_history and file_history.is_completed() else False - file_hash_object = FileHash( - file_path=destination_path, - source_connection_type=connection_type, - file_name=file_name, - file_hash=file_hash, - is_executed=is_executed, - file_size=file.size, - mime_type=mime_type, + # Get execution status + is_executed = cls._get_execution_status(use_file_history, workflow, file_hash) + + # Create file hash object + file_hash_object = cls._create_file_hash_object( + destination_path, + connection_type, + file_name, + file_hash, + is_executed, + file.size, + mime_type, ) file_hashes_objects.update({file_name: file_hash_object}) + return file_hashes_objects @classmethod From dd9192f86d89f97302e56485af37828fbe2aed47 Mon Sep 17 00:00:00 2001 From: ali Date: Wed, 2 Jul 2025 18:23:38 +0530 Subject: [PATCH 4/4] addressing PR reviews --- backend/backend/settings/base.py | 3 + backend/sample.env | 4 + backend/workflow_manager/endpoint_v2/enums.py | 13 ++ .../workflow_manager/endpoint_v2/source.py | 131 +++++++++++------- 4 files changed, 103 insertions(+), 48 deletions(-) diff --git a/backend/backend/settings/base.py b/backend/backend/settings/base.py index 5fbfa71452..3e07773ca4 100644 --- a/backend/backend/settings/base.py +++ b/backend/backend/settings/base.py @@ -165,6 +165,9 @@ def get_required_setting(setting_key: str, default: str | None = None) -> str | ATOMIC_REQUESTS = CommonUtils.str_to_bool( os.environ.get("DJANGO_ATOMIC_REQUESTS", "False") ) +MAX_FILE_SIZE_LIMIT_TO_READ = int( + os.environ.get("MAX_FILE_SIZE_LIMIT_TO_READ", 100 * 1024 * 1024) +) # 100MB limit for full file analysis 
 # Flag to Enable django admin
 ADMIN_ENABLED = False
diff --git a/backend/sample.env b/backend/sample.env
index 5f5219c7d4..5637ae316f 100644
--- a/backend/sample.env
+++ b/backend/sample.env
@@ -186,3 +186,7 @@ FILE_EXECUTION_TRACKER_COMPLETED_TTL_IN_SECOND=600 # 10 minutes
 # Runner polling timeout
 MAX_RUNNER_POLLING_WAIT_SECONDS=10800 # 3 hours
 RUNNER_POLLING_INTERVAL_SECONDS=2 # 2 seconds
+
+
+# Max file size limit to read (bytes; must be a plain integer — env values are strings and are not evaluated)
+MAX_FILE_SIZE_LIMIT_TO_READ=104857600 # 100MB limit for full file analysis
diff --git a/backend/workflow_manager/endpoint_v2/enums.py b/backend/workflow_manager/endpoint_v2/enums.py
index cd6cd9f585..5e8fd6bbfb 100644
--- a/backend/workflow_manager/endpoint_v2/enums.py
+++ b/backend/workflow_manager/endpoint_v2/enums.py
@@ -26,3 +26,16 @@ class AllowedFileTypes(Enum):
     @classmethod
     def is_allowed(cls, mime_type: str) -> bool:
         return mime_type in cls._value2member_map_
+
+
+class InvalidFileTypes(Enum):
+    OCTET_STREAM = "application/octet-stream"
+
+
+class UncertainMimeTypes(Enum):
+    OCTET_STREAM = "application/octet-stream"
+    ZIP = "application/zip"
+
+    @classmethod
+    def is_uncertain(cls, mime_type: str) -> bool:
+        return mime_type in cls._value2member_map_
diff --git a/backend/workflow_manager/endpoint_v2/source.py b/backend/workflow_manager/endpoint_v2/source.py
index 9d3feab86f..4c6e04310c 100644
--- a/backend/workflow_manager/endpoint_v2/source.py
+++ b/backend/workflow_manager/endpoint_v2/source.py
@@ -12,6 +12,7 @@
 import magic
 from connector_processor.constants import ConnectorKeys
 from connector_v2.models import ConnectorInstance
+from django.conf import settings
 from django.core.files.uploadedfile import UploadedFile
 from django.db.models import Q
 from utils.user_context import UserContext
@@ -24,7 +25,7 @@
     SourceKey,
 )
 from workflow_manager.endpoint_v2.dto import FileHash, SourceConfig
-from workflow_manager.endpoint_v2.enums import AllowedFileTypes
+from workflow_manager.endpoint_v2.enums import AllowedFileTypes, 
UncertainMimeTypes from workflow_manager.endpoint_v2.exceptions import ( InvalidInputDirectory, InvalidSourceConnectionType, @@ -64,26 +65,10 @@ class SourceConnector(BaseConnector): workflow (Workflow): The workflow associated with the source connector. """ - READ_CHUNK_SIZE = 4194304 # Chunk size for reading files - - @classmethod - def _detect_mime_type(cls, chunk: bytes, file_name: str) -> str: - """Detect MIME type from file chunk using Python Magic. - - Args: - chunk (bytes): File chunk to analyze - file_name (str): Name of the file being processed - - Returns: - str: Detected MIME type (may be unsupported) - """ - # Primary MIME type detection using Python Magic - mime_type = magic.from_buffer(chunk, mime=True) - logger.info( - f"Detected MIME type using Python Magic: {mime_type} for file {file_name}" - ) - - return mime_type + READ_CHUNK_SIZE = 4194304 # 4MB chunk size for reading files + MAX_FILE_SIZE_LIMIT_TO_READ = ( + settings.MAX_FILE_SIZE_LIMIT_TO_READ + ) # limit for full file analysis def __init__( self, @@ -905,10 +890,64 @@ def load_file(self, input_file_path: str) -> tuple[str, BytesIO]: return os.path.basename(input_file_path), file_stream @classmethod - def _process_file_chunks( + def _detect_mime_type( + cls, file: UploadedFile, first_chunk: bytes + ) -> tuple[str, bool]: + """Enhanced MIME type detection with smart fallback for uncertain types. 
+
+        Args:
+            file: The uploaded file
+            first_chunk: First chunk of file data
+
+        Returns:
+            tuple: (mime_type, is_supported) where is_supported indicates if type is allowed
+        """
+        mime_type = file.content_type
+        logger.info(f"Content-Type header: {mime_type} for file {file.name}")
+
+        # Primary detection using python-magic with first chunk
+        mime_type = magic.from_buffer(first_chunk, mime=True)
+        logger.info(
+            f"Detected MIME type using Python Magic: {mime_type} for file {file.name}"
+        )
+
+        # Smart fallback: if uncertain type and file is small enough, analyze full file
+        if (
+            UncertainMimeTypes.is_uncertain(mime_type)
+            and cls.READ_CHUNK_SIZE < file.size <= cls.MAX_FILE_SIZE_LIMIT_TO_READ
+        ):
+            logger.info(
+                f"Uncertain MIME type '{mime_type}', trying full file analysis for {file.name}"
+            )
+
+            # Read full file content
+            file.seek(0)
+            full_content = file.read()
+            file.seek(0)  # Reset for subsequent processing
+
+            # Analyze full file content
+            fallback_mime_type = magic.from_buffer(full_content, mime=True)
+
+            # Only use fallback result if it's more specific than chunk result
+            # NOTE: use the helper — `value not in EnumClass` raises TypeError for
+            # non-member values on Python < 3.12
+            if not UncertainMimeTypes.is_uncertain(fallback_mime_type):
+                mime_type = fallback_mime_type
+                logger.info(
+                    f"Full file analysis detected: {mime_type} for file {file.name}"
+                )
+            else:
+                logger.info(
+                    f"Full file analysis still uncertain, keeping: {mime_type} for file {file.name}"
+                )
+
+        # Check if detected type is supported
+        is_supported = AllowedFileTypes.is_allowed(mime_type)
+        return mime_type, is_supported
+
+    @classmethod
+    def _validate_and_store_file(
         cls, file: UploadedFile, file_storage, destination_path: str
     ) -> tuple[str, str, bool]:
-        """Process file chunks and detect MIME type.
+        """Validate file MIME type and store file content. 
Args: file: The uploaded file to process @@ -916,31 +955,29 @@ def _process_file_chunks( destination_path: Path where file should be stored Returns: - tuple: (file_hash, mime_type, success) where success indicates if processing completed + tuple: (file_hash, mime_type, mime_type_detected) where mime_type_detected indicates if type is supported """ file_hash = sha256() first_iteration = True - mime_type = file.content_type file.seek(0) - try: - for chunk in file.chunks(chunk_size=cls.READ_CHUNK_SIZE): - if first_iteration: - mime_type = cls._detect_mime_type(chunk, file.name) - if not AllowedFileTypes.is_allowed(mime_type): - raise UnsupportedMimeTypeError( - f"Unsupported MIME type '{mime_type}' for file '{file.name}'" - ) - first_iteration = False + for chunk in file.chunks(chunk_size=cls.READ_CHUNK_SIZE): + if first_iteration: + # Enhanced MIME type detection + mime_type, is_supported = cls._detect_mime_type(file, chunk) - file_hash.update(chunk) - file_storage.write(path=destination_path, mode="ab", data=chunk) + # If unsupported, return early without processing further chunks + if not is_supported: + return "", mime_type, False - return file_hash.hexdigest(), mime_type, True + first_iteration = False - except UnsupportedMimeTypeError: - return "", mime_type, False + # Process chunk - hash and store + file_hash.update(chunk) + file_storage.write(path=destination_path, mode="ab", data=chunk) + + return file_hash.hexdigest(), mime_type, True @classmethod def _create_file_hash_object( @@ -1003,10 +1040,10 @@ def _check_duplicate_file( return False @classmethod - def _get_execution_status( + def _is_execution_completed( cls, use_file_history: bool, workflow, file_hash: str ) -> bool: - """Get execution status for file based on history.""" + """Check if file execution is completed based on history.""" if not use_file_history: return False @@ -1057,20 +1094,16 @@ def add_input_file_to_api_storage( file_name = file.name destination_path = os.path.join(api_storage_dir, 
file_name) - logger.info( - f"Detected MIME type from content type header: {file.content_type} for file {file_name}" - ) - file_system = FileSystem(FileStorageType.API_EXECUTION) file_storage = file_system.get_file_storage() # Process file chunks and detect MIME type - file_hash, mime_type, success = cls._process_file_chunks( + file_hash, mime_type, mime_type_detected = cls._validate_and_store_file( file, file_storage, destination_path ) # Handle unsupported files - if not success: + if not mime_type_detected: file_hash_object = cls._handle_unsupported_file( file_name, mime_type, @@ -1089,7 +1122,9 @@ def add_input_file_to_api_storage( continue # Get execution status - is_executed = cls._get_execution_status(use_file_history, workflow, file_hash) + is_executed = cls._is_execution_completed( + use_file_history, workflow, file_hash + ) # Create file hash object file_hash_object = cls._create_file_hash_object(