diff --git a/backend/backend/settings/base.py b/backend/backend/settings/base.py index 5fbfa71452..3e07773ca4 100644 --- a/backend/backend/settings/base.py +++ b/backend/backend/settings/base.py @@ -165,6 +165,9 @@ def get_required_setting(setting_key: str, default: str | None = None) -> str | ATOMIC_REQUESTS = CommonUtils.str_to_bool( os.environ.get("DJANGO_ATOMIC_REQUESTS", "False") ) +MAX_FILE_SIZE_LIMIT_TO_READ = int( + os.environ.get("MAX_FILE_SIZE_LIMIT_TO_READ", 100 * 1024 * 1024) +) # 100MB limit for full file analysis # Flag to Enable django admin ADMIN_ENABLED = False diff --git a/backend/sample.env b/backend/sample.env index 5f5219c7d4..5637ae316f 100644 --- a/backend/sample.env +++ b/backend/sample.env @@ -186,3 +186,7 @@ FILE_EXECUTION_TRACKER_COMPLETED_TTL_IN_SECOND=600 # 10 minutes # Runner polling timeout MAX_RUNNER_POLLING_WAIT_SECONDS=10800 # 3 hours RUNNER_POLLING_INTERVAL_SECONDS=2 # 2 seconds + + +# Max file size limit to read, in bytes (env values are plain strings; arithmetic is not evaluated) +MAX_FILE_SIZE_LIMIT_TO_READ=104857600 # 100MB (100 * 1024 * 1024) limit for full file analysis diff --git a/backend/workflow_manager/endpoint_v2/enums.py b/backend/workflow_manager/endpoint_v2/enums.py index cd6cd9f585..5e8fd6bbfb 100644 --- a/backend/workflow_manager/endpoint_v2/enums.py +++ b/backend/workflow_manager/endpoint_v2/enums.py @@ -26,3 +26,16 @@ class AllowedFileTypes(Enum): @classmethod def is_allowed(cls, mime_type: str) -> bool: return mime_type in cls._value2member_map_ + + +class InvalidFileTypes(Enum): + OCTET_STREAM = "application/octet-stream" + + +class UncertainMimeTypes(Enum): + OCTET_STREAM = "application/octet-stream" + ZIP = "application/zip" + + @classmethod + def is_uncertain(cls, mime_type: str) -> bool: + return mime_type in cls._value2member_map_ diff --git a/backend/workflow_manager/endpoint_v2/source.py b/backend/workflow_manager/endpoint_v2/source.py index 8df0d0953c..4c6e04310c 100644 --- a/backend/workflow_manager/endpoint_v2/source.py +++ b/backend/workflow_manager/endpoint_v2/source.py @@ -12,6 
+12,7 @@ import magic from connector_processor.constants import ConnectorKeys from connector_v2.models import ConnectorInstance +from django.conf import settings from django.core.files.uploadedfile import UploadedFile from django.db.models import Q from utils.user_context import UserContext @@ -24,7 +25,7 @@ SourceKey, ) from workflow_manager.endpoint_v2.dto import FileHash, SourceConfig -from workflow_manager.endpoint_v2.enums import AllowedFileTypes +from workflow_manager.endpoint_v2.enums import AllowedFileTypes, UncertainMimeTypes from workflow_manager.endpoint_v2.exceptions import ( InvalidInputDirectory, InvalidSourceConnectionType, @@ -64,7 +65,10 @@ class SourceConnector(BaseConnector): workflow (Workflow): The workflow associated with the source connector. """ - READ_CHUNK_SIZE = 4194304 # Chunk size for reading files + READ_CHUNK_SIZE = 4194304 # 4MB chunk size for reading files + MAX_FILE_SIZE_LIMIT_TO_READ = ( + settings.MAX_FILE_SIZE_LIMIT_TO_READ + ) # limit for full file analysis def __init__( self, @@ -885,6 +889,169 @@ def load_file(self, input_file_path: str) -> tuple[str, BytesIO]: return os.path.basename(input_file_path), file_stream + @classmethod + def _detect_mime_type( + cls, file: UploadedFile, first_chunk: bytes + ) -> tuple[str, bool]: + """Enhanced MIME type detection with smart fallback for uncertain types. 
+ + Args: + file: The uploaded file + first_chunk: First chunk of file data + + Returns: + tuple: (mime_type, is_supported) where is_supported indicates if type is allowed + """ + mime_type = file.content_type + logger.info(f"Content-Type header: {mime_type} for file {file.name}") + + # Primary detection using python-magic with first chunk + mime_type = magic.from_buffer(first_chunk, mime=True) + logger.info( + f"Detected MIME type using Python Magic: {mime_type} for file {file.name}" + ) + + # Smart fallback: if uncertain type and file is small enough, analyze full file + if ( + UncertainMimeTypes.is_uncertain(mime_type) + and cls.READ_CHUNK_SIZE < file.size <= cls.MAX_FILE_SIZE_LIMIT_TO_READ + ): + logger.info( + f"Uncertain MIME type '{mime_type}', trying full file analysis for {file.name}" + ) + + # Read full file content + file.seek(0) + full_content = file.read() + file.seek(0) # Reset for subsequent processing + + # Analyze full file content + fallback_mime_type = magic.from_buffer(full_content, mime=True) + + # Only use fallback result if it's more specific than chunk result + if not UncertainMimeTypes.is_uncertain(fallback_mime_type): + mime_type = fallback_mime_type + logger.info( + f"Full file analysis detected: {mime_type} for file {file.name}" + ) + else: + logger.info( + f"Full file analysis still uncertain, keeping: {mime_type} for file {file.name}" + ) + + # Check if detected type is supported + is_supported = AllowedFileTypes.is_allowed(mime_type) + return mime_type, is_supported + + @classmethod + def _validate_and_store_file( + cls, file: UploadedFile, file_storage, destination_path: str + ) -> tuple[str, str, bool]: + """Validate file MIME type and store file content. 
+ + Args: + file: The uploaded file to process + file_storage: File storage instance + destination_path: Path where file should be stored + + Returns: + tuple: (file_hash, mime_type, mime_type_detected) where mime_type_detected indicates if type is supported + """ + file_hash = sha256() + first_iteration = True + mime_type = file.content_type or "" # fallback so an empty file (no chunks) cannot leave this unbound + file.seek(0) + + for chunk in file.chunks(chunk_size=cls.READ_CHUNK_SIZE): + if first_iteration: + # Enhanced MIME type detection + mime_type, is_supported = cls._detect_mime_type(file, chunk) + + # If unsupported, return early without processing further chunks + if not is_supported: + return "", mime_type, False + + first_iteration = False + + # Process chunk - hash and store + file_hash.update(chunk) + file_storage.write(path=destination_path, mode="ab", data=chunk) + + return file_hash.hexdigest(), mime_type, True + + @classmethod + def _create_file_hash_object( + cls, + file_path: str, + connection_type, + file_name: str, + file_hash: str, + is_executed: bool, + file_size: int, + mime_type: str, + ) -> FileHash: + """Create FileHash object with provided parameters.""" + return FileHash( + file_path=file_path, + source_connection_type=connection_type, + file_name=file_name, + file_hash=file_hash, + is_executed=is_executed, + file_size=file_size, + mime_type=mime_type, + ) + + @classmethod + def _handle_unsupported_file( + cls, + file_name: str, + mime_type: str, + destination_path: str, + connection_type, + file_size: int, + workflow_log, + ) -> FileHash: + """Handle files with unsupported MIME types.""" + log_message = f"Skipping file '{file_name}' to stage due to unsupported MIME type '{mime_type}'" + workflow_log.log_info(logger=logger, message=log_message) + + fake_hash = f"temp-hash-{uuid.uuid4().hex}" + return cls._create_file_hash_object( + file_path=destination_path, + connection_type=connection_type, + file_name=file_name, + file_hash=fake_hash, + is_executed=True, + file_size=file_size, + mime_type=mime_type, + ) + + @classmethod + def 
_check_duplicate_file( + cls, file_hash: str, unique_file_hashes: set[str], file_name: str, workflow_log + ) -> bool: + """Check if file is duplicate and log if needed.""" + if file_hash in unique_file_hashes: + log_message = f"Skipping file '{file_name}' — duplicate detected within the current request. Already staged for processing." + workflow_log.log_info(logger=logger, message=log_message) + return True + + unique_file_hashes.add(file_hash) + return False + + @classmethod + def _is_execution_completed( + cls, use_file_history: bool, workflow, file_hash: str + ) -> bool: + """Check if file execution is completed based on history.""" + if not use_file_history: + return False + + file_history = FileHistoryHelper.get_file_history( + workflow=workflow, cache_key=file_hash + ) + return True if file_history and file_history.is_completed() else False + @classmethod def add_input_file_to_api_storage( cls, @@ -919,65 +1086,59 @@ def add_input_file_to_api_storage( workflow_id=workflow_id, execution_id=execution_id ) workflow: Workflow = Workflow.objects.get(id=workflow_id) - file_hashes: dict[str, FileHash] = {} + file_hashes_objects: dict[str, FileHash] = {} unique_file_hashes: set[str] = set() connection_type = WorkflowEndpoint.ConnectionType.API + for file in file_objs: file_name = file.name destination_path = os.path.join(api_storage_dir, file_name) - mime_type = file.content_type - logger.info(f"Detected MIME type: {mime_type} for file {file_name}") - if not AllowedFileTypes.is_allowed(mime_type): - log_message = f"Skipping file '{file_name}' to stage due to unsupported MIME type '{mime_type}'" - workflow_log.log_info(logger=logger, message=log_message) - # Generate a clearly marked temporary hash to avoid reading the file content - # Helps to prevent duplicate entries in file executions - fake_hash = f"temp-hash-{uuid.uuid4().hex}" - file_hash = FileHash( - file_path=destination_path, - source_connection_type=connection_type, - file_name=file_name, - 
file_hash=fake_hash, - is_executed=True, - file_size=file.size, - mime_type=mime_type, - ) - file_hashes.update({file_name: file_hash}) - continue - file_system = FileSystem(FileStorageType.API_EXECUTION) file_storage = file_system.get_file_storage() - file_hash = sha256() - for chunk in file.chunks(chunk_size=cls.READ_CHUNK_SIZE): - file_hash.update(chunk) - file_storage.write(path=destination_path, mode="ab", data=chunk) - file_hash = file_hash.hexdigest() - # Skip duplicate files - if file_hash in unique_file_hashes: - log_message = f"Skipping file '{file_name}' — duplicate detected within the current request. Already staged for processing." - workflow_log.log_info(logger=logger, message=log_message) - continue - unique_file_hashes.add(file_hash) + # Process file chunks and detect MIME type + file_hash, mime_type, mime_type_detected = cls._validate_and_store_file( + file, file_storage, destination_path + ) - file_history = None - if use_file_history: - file_history = FileHistoryHelper.get_file_history( - workflow=workflow, cache_key=file_hash + # Handle unsupported files + if not mime_type_detected: + file_hash_object = cls._handle_unsupported_file( + file_name, + mime_type, + destination_path, + connection_type, + file.size, + workflow_log, ) - is_executed = True if file_history and file_history.is_completed() else False - file_hash = FileHash( - file_path=destination_path, - source_connection_type=connection_type, - file_name=file_name, - file_hash=file_hash, - is_executed=is_executed, - file_size=file.size, - mime_type=mime_type, + file_hashes_objects.update({file_name: file_hash_object}) + continue + + # Check for duplicates + if cls._check_duplicate_file( + file_hash, unique_file_hashes, file_name, workflow_log + ): + continue + + # Get execution status + is_executed = cls._is_execution_completed( + use_file_history, workflow, file_hash ) - file_hashes.update({file_name: file_hash}) - return file_hashes + + # Create file hash object + file_hash_object = 
cls._create_file_hash_object( + destination_path, + connection_type, + file_name, + file_hash, + is_executed, + file.size, + mime_type, + ) + file_hashes_objects.update({file_name: file_hash_object}) + + return file_hashes_objects @classmethod def create_endpoint_for_workflow(