2323from core .storer .doc_manager .knowledge_index import FieldInfo , KBSchema , ParserConfig , check_kb_schema
2424from core .storer .doc_manager .schema_utils import convert_dataframe_types_by_schema
2525from core .storer .vector_manager .vearch_manager import VearchManager
26- from utils .files_process import calculate_file_md5 , extract_file_type , is_supported_file
26+ from utils .file_util import calculate_file_md5 , extract_file_type , is_supported_file
2727from utils .hash_util import str_to_md5
2828from utils .type_trans import get_py_type
2929
@@ -78,6 +78,58 @@ def create_parser_from_config(parser_config: ParserConfig = None) -> NodeParser
7878 return ParserFactory .create_parser ("sentence" , chunk_size = 300 , chunk_overlap = 20 )
7979
8080
def validate_dataframe_columns(df: pd.DataFrame, kb_schema: KBSchema, kb_type: str) -> None:
    """
    Ensure the DataFrame's columns exactly match the schema's field definitions.

    Structured knowledge bases require a strict one-to-one correspondence
    between schema fields and DataFrame columns: every schema field must be
    present, and no column outside the schema may appear. Unstructured
    knowledge bases are chunked dynamically, so no column check applies.

    Args:
        df: DataFrame whose columns are validated.
        kb_schema: Knowledge base schema supplying the field definitions.
        kb_type: Knowledge base type ("structured" or "unstructured").

    Raises:
        HTTPException: 400 when a schema field is absent from the DataFrame
            or the DataFrame carries a column the schema does not define.
    """
    # Unstructured KBs have no fixed column layout — nothing to validate.
    if kb_type == "unstructured":
        logger.debug("Skipping column validation for unstructured knowledge base")
        return

    expected = {field.field_name for field in kb_schema.fields}
    actual = set(df.columns)

    # Schema fields the file failed to provide.
    absent = sorted(expected - actual)
    if absent:
        raise HTTPException(
            status_code=400,
            detail=f"File is missing required fields defined in schema: {', '.join(absent)}. "
            f"Schema defines the following fields: {', '.join(sorted(expected))}. "
            f"Please ensure the file contains all required columns."
        )

    # Columns the file provides that the schema does not define.
    surplus = sorted(actual - expected)
    if surplus:
        raise HTTPException(
            status_code=400,
            detail=f"File contains undefined fields not in schema: {', '.join(surplus)}. "
            f"Schema only defines: {', '.join(sorted(expected))}. "
            f"Please remove extra columns or update the schema to include them."
        )

    logger.info(f"Column validation passed: all {len(expected)} schema fields found in file")
