@@ -70,6 +70,7 @@ def process_csv_file(self, project_id: str, user_id: str):
7070 columns_metadata = []
7171 for column in df .columns :
7272 col_type = str (df [column ].dtype )
73+ col_series = df [column ]
7374
7475 # Determine data type category
7576 if "int" in col_type or "float" in col_type :
@@ -82,20 +83,88 @@ def process_csv_file(self, project_id: str, user_id: str):
8283 data_type = "string"
8384
8485 # Check for null values
85- nullable = df [column ].isnull ().any ()
86+ nullable = col_series .isnull ().any ()
87+ null_count = col_series .isnull ().sum ()
88+ null_percentage = (null_count / len (col_series )) * 100
8689
8790 # Get sample values (first 5 non-null values)
88- sample_values = df [column ].dropna ().head (5 ).tolist ()
91+ sample_values = col_series .dropna ().head (5 ).tolist ()
92+
93+ # Calculate statistics for numeric columns
94+ statistics = {}
95+ if data_type == "number" :
96+ statistics = {
97+ "min" : float (col_series .min ()) if not col_series .empty else None ,
98+ "max" : float (col_series .max ()) if not col_series .empty else None ,
99+ "mean" : float (col_series .mean ()) if not col_series .empty else None ,
100+ "median" : (
101+ float (col_series .median ()) if not col_series .empty else None
102+ ),
103+ "std" : float (col_series .std ()) if not col_series .empty else None ,
104+ }
105+ elif data_type == "string" :
106+ # String statistics
107+ unique_count = col_series .nunique ()
108+ most_common = col_series .mode ().tolist () if not col_series .empty else []
109+ avg_length = col_series .str .len ().mean () if not col_series .empty else 0
110+ statistics = {
111+ "unique_count" : int (unique_count ),
112+ "most_common_values" : most_common [:3 ], # Top 3 most common
113+ "average_length" : (
114+ float (avg_length ) if not pd .isna (avg_length ) else 0
115+ ),
116+ }
117+
118+ # Detect potential data quality issues
119+ data_quality_issues = []
120+ if null_percentage > 50 :
121+ data_quality_issues .append ("high_null_percentage" )
122+ if data_type == "string" and col_series .nunique () == 1 :
123+ data_quality_issues .append ("single_value_column" )
124+ if data_type == "number" and col_series .std () == 0 :
125+ data_quality_issues .append ("no_variance" )
89126
90127 columns_metadata .append (
91128 {
92129 "name" : column ,
93130 "type" : data_type ,
94131 "nullable" : nullable ,
132+ "null_count" : int (null_count ),
133+ "null_percentage" : round (null_percentage , 2 ),
95134 "sample_values" : sample_values ,
135+ "statistics" : statistics ,
136+ "data_quality_issues" : data_quality_issues ,
96137 }
97138 )
98139
140+ # Calculate dataset-level insights
141+ dataset_insights = {
142+ "total_rows" : len (df ),
143+ "total_columns" : len (df .columns ),
144+ "total_cells" : len (df ) * len (df .columns ),
145+ "null_cells" : df .isnull ().sum ().sum (),
146+ "null_percentage" : round (
147+ (df .isnull ().sum ().sum () / (len (df ) * len (df .columns ))) * 100 , 2
148+ ),
149+ "duplicate_rows" : int (df .duplicated ().sum ()),
150+ "duplicate_percentage" : round ((df .duplicated ().sum () / len (df )) * 100 , 2 ),
151+ "numeric_columns" : len (
152+ [col for col in columns_metadata if col ["type" ] == "number" ]
153+ ),
154+ "string_columns" : len (
155+ [col for col in columns_metadata if col ["type" ] == "string" ]
156+ ),
157+ "datetime_columns" : len (
158+ [col for col in columns_metadata if col ["type" ] == "datetime" ]
159+ ),
160+ "boolean_columns" : len (
161+ [col for col in columns_metadata if col ["type" ] == "boolean" ]
162+ ),
163+ "columns_with_issues" : len (
164+ [col for col in columns_metadata if col ["data_quality_issues" ]]
165+ ),
166+ }
167+
99168 # Update project with analysis results
100169 self .update_state (
101170 state = "PROGRESS" ,
@@ -122,6 +191,7 @@ def process_csv_file(self, project_id: str, user_id: str):
122191 "row_count" : len (df ),
123192 "column_count" : len (df .columns ),
124193 "columns_metadata" : columns_metadata ,
194+ "dataset_insights" : dataset_insights ,
125195 }
126196
127197 logger .info (f"Successfully processed CSV for project { project_id } " )
@@ -144,48 +214,147 @@ def process_csv_file(self, project_id: str, user_id: str):
144214
145215
146216@celery_app .task (bind = True )
147- def analyze_csv_schema (self , file_path : str ):
217+ def analyze_csv_schema (self , file_content : bytes , filename : str = "data.csv" ):
148218 """
149- Analyze CSV schema - placeholder implementation for Task B2
150- Will be fully implemented in Task B13
219+ Analyze CSV schema independently - enhanced implementation for Task B13
151220 """
152221 try :
153- logger .info (f"Analyzing CSV schema: { file_path } " )
222+ logger .info (f"Analyzing CSV schema for file : { filename } " )
154223
155- # Simulate schema analysis
156- import time
224+ # Update task state
225+ self .update_state (
226+ state = "PROGRESS" ,
227+ meta = {"current" : 20 , "total" : 100 , "status" : "Parsing CSV..." },
228+ )
157229
158- time .sleep (1 )
230+ # Parse CSV with pandas
231+ try :
232+ df = pd .read_csv (StringIO (file_content .decode ("utf-8" )))
233+ except Exception as e :
234+ raise Exception (f"Failed to parse CSV: { str (e )} " )
159235
160- # Mock schema result
161- schema = {
162- "columns" : [
163- {
164- "name" : "id" ,
165- "type" : "integer" ,
166- "nullable" : False ,
167- "sample_values" : [1 , 2 , 3 ],
168- },
169- {
170- "name" : "name" ,
171- "type" : "string" ,
172- "nullable" : False ,
173- "sample_values" : ["John" , "Jane" , "Bob" ],
174- },
236+ # Update task state
237+ self .update_state (
238+ state = "PROGRESS" ,
239+ meta = {"current" : 60 , "total" : 100 , "status" : "Analyzing schema..." },
240+ )
241+
242+ # Analyze columns
243+ columns_metadata = []
244+ for column in df .columns :
245+ col_type = str (df [column ].dtype )
246+ col_series = df [column ]
247+
248+ # Determine data type category
249+ if "int" in col_type or "float" in col_type :
250+ data_type = "number"
251+ elif "datetime" in col_type :
252+ data_type = "datetime"
253+ elif "bool" in col_type :
254+ data_type = "boolean"
255+ else :
256+ data_type = "string"
257+
258+ # Check for null values
259+ nullable = col_series .isnull ().any ()
260+ null_count = col_series .isnull ().sum ()
261+ null_percentage = (null_count / len (col_series )) * 100
262+
263+ # Get sample values (first 5 non-null values)
264+ sample_values = col_series .dropna ().head (5 ).tolist ()
265+
266+ # Calculate statistics for numeric columns
267+ statistics = {}
268+ if data_type == "number" :
269+ statistics = {
270+ "min" : float (col_series .min ()) if not col_series .empty else None ,
271+ "max" : float (col_series .max ()) if not col_series .empty else None ,
272+ "mean" : float (col_series .mean ()) if not col_series .empty else None ,
273+ "median" : (
274+ float (col_series .median ()) if not col_series .empty else None
275+ ),
276+ "std" : float (col_series .std ()) if not col_series .empty else None ,
277+ }
278+ elif data_type == "string" :
279+ # String statistics
280+ unique_count = col_series .nunique ()
281+ most_common = col_series .mode ().tolist () if not col_series .empty else []
282+ avg_length = col_series .str .len ().mean () if not col_series .empty else 0
283+ statistics = {
284+ "unique_count" : int (unique_count ),
285+ "most_common_values" : most_common [:3 ], # Top 3 most common
286+ "average_length" : (
287+ float (avg_length ) if not pd .isna (avg_length ) else 0
288+ ),
289+ }
290+
291+ # Detect potential data quality issues
292+ data_quality_issues = []
293+ if null_percentage > 50 :
294+ data_quality_issues .append ("high_null_percentage" )
295+ if data_type == "string" and col_series .nunique () == 1 :
296+ data_quality_issues .append ("single_value_column" )
297+ if data_type == "number" and col_series .std () == 0 :
298+ data_quality_issues .append ("no_variance" )
299+
300+ columns_metadata .append (
175301 {
176- "name" : "age" ,
177- "type" : "integer" ,
178- "nullable" : True ,
179- "sample_values" : [25 , 30 , None ],
180- },
181- ],
182- "row_count" : 1000 ,
183- "file_size" : "2.5 MB" ,
302+ "name" : column ,
303+ "type" : data_type ,
304+ "nullable" : nullable ,
305+ "null_count" : int (null_count ),
306+ "null_percentage" : round (null_percentage , 2 ),
307+ "sample_values" : sample_values ,
308+ "statistics" : statistics ,
309+ "data_quality_issues" : data_quality_issues ,
310+ }
311+ )
312+
313+ # Calculate dataset-level insights
314+ dataset_insights = {
315+ "total_rows" : len (df ),
316+ "total_columns" : len (df .columns ),
317+ "total_cells" : len (df ) * len (df .columns ),
318+ "null_cells" : df .isnull ().sum ().sum (),
319+ "null_percentage" : round (
320+ (df .isnull ().sum ().sum () / (len (df ) * len (df .columns ))) * 100 , 2
321+ ),
322+ "duplicate_rows" : int (df .duplicated ().sum ()),
323+ "duplicate_percentage" : round ((df .duplicated ().sum () / len (df )) * 100 , 2 ),
324+ "numeric_columns" : len (
325+ [col for col in columns_metadata if col ["type" ] == "number" ]
326+ ),
327+ "string_columns" : len (
328+ [col for col in columns_metadata if col ["type" ] == "string" ]
329+ ),
330+ "datetime_columns" : len (
331+ [col for col in columns_metadata if col ["type" ] == "datetime" ]
332+ ),
333+ "boolean_columns" : len (
334+ [col for col in columns_metadata if col ["type" ] == "boolean" ]
335+ ),
336+ "columns_with_issues" : len (
337+ [col for col in columns_metadata if col ["data_quality_issues" ]]
338+ ),
339+ }
340+
341+ # Update task state
342+ self .update_state (
343+ state = "PROGRESS" ,
344+ meta = {"current" : 100 , "total" : 100 , "status" : "Analysis complete" },
345+ )
346+
347+ schema_result = {
348+ "filename" : filename ,
349+ "file_size_bytes" : len (file_content ),
350+ "columns" : columns_metadata ,
351+ "dataset_insights" : dataset_insights ,
352+ "analysis_timestamp" : pd .Timestamp .now ().isoformat (),
184353 }
185354
186- logger .info (f"Successfully analyzed schema for { file_path } " )
187- return schema
355+ logger .info (f"Successfully analyzed schema for { filename } " )
356+ return schema_result
188357
189358 except Exception as exc :
190- logger .error (f"Error analyzing schema for { file_path } : { str (exc )} " )
359+ logger .error (f"Error analyzing schema for { filename } : { str (exc )} " )
191360 raise exc
0 commit comments