# csv_loader.py
import csv
from pathlib import Path


def check_csv_format(csv_path, expected_num_columns, delimiter=';'):
    """Return a list of error messages for rows whose column count differs from the expected count."""
    errors = []
    # newline="" is the recommended way to open files for csv.reader; utf-8-sig strips a leading BOM.
    with open(csv_path, mode="r", encoding="utf-8-sig", newline="") as f:
        reader = csv.reader(f, delimiter=delimiter)
        for line_num, row in enumerate(reader, start=1):
            if len(row) != expected_num_columns:
                errors.append(
                    f"Row {line_num} in {csv_path} has {len(row)} columns; "
                    f"expected {expected_num_columns}."
                )
    return errors
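
# Example (hypothetical file name and column count -- adjust to your data):
#     errors = check_csv_format("Spatial.csv", expected_num_columns=9)
#     if errors:
#         print("\n".join(errors))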


def preprocess_row(row, integer_columns, real_columns):
    """
    Convert empty strings to None for INTEGER and REAL columns.

    :param row: List of values from CSV row
    :param integer_columns: List of column indices that should be treated as INTEGER
    :param real_columns: List of column indices that should be treated as REAL
    :return: Preprocessed row
    """
    processed_row = list(row)
    # Handle INTEGER columns
    for idx in integer_columns:
        if processed_row[idx] == '':  # Empty string for INTEGER column
            processed_row[idx] = None
    # Handle REAL columns
    for idx in real_columns:
        if processed_row[idx] == '':  # Empty string for REAL column
            processed_row[idx] = None
    return processed_row
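
# Example: with integer_columns=[0] and real_columns=[2], a row like
# ['', 'Oslo', ''] becomes [None, 'Oslo', None]; non-empty strings pass through
# unchanged, and any type casting is left to the database driver.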


def load_csv_into_table(conn, csv_path, table_name, columns, delimiter=';'):
    """
    Load a CSV file into a PostgreSQL table, converting empty INTEGER and REAL fields to NULL.
    """
    # Define which columns are INTEGER based on your schema
    integer_columns_by_table = {
        "Spatial": [0, 8],                     # ID, DOUBT
        "Linguistic": [0, 1, 2, 3, 6, 8, 10],  # ID, SPATID, DOUSPAT, MAINID, DOUTOPO, DOULANG, DOUPRON
        "Temporal": [0, 1, 3, 4, 5, 6, 8],     # ID, LINGID, START, DOUSTART, END, DOUEND, OBJID
        "Sources": [4]                         # YEAR
    }
    # Define which columns are REAL based on your schema
    real_columns_by_table = {
        "Spatial": [1, 2],  # LAT, LON
        "Linguistic": [],   # No REAL columns
        "Temporal": [],     # No REAL columns
        "Sources": []       # No REAL columns
    }
    integer_columns = integer_columns_by_table.get(table_name, [])
    real_columns = real_columns_by_table.get(table_name, [])
    with conn.cursor() as cur:
        placeholders = ", ".join(["%s"] * len(columns))
        col_names = ", ".join(columns)
        insert_sql = f"INSERT INTO {table_name} ({col_names}) VALUES ({placeholders})"
        # utf-8-sig matches check_csv_format's handling of a possible BOM;
        # newline="" is the recommended way to open files for csv.reader.
        with open(csv_path, mode='r', encoding='utf-8-sig', newline='') as f:
            reader = csv.reader(f, delimiter=delimiter)
            next(reader, None)  # Skip header
            # start=2: row 1 of the file is the header, so data rows begin at line 2
            for i, row in enumerate(reader, start=2):
                try:
                    # Preprocess the row to handle empty strings for INTEGER and REAL columns
                    processed_row = preprocess_row(row, integer_columns, real_columns)
                    cur.execute(insert_sql, processed_row)
                except Exception as e:
                    print(f"Error in {csv_path} at row {i}: {row}")
                    print(f"Exception: {e}")
                    raise
    conn.commit()
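

# ---------------------------------------------------------------------------
# Minimal usage sketch -- not part of the loader itself. The DSN, file name,
# and column list below are placeholders, and psycopg2 is assumed as the
# PostgreSQL driver; any DB-API connection providing cursor() and commit()
# works the same way.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import psycopg2  # assumed driver, not required by the functions above

    # Placeholder column names in CSV order; the index maps above expect
    # ID at 0, LAT/LON at 1-2 and DOUBT at 8 for the Spatial table.
    spatial_columns = ["ID", "LAT", "LON", "COL3", "COL4", "COL5", "COL6", "COL7", "DOUBT"]

    problems = check_csv_format("Spatial.csv", expected_num_columns=len(spatial_columns))
    if problems:
        raise SystemExit("\n".join(problems))

    conn = psycopg2.connect("dbname=mydb user=postgres")  # placeholder DSN
    try:
        load_csv_into_table(conn, "Spatial.csv", "Spatial", spatial_columns)
    finally:
        conn.close()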