131+
132+
81133@router .get (
82134 "/kb_file" ,
83135 response_model = APIResponse [PaginatedResponse [KnowledgeFileItem ]],
@@ -129,6 +181,7 @@ def get_kb_files(
129181 )
130182
131183
184+ # TODO This endpoint only uploads the file to the corresponding folder, and later decides whether to store it in the database
132185@router .post (
133186 "/upload_file" ,
134187 response_model = APIResponse [FileUploadInfo ],
@@ -252,6 +305,7 @@ def get_uploaded_file_info(
252305 df = pd .read_excel (file_path )
253306
254307 else :
308+ # TODO Add support for other structured data types, e.g., excel
255309 raise HTTPException (
256310 status_code = 400 ,
257311 detail = f"Unsupported file type: { file_type } "
@@ -325,6 +379,14 @@ def ingest_kb_file(
325379 )
326380
327381 file_path = file_upload_info .file_path
382+
383+ # Check if file exists before processing
384+ if not os .path .exists (file_path ):
385+ raise HTTPException (
386+ status_code = 400 ,
387+ detail = f"File not found: { file_path } . The file may have been deleted after a previous failed ingestion. Please re-upload the file."
388+ )
389+
328390 try :
329391 if kb_type == "structured" : # Structured data processing method
330392 # 1. Read file into memory
@@ -380,17 +442,39 @@ def ingest_kb_file(
380442 elif os .path .exists (file_path ):
381443 logger .info (f"File preserved (not a temporary file): { file_path } " )
382444
383- # 2. Convert DataFrame column types according to schema
445+ # 2. Validate DataFrame columns match schema
446+ try :
447+ validate_dataframe_columns (df , kb_schema , kb_type )
448+ except HTTPException :
449+ raise
450+ except Exception as e :
451+ logger .error (f"Column validation failed: { e } " )
452+ raise HTTPException (
453+ status_code = 400 ,
454+ detail = f"Column validation failed: { str (e )} "
455+ )
456+
457+ # 3. Convert DataFrame column types according to schema
384458 # Keep NaN values first, convert types, then handle NaN based on field type
385459 df = convert_dataframe_types_by_schema (df , kb_schema )
386-
387- # 3. Add three fixed columns to each row of data here, which need to be correspondingly added when inferring ES and Vearch schemas
388- df ['kb_id' ] = kb_id
389- df ['ori_file_id' ] = file_upload_info .file_id
390- df ['chunk_id' ] = [str (uuid .uuid4 ().hex [:16 ]) for _ in range (len (df ))]
391460
392- # 2. Write the df data in memory to ES and Vearch
393- # 2.1 Write to ES index based on data in df and inferred schema
461+ # 4. Add system fields to each row of data (these fields are automatically added when inferring ES and Vearch schemas)
462+ current_time = datetime .datetime .now ().strftime ("%Y-%m-%d %H:%M:%S" )
463+
464+ df ['kb_id' ] = kb_id # Knowledge base ID (unchanged)
465+ df ['sys_sample_id' ] = [str (uuid .uuid4 ().hex [:16 ]) for _ in range (len (df ))] # Sample ID (renamed from chunk_id)
466+ df ['sys_group' ] = file_upload_info .file_id # Group identifier (renamed from ori_file_id)
467+ df ['sys_template' ] = "default" # Template type
468+ df ['sys_priority' ] = 3 # Priority (P3 by default)
469+ df ['sys_status' ] = "已入库" # Status (已入库 by default)
470+ df ['sys_executor' ] = "" # Executor (empty by default)
471+ df ['sys_overview' ] = "" # Overview (empty by default)
472+ df ['sys_remarks' ] = "" # Remarks (empty by default)
473+ df ['sys_create_time' ] = current_time # Creation time
474+ df ['sys_update_time' ] = current_time # Update time
475+
476+ # 5. Write the df data in memory to ES and Vearch
477+ # 5.1 Write to ES index based on data in df and inferred schema
394478 es_add_result = kb_file_client .kb_add_df (kb_name = kb_name , df = df )
395479 if not es_add_result :
396480 logger .error ("add file data into es failed" )
@@ -400,7 +484,7 @@ def ingest_kb_file(
400484 )
401485 logger .info (f"add file data into es success" )
402486
403- # 2 .2 First perform embedding processing on some fields based on the inferred schema, save to df, and then write to Vearch space
487+ # 5 .2 First perform embedding processing on some fields based on the inferred schema, save to df, and then write to Vearch space
404488 if kb_schema .match_rules :
405489 from core .storer .doc_manager .knowledge_index import VearchVectorMatchPolicy
406490
@@ -464,7 +548,7 @@ def ingest_kb_file(
464548 detail = f"Vectorization processing failed: { str (e )} "
465549 )
466550
467- # 3 . Write file information to kb_file
551+ # 6 . Write file information to kb_file
468552 current_time = datetime .datetime .now ().strftime ("%Y-%m-%d %H:%M:%S" )
469553 kb_file_item = {
470554 "kb_id" : kb_id ,
@@ -542,19 +626,44 @@ def ingest_kb_data(
542626 except ValueError :
543627 # If it's a scalar dictionary, wrap it in a list
544628 df = pd .DataFrame ([data ])
545-
629+
630+ # Get kb_type for validation
631+ kb_type = search_result [0 ].get ("kb_type" , "structured" )
632+
633+ # Validate DataFrame columns match schema
634+ try :
635+ validate_dataframe_columns (df , kb_schema , kb_type )
636+ except HTTPException :
637+ raise
638+ except Exception as e :
639+ logger .error (f"Column validation failed: { e } " )
640+ raise HTTPException (
641+ status_code = 400 ,
642+ detail = f"Column validation failed: { str (e )} "
643+ )
644+
546645 # Convert DataFrame column types according to schema
547646 # Keep NaN values first, convert types, then handle NaN based on field type
548647 df = convert_dataframe_types_by_schema (df , kb_schema )
549648
550- # Add three fixed columns to each row of data here, which need to be correspondingly added when inferring ES and Vearch schemas
551- df [ 'kb_id' ] = kb_id
649+ # Add system fields to each row of data (these fields are automatically added when inferring ES and Vearch schemas)
650+ current_time = datetime . datetime . now (). strftime ( "%Y-%m-%d %H:%M:%S" )
552651 mock_file_id = f"mock_file_{ kb_id } "
553- df ['ori_file_id' ] = mock_file_id
554- df ['chunk_id' ] = [str (uuid .uuid4 ().hex [:16 ]) for _ in range (len (df ))]
555652
556- # 2. Write the df data in memory to ES and Vearch
557- # 2.1 Write to ES index based on data in df and inferred schema
653+ df ['kb_id' ] = kb_id # Knowledge base ID (unchanged)
654+ df ['sys_sample_id' ] = [str (uuid .uuid4 ().hex [:16 ]) for _ in range (len (df ))] # Sample ID (renamed from chunk_id)
655+ df ['sys_group' ] = mock_file_id # Group identifier (renamed from ori_file_id)
656+ df ['sys_template' ] = "default" # Template type
657+ df ['sys_priority' ] = 3 # Priority (P3 by default)
658+ df ['sys_status' ] = "已入库" # Status (已入库 by default)
659+ df ['sys_executor' ] = "" # Executor (empty by default)
660+ df ['sys_overview' ] = "" # Overview (empty by default)
661+ df ['sys_remarks' ] = "" # Remarks (empty by default)
662+ df ['sys_create_time' ] = current_time # Creation time
663+ df ['sys_update_time' ] = current_time # Update time
664+
665+ # 5. Write the df data in memory to ES and Vearch
666+ # 5.1 Write to ES index based on data in df and inferred schema
558667 es_add_result = kb_file_client .kb_add_df (kb_name = kb_name , df = df )
559668 if not es_add_result :
560669 logger .error ("add file data into es failed" )
@@ -564,7 +673,7 @@ def ingest_kb_data(
564673 )
565674 logger .info (f"add file data into es success" )
566675
567- # 2 .2 First perform embedding processing on some fields based on the inferred schema, save to df, and then write to Vearch space
676+ # 5 .2 First perform embedding processing on some fields based on the inferred schema, save to df, and then write to Vearch space
568677 if kb_schema .match_rules :
569678 from core .storer .doc_manager .knowledge_index import VearchVectorMatchPolicy
570679
0 commit comments