Chuvash Toponymy Database v.0.1

csv_loader.py at [4a19a2a00b]
Login

csv_loader.py at [4a19a2a00b]

File tests/csv_loader.py artifact d1b35b0185 part of check-in 4a19a2a00b


# csv_loader.py
import csv
from pathlib import Path

def check_csv_format(csv_path, expected_num_columns, delimiter=';'):
    errors = []
    with open(csv_path, mode="r", encoding="utf-8-sig") as f:
        reader = csv.reader(f, delimiter=delimiter)
        line_num = 0
        for row in reader:
            line_num += 1
            if len(row) != expected_num_columns:
                errors.append(f"Row {line_num} in {csv_path} has {len(row)} columns; expected {expected_num_columns}.")
    return errors

def preprocess_row(row, integer_columns, real_columns):
    """
    Convert empty strings to None for INTEGER and REAL columns.
    :param row: List of values from CSV row
    :param integer_columns: List of column indices that should be treated as INTEGER
    :param real_columns: List of column indices that should be treated as REAL
    :return: Preprocessed row
    """
    processed_row = list(row)
    # Handle INTEGER columns
    for idx in integer_columns:
        if processed_row[idx] == '':  # Empty string for INTEGER column
            processed_row[idx] = None
    # Handle REAL columns
    for idx in real_columns:
        if processed_row[idx] == '':  # Empty string for REAL column
            processed_row[idx] = None
    return processed_row

def load_csv_into_table(conn, csv_path, table_name, columns, delimiter=';'):
    """
    Load CSV into PostgreSQL table, handling INTEGER and REAL columns appropriately.
    """
    # Define which columns are INTEGER based on your schema
    integer_columns_by_table = {
        "Spatial": [0, 8],  # ID, DOUBT
        "Linguistic": [0, 1, 2, 3, 6, 8, 10],  # ID, SPATID, DOUSPAT, MAINID, DOUTOPO, DOULANG, DOUPRON
        "Temporal": [0, 1, 3, 4, 5, 6, 8],  # ID, LINGID, STARTYEAR, DOUSTART, ENDYEAR, DOUEND, OBJID
        "Sources": [4]  # YEAR
    }
    
    # Define which columns are REAL based on your schema
    real_columns_by_table = {
        "Spatial": [1, 2],  # LAT, LON
        "Linguistic": [],  # No REAL columns
        "Temporal": [],    # No REAL columns
        "Sources": []      # No REAL columns
    }
    
    integer_columns = integer_columns_by_table.get(table_name, [])
    real_columns = real_columns_by_table.get(table_name, [])

    with conn.cursor() as cur:
        placeholders = ", ".join(["%s"] * len(columns))
        col_names = ", ".join(columns)
        insert_sql = f"INSERT INTO {table_name} ({col_names}) VALUES ({placeholders})"
        
        with open(csv_path, mode='r', encoding='utf-8') as f:
            reader = csv.reader(f, delimiter=delimiter)
            next(reader, None)  # Skip header
            for i, row in enumerate(reader, start=2):
                try:
                    # Preprocess the row to handle empty strings for INTEGER and REAL columns
                    processed_row = preprocess_row(row, integer_columns, real_columns)
                    cur.execute(insert_sql, processed_row)
                except Exception as e:
                    print(f"Error in {csv_path} at row {i}: {row}")
                    print(f"Exception: {e}")
                    raise
    
    conn.commit()