
Commit 7b96739

added option to cancel files in waiting (#1411)
* added option to cancel files in waiting
* fix: update retry condition checks and streamline processing logic
* added search check, checkbox uncheck
* fix: update retry condition checks to improve document processing logic
* check for row check
* added check for retry options
* added message change

Co-authored-by: Pravesh Kumar <121786590+praveshkumar1988@users.noreply.github.com>
1 parent 90acbd8 commit 7b96739

6 files changed: +197 additions, −102 deletions


backend/score.py

Lines changed: 3 additions & 6 deletions
@@ -891,17 +891,14 @@ async def retry_processing(uri=Form(None), userName=Form(None), password=Form(No
     try:
         start = time.time()
         graph = create_graph_database_connection(uri, userName, password, database)
-        chunks = execute_graph_query(graph,QUERY_TO_GET_CHUNKS,params={"filename":file_name})
+        # chunks = execute_graph_query(graph,QUERY_TO_GET_CHUNKS,params={"filename":file_name})
         end = time.time()
         elapsed_time = end - start
         json_obj = {'api_name':'retry_processing', 'db_url':uri, 'userName':userName, 'database':database, 'file_name':file_name,'retry_condition':retry_condition,
                     'logging_time': formatted_time(datetime.now(timezone.utc)), 'elapsed_api_time':f'{elapsed_time:.2f}','email':email}
         logger.log_struct(json_obj, "INFO")
-        if chunks[0]['text'] is None or chunks[0]['text']=="" or not chunks :
-            return create_api_response('Success',message=f"Chunks are not created for the file{file_name}. Please upload again the file to re-process.",data=chunks)
-        else:
-            await asyncio.to_thread(set_status_retry, graph,file_name,retry_condition)
-            return create_api_response('Success',message=f"Status set to Ready to Reprocess for filename : {file_name}")
+        await asyncio.to_thread(set_status_retry, graph,file_name,retry_condition)
+        return create_api_response('Success',message=f"Status set to Ready to Reprocess for filename : {file_name}")
     except Exception as e:
         job_status = "Failed"
         message="Unable to set status to Retry"

backend/src/main.py

Lines changed: 22 additions & 19 deletions
@@ -230,7 +230,7 @@ def create_source_node_graph_url_wikipedia(graph, model, wiki_query, source_type
 async def extract_graph_from_file_local_file(uri, userName, password, database, model, merged_file_path, fileName, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
 
     logging.info(f'Process file name :{fileName}')
-    if not retry_condition:
+    if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
         gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
         if gcs_file_cache == 'True':
             folder_name = create_gcs_bucket_folder_name_hashed(uri, fileName)
@@ -244,7 +244,7 @@ async def extract_graph_from_file_local_file(uri, userName, password, database,
     return await processing_source(uri, userName, password, database, model, fileName, [], allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, True, merged_file_path, retry_condition, additional_instructions=additional_instructions)
 
 async def extract_graph_from_file_s3(uri, userName, password, database, model, source_url, aws_access_key_id, aws_secret_access_key, file_name, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
-    if not retry_condition:
+    if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
         if(aws_access_key_id==None or aws_secret_access_key==None):
             raise LLMGraphBuilderException('Please provide AWS access and secret keys')
         else:
@@ -258,7 +258,7 @@ async def extract_graph_from_file_s3(uri, userName, password, database, model, s
     return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition=retry_condition, additional_instructions=additional_instructions)
 
 async def extract_graph_from_web_page(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
-    if not retry_condition:
+    if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
         pages = get_documents_from_web_page(source_url)
         if pages==None or len(pages)==0:
             raise LLMGraphBuilderException(f'Content is not available for given URL : {file_name}')
@@ -267,7 +267,7 @@ async def extract_graph_from_web_page(uri, userName, password, database, model,
     return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition=retry_condition, additional_instructions=additional_instructions)
 
 async def extract_graph_from_file_youtube(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
-    if not retry_condition:
+    if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
         file_name, pages = get_documents_from_youtube(source_url)
 
         if pages==None or len(pages)==0:
@@ -277,7 +277,7 @@ async def extract_graph_from_file_youtube(uri, userName, password, database, mod
     return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition=retry_condition, additional_instructions=additional_instructions)
 
 async def extract_graph_from_file_Wikipedia(uri, userName, password, database, model, wiki_query, language, file_name, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
-    if not retry_condition:
+    if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
         file_name, pages = get_documents_from_Wikipedia(wiki_query, language)
         if pages==None or len(pages)==0:
             raise LLMGraphBuilderException(f'Wikipedia page is not available for file : {file_name}')
@@ -286,7 +286,7 @@ async def extract_graph_from_file_Wikipedia(uri, userName, password, database, m
     return await processing_source(uri, userName, password, database, model, file_name,[], allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition=retry_condition, additional_instructions=additional_instructions)
 
 async def extract_graph_from_file_gcs(uri, userName, password, database, model, gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token, file_name, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
-    if not retry_condition:
+    if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
         file_name, pages = get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token)
         if pages==None or len(pages)==0:
             raise LLMGraphBuilderException(f'File content is not available for file : {file_name}')
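
The same membership check now appears in every extract_graph_from_* function above and again in get_chunkId_chunkDoc_list below. A minimal refactoring sketch, not part of this commit, that would centralize it (needs_fresh_load and VALID_RETRY_CONDITIONS are hypothetical names):

# Hypothetical helper, not in this commit: one place for the repeated check.
VALID_RETRY_CONDITIONS = (DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION)

def needs_fresh_load(retry_condition):
    """True when the source must be loaded from scratch, i.e. no recognised retry condition."""
    return retry_condition in ("", None) or retry_condition not in VALID_RETRY_CONDITIONS

# Usage inside an extractor:
#     if needs_fresh_load(retry_condition):
#         pages = get_documents_from_web_page(source_url)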
@@ -431,7 +431,7 @@ async def processing_source(uri, userName, password, database, model, file_name,
 
     # merged_file_path have value only when file uploaded from local
 
-    if is_uploaded_from_local:
+    if is_uploaded_from_local and bool(is_cancelled_status) == False:
         gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
         if gcs_file_cache == 'True':
             folder_name = create_gcs_bucket_folder_name_hashed(uri, file_name)
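
The added guard skips the local/GCS file handling when the document was already cancelled. For any value, bool(x) == False is the same as not x, so a self-contained sketch of the intended truth table looks like this (should_handle_local_file is a hypothetical name used only for illustration):

def should_handle_local_file(is_uploaded_from_local, is_cancelled_status):
    # Same truth table as the guard above; `not x` is equivalent to `bool(x) == False`.
    return is_uploaded_from_local and not is_cancelled_status

assert should_handle_local_file(True, False) is True    # local upload, not cancelled: handle the merged file
assert should_handle_local_file(True, True) is False    # cancelled while waiting: skip the file handling
assert should_handle_local_file(False, False) is False  # not a local upload: nothing to do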
@@ -511,7 +511,7 @@ async def processing_chunks(chunkId_chunkDoc_list,graph,uri, userName, password,
     return node_count,rel_count,latency_processing_chunk
 
 def get_chunkId_chunkDoc_list(graph, file_name, pages, token_chunk_size, chunk_overlap, retry_condition):
-    if not retry_condition:
+    if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
         logging.info("Break down file into chunks")
         bad_chars = ['"', "\n", "'"]
         for i in range(0,len(pages)):
@@ -532,7 +532,7 @@ def get_chunkId_chunkDoc_list(graph, file_name, pages, token_chunk_size, chunk_o
         chunks = execute_graph_query(graph,QUERY_TO_GET_CHUNKS, params={"filename":file_name})
 
         if chunks[0]['text'] is None or chunks[0]['text']=="" or not chunks :
-            raise LLMGraphBuilderException(f"Chunks are not created for {file_name}. Please re-upload file and try again.")
+            raise LLMGraphBuilderException(f"Chunks are not created for {file_name}. Please re-upload file or reprocess the file with option Start From Beginning.")
         else:
             for chunk in chunks:
                 chunk_doc = Document(page_content=chunk['text'], metadata={'id':chunk['id'], 'position':chunk['position']})
@@ -714,15 +714,9 @@ def manually_cancelled_job(graph, filenames, source_types, merged_dir, uri):
         obj_source_node.updated_at = datetime.now()
         graphDb_data_Access = graphDBdataAccess(graph)
         graphDb_data_Access.update_source_node(obj_source_node)
-        count_response = graphDb_data_Access.update_node_relationship_count(file_name)
+        #Update the nodeCount and relCount properties in Document node
+        graphDb_data_Access.update_node_relationship_count(file_name)
         obj_source_node = None
-        merged_file_path = os.path.join(merged_dir, file_name)
-        if source_type == 'local file' and gcs_file_cache == 'True':
-            folder_name = create_gcs_bucket_folder_name_hashed(uri, file_name)
-            delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
-        else:
-            logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
-            delete_uploaded_local_file(merged_file_path,file_name)
     return "Cancelled the processing job successfully"
 
 def populate_graph_schema_from_text(text, model, is_schema_description_checked, is_local_storage):
@@ -749,10 +743,19 @@ def set_status_retry(graph, file_name, retry_condition):
     obj_source_node.is_cancelled = False
     if retry_condition == DELETE_ENTITIES_AND_START_FROM_BEGINNING or retry_condition == START_FROM_BEGINNING:
         obj_source_node.processed_chunk=0
-        if retry_condition == DELETE_ENTITIES_AND_START_FROM_BEGINNING:
-            execute_graph_query(graph,QUERY_TO_DELETE_EXISTING_ENTITIES, params={"filename":file_name})
         obj_source_node.node_count=0
         obj_source_node.relationship_count=0
+        obj_source_node.chunkNodeCount=0
+        obj_source_node.chunkRelCount=0
+        obj_source_node.communityNodeCount=0
+        obj_source_node.communityRelCount=0
+        obj_source_node.entityEntityRelCount=0
+        obj_source_node.entityNodeCount=0
+        obj_source_node.processingTime=0
+        obj_source_node.total_chunks=0
+        if retry_condition == DELETE_ENTITIES_AND_START_FROM_BEGINNING:
+            execute_graph_query(graph,QUERY_TO_DELETE_EXISTING_ENTITIES, params={"filename":file_name})
+
     logging.info(obj_source_node)
     graphDb_data_Access.update_source_node(obj_source_node)
 
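
set_status_retry now also zeroes several per-type counters on the Document node. A minimal alternative sketch, not part of this commit, that resets the same properties in one loop (reset_document_counters is a hypothetical name):

def reset_document_counters(obj_source_node):
    # Hypothetical helper: zero the same counters that set_status_retry resets above.
    for attr in (
        'processed_chunk', 'node_count', 'relationship_count',
        'chunkNodeCount', 'chunkRelCount', 'communityNodeCount', 'communityRelCount',
        'entityEntityRelCount', 'entityNodeCount', 'processingTime', 'total_chunks',
    ):
        setattr(obj_source_node, attr, 0)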

frontend/src/components/FileTable.tsx

Lines changed: 109 additions & 6 deletions
@@ -79,6 +79,7 @@ const FileTable: ForwardRefRenderFunction<ChildRef, FileTableProps> = (props, re
   const columnHelper = createColumnHelper<CustomFile>();
   const [columnFilters, setColumnFilters] = useState<ColumnFiltersState>([]);
   const [isLoading, setIsLoading] = useState<boolean>(false);
+  const [isCancellingQueue, setIsCancellingQueue] = useState<boolean>(false);
   const [statusFilter, setStatusFilter] = useState<string>('');
   const [filetypeFilter, setFiletypeFilter] = useState<string>('');
   const [fileSourceFilter, setFileSourceFilter] = useState<string>('');
@@ -833,6 +834,73 @@ const FileTable: ForwardRefRenderFunction<ChildRef, FileTableProps> = (props, re
     }
   }, [connectionStatus, filesData.length, isReadOnlyUser]);
 
+  const refreshFileData = async () => {
+    try {
+      const res = await getSourceNodes();
+      if (res.data && res.data.status !== 'Failed' && res.data.data.length) {
+        const updatedFiles = res.data.data
+          .map((item: SourceNode) => {
+            const existingFile = filesData.find((f) => f.name === item.fileName);
+            if (existingFile) {
+              // Check if file is in queue
+              const isInQueue = queue.items.some((f) => f.name === item.fileName);
+              return {
+                ...existingFile,
+                status: isInQueue ? 'Waiting' : getFileSourceStatus(item),
+                nodesCount: item?.nodeCount ?? existingFile.nodesCount,
+                relationshipsCount: item?.relationshipCount ?? existingFile.relationshipsCount,
+                processingTotalTime: item?.processingTime ?? existingFile.processingTotalTime,
+              };
+            }
+            return existingFile;
+          })
+          .filter(Boolean);
+
+        setFilesData(updatedFiles as CustomFile[]);
+        setRowSelection((prev) => {
+          const updated = { ...prev };
+          updatedFiles.forEach((file) => {
+            if (file?.status === 'Cancelled' && updated[file.id]) {
+              delete updated[file.id];
+            }
+          });
+          return updated;
+        });
+      }
+    } catch (error) {
+      console.error('Refresh failed:', error);
+    }
+  };
+
+  const cancelQueue = async () => {
+    if (queue.isEmpty()) {
+      showNormalToast('No files in queue to cancel');
+      return;
+    }
+
+    setIsCancellingQueue(true);
+    try {
+      const queuedFileNames = queue.items.map((f) => f.name as string).filter(Boolean);
+      const queuedFileSources = queue.items.map((f) => f.fileSource as string).filter(Boolean);
+      const res = await cancelAPI(queuedFileNames, queuedFileSources);
+
+      if (res.data.status === 'Success') {
+        queue.clear();
+        await refreshFileData();
+
+        showNormalToast(`Successfully cancelled ${queuedFileNames.length} waiting file(s)`);
+      } else {
+        throw new Error(res.data.error || 'Failed to cancel queue');
+      }
+    } catch (err) {
+      if (err instanceof Error) {
+        showErrorToast(`Failed to cancel queue: ${err.message}`);
+      }
+    } finally {
+      setIsCancellingQueue(false);
+    }
+  };
+
   const cancelHandler = async (fileName: string, id: string, fileSource: string) => {
     setFilesData((prevfiles) =>
       prevfiles.map((curfile) => {
@@ -860,6 +928,11 @@ const FileTable: ForwardRefRenderFunction<ChildRef, FileTableProps> = (props, re
         return curfile;
       })
     );
+    setRowSelection((prev) => {
+      const updated = { ...prev };
+      delete updated[id];
+      return updated;
+    });
     setProcessedCount((prev) => {
       if (prev == batchSize) {
         return batchSize - 1;
@@ -1036,14 +1109,44 @@ const FileTable: ForwardRefRenderFunction<ChildRef, FileTableProps> = (props, re
       </DataGridComponents.TableResults>
     );
   } else if (connectionStatus) {
+    const queueSize = queue.size();
     return (
       <DataGridComponents.TableResults>
-        <Flex flexDirection='row' gap='0' alignItems='center'>
-          <span>
-            <InformationCircleIconOutline className='n-size-token-6' />
-          </span>
-          {`Large files may be partially processed up to 10K characters due to resource limit.`}
-          <span></span>
+        <Flex flexDirection='row' gap='4' alignItems='center'>
+          <Flex flexDirection='row' gap='0' alignItems='center'>
+            <span>
+              <InformationCircleIconOutline className='n-size-token-6' />
+            </span>
+            {`Large files may be partially processed up to 10K characters due to resource limit.`}
+          </Flex>
+          {queueSize > 0 && (
+            <Flex
+              flexDirection='row'
+              gap='2'
+              alignItems='center'
+              className={`${isCancellingQueue ? 'opacity-50' : 'animate-pulse'} bg-palette-warning-bg-weak rounded-md px-3 py-2 border border-palette-warning-border`}
+            >
+              <InformationCircleIconOutline className='n-size-token-5 text-palette-warning-text' />
+              <Typography variant='body-medium' className='font-semibold text-palette-warning-text'>
+                {isCancellingQueue
+                  ? 'Cancelling files in waiting queue...'
+                  : `${queueSize} file${queueSize !== 1 ? 's' : ''} waiting in queue`}
+              </Typography>
+              {!isReadOnlyUser && (
+                <IconButtonWithToolTip
+                  placement='right'
+                  text={isCancellingQueue ? 'Cancelling...' : 'Cancel all waiting files'}
+                  size='small'
+                  label='Cancel Queue'
+                  clean
+                  disabled={isCancellingQueue}
+                  onClick={cancelQueue}
+                >
+                  <XMarkIconOutline className='n-size-token-4' />
+                </IconButtonWithToolTip>
+              )}
+            </Flex>
+          )}
         </Flex>
       </DataGridComponents.TableResults>
     );
