ADDED tests/__init__.py
Index: tests/__init__.py
==================================================================
--- /dev/null
+++ tests/__init__.py

ADDED tests/config.py
Index: tests/config.py
==================================================================
--- /dev/null
+++ tests/config.py
@@ -0,0 +1,35 @@
+# config.py
+import argparse
+
+def parse_args():
+    parser = argparse.ArgumentParser(
+        description="Validate toponymical CSV files by loading into PostgreSQL and checking constraints."
+    )
+    parser.add_argument("--spatial", required=True, help="Path to the Spatial CSV file.")
+    parser.add_argument("--linguistic", required=True, help="Path to the Linguistic CSV file.")
+    parser.add_argument("--temporal", required=True, help="Path to the Temporal CSV file.")
+    parser.add_argument("--sources", required=True, help="Path to the Sources CSV file.")
+    parser.add_argument("--db-host", default="localhost", help="PostgreSQL host")
+    parser.add_argument("--db-port", default=5432, type=int, help="PostgreSQL port")
+    parser.add_argument("--db-name", required=True, help="PostgreSQL database name")
+    parser.add_argument("--db-user", required=True, help="PostgreSQL username")
+    parser.add_argument("--db-password", required=False, help="PostgreSQL password")
+    return parser.parse_args()
+
+# Database connection parameters (filled in from the parsed arguments via update_db_params)
+DB_PARAMS = {
+    "host": None,
+    "port": None,
+    "dbname": None,
+    "user": None,
+    "password": None
+}
+
+def update_db_params(args):
+    DB_PARAMS.update({
+        "host": args.db_host,
+        "port": args.db_port,
+        "dbname": args.db_name,
+        "user": args.db_user,
+        "password": args.db_password
+    })

ADDED tests/csv_loader.py
Index: tests/csv_loader.py
==================================================================
--- /dev/null
+++ tests/csv_loader.py
@@ -0,0 +1,81 @@
+# csv_loader.py
+import csv
+
+def check_csv_format(csv_path, expected_num_columns, delimiter=';'):
+    errors = []
+    with open(csv_path, mode="r", encoding="utf-8-sig") as f:
+        reader = csv.reader(f, delimiter=delimiter)
+        line_num = 0
+        for row in reader:
+            line_num += 1
+            if len(row) != expected_num_columns:
+                errors.append(f"Row {line_num} in {csv_path} has {len(row)} columns; expected {expected_num_columns}.")
+    return errors
+
+def preprocess_row(row, integer_columns, real_columns):
+    """
+    Convert empty strings to None for INTEGER and REAL columns.
+    :param row: List of values from CSV row
+    :param integer_columns: List of column indices that should be treated as INTEGER
+    :param real_columns: List of column indices that should be treated as REAL
+    :return: Preprocessed row
+    """
+    processed_row = list(row)
+    # Handle INTEGER columns
+    for idx in integer_columns:
+        if processed_row[idx] == '':  # Empty string for INTEGER column
+            processed_row[idx] = None
+    # Handle REAL columns
+    for idx in real_columns:
+        if processed_row[idx] == '':  # Empty string for REAL column
+            processed_row[idx] = None
+    return processed_row
+
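+# Illustrative sketch (hypothetical Spatial row): with integer_columns=[0, 8]
+# and real_columns=[1, 2], empty strings at those indices become None (sent to
+# PostgreSQL as NULL), while empty TEXT columns are kept as '':
+#   preprocess_row(['7', '', '48.9', 'Name', 'RUS', '', '', '', '', '', '', ''], [0, 8], [1, 2])
+#   -> ['7', None, '48.9', 'Name', 'RUS', '', '', '', None, '', '', '']
+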
+ """ + # Define which columns are INTEGER based on your schema + integer_columns_by_table = { + "Spatial": [0, 8], # ID, DOUBT + "Linguistic": [0, 1, 2, 3, 6, 8, 10], # ID, SPATID, DOUSPAT, MAINID, DOUTOPO, DOULANG, DOUPRON + "Temporal": [0, 1, 3, 4, 5, 6, 8], # ID, LINGID, START, DOUSTART, END, DOUEND, OBJID + "Sources": [4] # YEAR + } + + # Define which columns are REAL based on your schema + real_columns_by_table = { + "Spatial": [1, 2], # LAT, LON + "Linguistic": [], # No REAL columns + "Temporal": [], # No REAL columns + "Sources": [] # No REAL columns + } + + integer_columns = integer_columns_by_table.get(table_name, []) + real_columns = real_columns_by_table.get(table_name, []) + + with conn.cursor() as cur: + placeholders = ", ".join(["%s"] * len(columns)) + col_names = ", ".join(columns) + insert_sql = f"INSERT INTO {table_name} ({col_names}) VALUES ({placeholders})" + + with open(csv_path, mode='r', encoding='utf-8') as f: + reader = csv.reader(f, delimiter=delimiter) + next(reader, None) # Skip header + for i, row in enumerate(reader, start=2): + try: + # Preprocess the row to handle empty strings for INTEGER and REAL columns + processed_row = preprocess_row(row, integer_columns, real_columns) + cur.execute(insert_sql, processed_row) + except Exception as e: + print(f"Error in {csv_path} at row {i}: {row}") + print(f"Exception: {e}") + raise + + conn.commit() ADDED tests/main.py Index: tests/main.py ================================================================== --- /dev/null +++ tests/main.py @@ -0,0 +1,74 @@ +# main.py +import sys +import psycopg2 +from config import parse_args, update_db_params, DB_PARAMS +from schema import create_tables, drop_tables +from csv_loader import check_csv_format, load_csv_into_table +from validators import run_tests + +def main(): + args = parse_args() + update_db_params(args) + + # CSV format checks + format_errors = ( + check_csv_format(args.spatial, 12) + + check_csv_format(args.linguistic, 15) + + check_csv_format(args.temporal, 13) + + check_csv_format(args.sources, 10) + ) + if format_errors: + print("CSV Format/Parsing Errors Detected:") + for err in format_errors: + print(" -", err) + sys.exit(1) + + # Connect to PostgreSQL + try: + conn = psycopg2.connect(**DB_PARAMS) + conn.set_client_encoding('UTF8') # Ensure client encoding is UTF-8 + except psycopg2.Error as e: + print(f"Failed to connect to PostgreSQL: {e}") + sys.exit(1) + + # Setup and load data + try: + drop_tables(conn) # Ensure a clean slate + create_tables(conn) + + load_csv_into_table( + conn, args.spatial, "Spatial", + ["ID", "LAT", "LON", "OFFNAME", "LANG", "CLASS", "TYPE", "DISTRICT", "DOUBT", "LANDMARK", "COMMENTS", "OTHER"] + ) + load_csv_into_table( + conn, args.linguistic, "Linguistic", + ["ID", "SPATID", "DOUSPAT", "MAINID", "TOPONYM", "TOPFORMS", "DOUTOPO", "LANG", "DOULANG", + "PRONUNC", "DOUPRON", "ETYM", "ORIGIN", "COMMENTS", "OTHER"] + ) + load_csv_into_table( + conn, args.temporal, "Temporal", + ["ID", "LINGID", "LINGNAME", "START", "DOUSTART", "END", "DOUEND", "EVENT", "OBJID", + "OBJNAME", "COMMENTS", "OTHER", "FULLTEXT"] + ) + load_csv_into_table( + conn, args.sources, "Sources", + ["ID", "TYPE", "AUTHOR", "TITLE", "YEAR", "PUBLISHER", "CITATION", "COMMENTS", "PDF", "OTHER"] + ) + + # Run validations + errors = run_tests(conn) + if errors: + print("Validation Errors Detected:") + for e in errors: + print(" -", e) + print(f"Total errors: {len(errors)}") + sys.exit(1) + else: + print("All validations passed successfully!") + sys.exit(0) + + finally: + 
+        # Temporal Table
+        cur.execute("""
+            CREATE TABLE IF NOT EXISTS Temporal (
+                ID SERIAL PRIMARY KEY,
+                LINGID INTEGER REFERENCES Linguistic(ID) ON DELETE CASCADE ON UPDATE CASCADE,
+                LINGNAME TEXT,
+                STARTYEAR INTEGER NOT NULL,
+                DOUSTART INTEGER,
+                ENDYEAR INTEGER,
+                DOUEND INTEGER,
+                EVENT TEXT NOT NULL CHECK (EVENT IN ('MERGEIN', 'ACTIVE', 'RENAME', 'CEASE')),
+                OBJID INTEGER,
+                OBJNAME TEXT,
+                COMMENTS TEXT,
+                OTHER TEXT,
+                FULLTEXT TEXT NOT NULL,
+                CONSTRAINT valid_years CHECK (ENDYEAR IS NULL OR STARTYEAR < ENDYEAR)
+            );
+        """)
+
+        # Sources Table
+        cur.execute("""
+            CREATE TABLE IF NOT EXISTS Sources (
+                ID TEXT PRIMARY KEY,
+                TYPE TEXT NOT NULL,
+                AUTHOR TEXT,
+                TITLE TEXT NOT NULL,
+                YEAR INTEGER,
+                PUBLISHER TEXT,
+                CITATION TEXT,
+                COMMENTS TEXT,
+                PDF TEXT,
+                OTHER TEXT
+            );
+        """)
+
+    conn.commit()
+
+def drop_tables(conn):
+    """Helper to drop tables for clean testing."""
+    with conn.cursor() as cur:
+        cur.execute("DROP TABLE IF EXISTS Temporal, Linguistic, Spatial, Sources CASCADE;")
+    conn.commit()

ADDED tests/test_docs_v0.5.md
Index: tests/test_docs_v0.5.md
==================================================================
--- /dev/null
+++ tests/test_docs_v0.5.md
@@ -0,0 +1,92 @@
+# Structure
+
+tests/
+├── __init__.py     # Marks this as a Python package
+├── config.py       # Configuration (e.g., database connection details)
+├── schema.py       # Table creation and schema definitions
+├── csv_loader.py   # CSV loading logic
+├── validators.py   # Validation checks (schema, logic, data quality)
+└── main.py         # Entry point to tie everything together
+
+## __init__.py
+Empty.
+
+## config.py
+This file holds configuration details, such as database connection parameters and CSV file paths.
+
+## schema.py
+This file defines the PostgreSQL table schemas with proper constraints.
+
+## csv_loader.py
+Handles CSV parsing and loading into PostgreSQL.
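+
+A minimal standalone sketch of the loader (the file path and connection
+details are hypothetical, and the tables from schema.py must already exist):
+
+```
+import psycopg2
+from csv_loader import check_csv_format, load_csv_into_table
+
+conn = psycopg2.connect(dbname="your_db", user="your_user")
+errors = check_csv_format("Spatial.csv", expected_num_columns=12)
+if not errors:
+    load_csv_into_table(
+        conn, "Spatial.csv", "Spatial",
+        ["ID", "LAT", "LON", "OFFNAME", "LANG", "CLASS", "TYPE",
+         "DISTRICT", "DOUBT", "LANDMARK", "COMMENTS", "OTHER"]
+    )
+conn.close()
+```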
+
+## validators.py
+Contains all validation logic, adapted for PostgreSQL syntax.
+
+## main.py
+Ties everything together and runs the validation.
+
+
+# Key Changes and Notes
+
+## PostgreSQL Integration:
+- Replaced sqlite3 with psycopg2 for PostgreSQL connectivity.
+- Added SERIAL for auto-incrementing IDs (the PostgreSQL equivalent of SQLite’s INTEGER PRIMARY KEY).
+- Moved some checks (e.g., coordinate ranges, allowed LANG/EVENT values) into schema-level CHECK constraints, leveraging PostgreSQL’s capabilities.
+
+## Modular Design:
+- Separated concerns into configuration, schema, loading, validation, and execution.
+- Each module can be independently tested or extended.
+
+## Error Handling:
+- Added basic database connection error handling.
+- Ensures the connection is closed in a finally block.
+
+## Execution:
+Run the script with:
+```
+python main.py \
+    --spatial /path/to/Spatial.csv \
+    --linguistic /path/to/Linguistic.csv \
+    --temporal /path/to/Temporal.csv \
+    --sources /path/to/Sources.csv \
+    --db-name your_db \
+    --db-user your_user \
+    --db-password your_password
+```
+
+## Customization:
+- Adjust column names in main.py to match your CSV files exactly.
+- Add more validation rules in validators.py as needed; see the sketch below.
+- Modify schema.py if your constraints differ.
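+
+For example, a hypothetical rule that flags Sources rows with an implausible
+YEAR could be appended inside run_tests() in validators.py:
+
+```
+cur.execute("""
+    SELECT ID, YEAR
+    FROM Sources
+    WHERE YEAR IS NOT NULL AND (YEAR < 1500 OR YEAR > 2100)
+""")
+for row in cur.fetchall():
+    errors.append(f"[Sources] Implausible YEAR={row[1]} (Sources.ID={row[0]})")
+```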

ADDED tests/validators.py
Index: tests/validators.py
==================================================================
--- /dev/null
+++ tests/validators.py
@@ -0,0 +1,111 @@
+# validators.py
+def run_tests(conn):
+    errors = []
+    with conn.cursor() as cur:
+        # Schema-Level & Key Constraints
+        # Check for duplicate IDs in Spatial
+        cur.execute("""
+            SELECT ID, COUNT(*)
+            FROM Spatial
+            GROUP BY ID
+            HAVING COUNT(*) > 1
+        """)
+        for row in cur.fetchall():
+            errors.append(f"[Spatial] Duplicate ID found: {row[0]}")
+
+        # Check for duplicate IDs in Linguistic
+        cur.execute("""
+            SELECT ID, COUNT(*)
+            FROM Linguistic
+            GROUP BY ID
+            HAVING COUNT(*) > 1
+        """)
+        for row in cur.fetchall():
+            errors.append(f"[Linguistic] Duplicate ID found: {row[0]}")
+
+        # Check for duplicate IDs in Temporal
+        cur.execute("""
+            SELECT ID, COUNT(*)
+            FROM Temporal
+            GROUP BY ID
+            HAVING COUNT(*) > 1
+        """)
+        for row in cur.fetchall():
+            errors.append(f"[Temporal] Duplicate ID found: {row[0]}")
+
+        # Check for duplicate IDs in Sources
+        cur.execute("""
+            SELECT ID, COUNT(*)
+            FROM Sources
+            GROUP BY ID
+            HAVING COUNT(*) > 1
+        """)
+        for row in cur.fetchall():
+            errors.append(f"[Sources] Duplicate ID found: {row[0]}")
+
+        # Check foreign key: Linguistic.SPATID must exist in Spatial
+        cur.execute("""
+            SELECT L.ID, L.SPATID
+            FROM Linguistic L
+            LEFT JOIN Spatial S ON L.SPATID = S.ID
+            WHERE L.SPATID IS NOT NULL AND S.ID IS NULL
+        """)
+        for row in cur.fetchall():
+            errors.append(f"[Linguistic] SPATID={row[1]} not found in Spatial (Linguistic.ID={row[0]})")
+
+        # Check foreign key: Temporal.LINGID must exist in Linguistic
+        cur.execute("""
+            SELECT T.ID, T.LINGID
+            FROM Temporal T
+            LEFT JOIN Linguistic L ON T.LINGID = L.ID
+            WHERE T.LINGID IS NOT NULL AND L.ID IS NULL
+        """)
+        for row in cur.fetchall():
+            errors.append(f"[Temporal] LINGID={row[1]} not found in Linguistic (Temporal.ID={row[0]})")
+
+        # (Composite key check) For each row in Temporal,
+        # LINGNAME should match the TOPONYM in the row of Linguistic where ID = LINGID.
+        cur.execute("""
+            SELECT T.ID, T.LINGID, T.LINGNAME, L.TOPONYM
+            FROM Temporal T
+            LEFT JOIN Linguistic L ON T.LINGID = L.ID
+            WHERE L.ID IS NOT NULL AND T.LINGNAME != L.TOPONYM
+        """)
+        for row in cur.fetchall():
+            errors.append(f"[Temporal] LINGNAME='{row[2]}' does not match TOPONYM='{row[3]}' for LINGID={row[1]} (Temporal.ID={row[0]})")
+
+        # Logic-Based Validations
+        # Example: In Temporal, if EVENT='ACTIVE' => ENDYEAR should be NULL (or at least not set)
+        cur.execute("""
+            SELECT ID, ENDYEAR, EVENT
+            FROM Temporal
+            WHERE EVENT='ACTIVE' AND ENDYEAR IS NOT NULL
+        """)
+        for row in cur.fetchall():
+            errors.append(f"[Temporal] EVENT=ACTIVE but ENDYEAR is not null for row ID={row[0]}")
+
+        # Example: If EVENT='CEASE' => OBJID and OBJNAME must be empty
+        cur.execute("""
+            SELECT ID, OBJID, OBJNAME
+            FROM Temporal
+            WHERE EVENT='CEASE' AND (OBJID IS NOT NULL OR (OBJNAME IS NOT NULL AND OBJNAME != ''))
+        """)
+        for row in cur.fetchall():
+            errors.append(f"[Temporal] EVENT=CEASE but OBJID/OBJNAME present for ID={row[0]}")
+
+        # Example: If EVENT in ('RENAME','MERGEIN') => OBJID/OBJNAME must not be empty
+        cur.execute("""
+            SELECT ID, EVENT, OBJID, OBJNAME
+            FROM Temporal
+            WHERE EVENT IN ('RENAME','MERGEIN')
+              AND (OBJID IS NULL OR OBJNAME IS NULL OR OBJNAME = '')
+        """)
+        for row in cur.fetchall():
+            errors.append(f"[Temporal] EVENT={row[1]} but OBJID/OBJNAME is missing (Temporal.ID={row[0]})")
+
+        # Data Quality Checks
+        # Note: Coordinate range checks are now in schema constraints
+        # EVENT values are checked via schema constraint
+        # LANG codes are checked via schema constraint
+
+    return errors