Source code for ABConnect.Loader

import os
import json
import codecs
import logging
import chardet
import pandas as pd
import openpyxl
from bs4 import BeautifulSoup
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


[docs] class FileLoader: """ Loads data from CSV, JSON, or XLSX files while handling unsupported characters. Attributes: file_path (str): The path to the file. key (Optional[str]): Key to convert data into a dictionary form. data (List[Dict[str, Any]]): The loaded data. """
[docs] def __init__( self, file_path: str, key: Optional[str] = None, interactive: bool = True ): self.file_path = file_path self.key = key self.data: List[Dict[str, Any]] = [] self.interactive = interactive self.load()
[docs] def load(self) -> None: file_extension = os.path.splitext(self.file_path)[1].lower() if file_extension == ".csv": self._read_csv() elif file_extension == ".json": self._read_json() elif file_extension in [".xlsx", ".xls"]: self._read_xlsx() else: raise ValueError(f"Unsupported file type: {file_extension}")
def _read_csv(self) -> None: with open(self.file_path, "rb") as file: raw_data = file.read(10000) detected = chardet.detect(raw_data) encodings_to_try = [ detected.get("encoding"), "utf-8", "utf-8-sig", "cp1252", "latin1", "iso-8859-1", "mbcs", ] results = [] for encoding in encodings_to_try: if not encoding: continue try: # Read up to 5 lines (handle files with fewer than 5 lines) with codecs.open(self.file_path, "r", encoding=encoding) as f: sample_lines = [] for _ in range(5): try: sample_lines.append(next(f)) except StopIteration: break df = pd.read_csv(self.file_path, encoding=encoding, na_filter=False) df = df.fillna("") results.append( {"encoding": encoding, "sample_lines": sample_lines, "df": df} ) print(f"Successfully read file with encoding: {encoding}") except Exception as e: print(f"Error reading file with encoding {encoding}: {str(e)}") continue if not results: raise ValueError("Unable to read file with any encoding") # Select the first successful result if not interactive if self.interactive: print("\nAvailable Encodings:") for i, result in enumerate(results): print(f"{i}: {result['encoding']}") for line in result["sample_lines"]: print(line.strip()) print("-" * 40) choice = input( "\nEnter the number of the best encoding (or press Enter for the first successful one): " ) chosen_result = results[int(choice)] if choice.strip() else results[0] else: chosen_result = results[0] df = chosen_result["df"] def clean_text(text): if pd.isna(text): return text text = str(text) replacements = { "é": "é", "è": "è", "â": "â", "Ã": "à", "Å": "œ", "\x92": "'", "\x93": '"', "\x94": '"', "\ufeff": "", # Remove BOM } for bad, good in replacements.items(): text = text.replace(bad, good) soup = BeautifulSoup(text, "html.parser") return " ".join(soup.get_text().split()).strip() if "title" in df.columns: df["title"] = df["title"].apply(clean_text) self.df = df self.data = df.to_dict("records") def _read_json(self) -> None: with open(self.file_path, "r") as jsonfile: self.data = json.load(jsonfile) def _read_xlsx(self) -> None: workbook = openpyxl.load_workbook(self.file_path) sheet = workbook.active headers = [cell.value for cell in sheet[1]] self.data = [] for row in sheet.iter_rows(min_row=2, values_only=True): self.data.append(dict(zip(headers, row)))
[docs] def to_list(self) -> List[Dict[str, Any]]: """Returns the loaded data as a list of dictionaries.""" return self.data
[docs] def to_dict(self) -> Dict[str, Any]: """ Converts the loaded data into a dictionary using self.key for keys. Raises: KeyError: If self.key is not found in a row. """ if isinstance(self.data, dict): return self.data result = {} for row in self.data: if self.key not in row: raise KeyError(f"Key '{self.key}' not found in row: {row}") key_value = row.pop(self.key) if key_value in result: if isinstance(result[key_value], list): result[key_value].append(row) else: result[key_value] = [result[key_value], row] else: result[key_value] = row return result