Skip to content
Snippets Groups Projects
Commit 0263d365 authored by Neda Moeini's avatar Neda Moeini
Browse files

Refactor CSV file validation and error handling in forms.py and views.py in...

Refactor CSV file validation and error handling in forms.py and views.py in order to improve performance
parent a1bec936
No related branches found
No related tags found
1 merge request: !12 Refactor CSV file validation and error handling in forms.py and views.py in...
"""Forms for the file_validator app."""
import csv
import io
import re
from collections.abc import Sequence
from typing import ClassVar
from typing import ClassVar, Self
from django import forms
from django.core.files.uploadedfile import UploadedFile
......@@ -71,29 +72,45 @@ class CSVUploadForm(forms.Form):
def clean_file(self) -> UploadedFile:
    """Validate the uploaded CSV file and cache its decoded text.

    The file is decoded once, its headers are checked, the reference lookup
    tables are loaded once, and then every data row is passed through the
    row-level validators. On success the decoded text is stored in
    ``self.cleaned_data["csv_data"]`` so callers do not have to read and
    decode the upload a second time.

    Returns:
        UploadedFile: The uploaded file, still open and rewound to the start.

    Raises:
        forms.ValidationError: If the file is empty, is not valid UTF-8, has
            invalid headers, or any row fails validation.
    """
    file = self.cleaned_data["file"]

    # Step 1: Validate file type
    self._validate_file_type(file)

    # Step 2: Decode the upload exactly once.
    text_stream = io.TextIOWrapper(file, encoding="utf-8-sig")
    try:
        csv_content = text_stream.read().strip()
    except UnicodeDecodeError as exc:
        error_message = "Uploaded file must be UTF-8 encoded text."
        raise forms.ValidationError(error_message) from exc
    finally:
        # A TextIOWrapper closes the stream it wraps when it is closed or
        # garbage-collected; detach so the returned file stays usable.
        text_stream.detach()
    # Leave the file positioned at the start for whoever reads it next.
    file.seek(0)

    if not csv_content:
        error_message = "Uploaded file is empty."
        raise forms.ValidationError(error_message)

    # Step 3: Parse and validate headers
    reader = csv.DictReader(io.StringIO(csv_content))
    fieldnames = reader.fieldnames or []
    self._validate_headers(fieldnames)

    # Step 4: Load the lookup tables once, then validate row by row.
    self._load_reference_data()
    errors = []
    for index, row in enumerate(reader, start=1):
        errors.extend(self._validate_source_and_trader_type(row, index))
        errors.extend(self._validate_nominal_analysis_account(row, index))
        errors.extend(self._validate_nc_cc_dep_combination_against_meo_sage_account(row, index))
        errors.extend(self._cheque_fields_must_be_empty(row, index))
    if errors:
        raise forms.ValidationError(errors)

    self.cleaned_data["csv_data"] = csv_content
    return file
def _load_reference_data(self: Self) -> None:
    """Build the lookup tables used by the row validators from the 'meo' database.

    Sets three attributes on the form:

    * ``supplier_map``: supplier account number -> supplier account name
    * ``cost_centre_map``: cost centre code -> cost centre type
    * ``xx_data_map``: xx value -> ``(project, overhead)`` tuple
    """
    suppliers = MeoValidSuppliers.objects.using("meo").all()
    self.supplier_map = dict(
        (supplier.supplier_account_number, supplier.supplier_account_name)
        for supplier in suppliers
    )

    cost_centres = MeoCostCentres.objects.using("meo").all()
    self.cost_centre_map = dict(
        (cost_centre.cc, cost_centre.cc_type) for cost_centre in cost_centres
    )

    xx_rows = XxData.objects.using("meo").all()
    self.xx_data_map = dict(
        (entry.xx_value, (entry.project, entry.overhead)) for entry in xx_rows
    )
@staticmethod
def _get_max_repeat(fieldnames: Sequence[str], section_prefix: str) -> int:
"""Identify the maximum number of repeats for a section."""
......@@ -132,66 +149,54 @@ class CSVUploadForm(forms.Form):
msg = f"Missing required columns: {', '.join(missing_columns)}"
raise forms.ValidationError(msg)
def _validate_source_and_trader_type(self, data: list[dict]) -> list:
def _validate_source_and_trader_type(self, row: dict, index: int) -> list:
"""Validate that 'Source' is always 80 and 'SYSTraderTranType' is always 4."""
errors = []
claimant_name = self.supplier_map.get(row.get("AccountNumber"))
claim_number = row.get("SecondReference")
for index, row in enumerate(data, start=1):
claimant_name = self.get_account_name_from_code(row.get("AccountNumber"))
claim_number = row.get("SecondReference")
if row.get("Source") != "80":
errors.append(f"Row {index}, claimant: {claimant_name} with claim number: {claim_number}: "
f"'Source' must be 80, but found {row.get('Source')}.")
if row.get("Source") != "80":
errors.append(
f"Row {index}, claimant: {claimant_name} with claim number: {claim_number}: "
f"'Source' must be 80, but found {row.get('Source')}.")
if row.get("SYSTraderTranType") != "4":
errors.append(f"Row {index}, claimant: {claimant_name} with claim number: {claim_number}: "
f"'SYSTraderTranType' must be 4, but found {row.get('SYSTraderTranType')}.")
if row.get("SYSTraderTranType") != "4":
errors.append(
f"Row {index}, claimant: {claimant_name} with claim number: {claim_number}: "
f"'SYSTraderTranType' must be 4, but found {row.get('SYSTraderTranType')}.")
return errors
@staticmethod
def _validate_nominal_analysis_account(data: list[dict]) -> list[str]:
def _validate_nominal_analysis_account(self, row: dict, index: int) -> list:
"""Validate that 'AccountNumber' matches the name in 'NominalAnalysisNominalAnalysisNarrative/1'.
This only checks the first group of NominalAnalysis columns. A list of codes/names
is fetched from the database for validation (from the 'PL Account Codes' table).
Args:
data (list[dict]): The rows of data to validate.
row (dict): The row of data to validate.
index (int): The index of the row in the CSV file.
Returns:
List[str]: A list of error messages, if any.
"""
errors = []
account_code_map = {
obj.supplier_account_number: obj.supplier_account_name
for obj in MeoValidSuppliers.objects.using("meo").all() # type: ignore[attr-defined]
}
for index, row in enumerate(data, start=1):
account_code = row.get("AccountNumber")
nominal = row.get("NominalAnalysisNominalAnalysisNarrative/1")
# Skip rows without 'AccountNumber' or 'NominalAnalysisNominalAnalysisNarrative/1'
if not account_code or not nominal:
continue
pl_account_name = account_code_map.get(account_code)
if pl_account_name is None:
errors.append(f"Row {index}: 'AccountNumber' {account_code} does not exist in PL Account Codes.")
else:
# Remove 'Soldo' and any hyphens from the PL account name. This is for credit card accounts.
revised_pl_account_name = re.sub(
r"\bSoldo\b|\s*-\s*", "", pl_account_name, flags=re.IGNORECASE).strip()
if revised_pl_account_name not in nominal:
errors.append(
f"Row {index}: 'AccountNumber' must match '{revised_pl_account_name}' in "
errors: list[str] = []
account_code = row.get("AccountNumber")
nominal = row.get("NominalAnalysisNominalAnalysisNarrative/1")
if not account_code or not nominal:
return errors
pl_account_name = self.supplier_map.get(account_code)
if pl_account_name is None:
errors.append(f"Row {index}: 'AccountNumber' {account_code} does not exist in PL Account Codes.")
else:
revised_name = re.sub(r"\bSoldo\b|\s*-\s*", "", pl_account_name, flags=re.IGNORECASE).strip()
if revised_name not in nominal:
errors.append(
f"Row {index}: 'AccountNumber' must match '{revised_name}' in "
f"'NominalAnalysisNominalAnalysisNarrative/1', but found '{nominal}'."
)
return errors
@staticmethod
......@@ -205,95 +210,82 @@ class CSVUploadForm(forms.Form):
except MeoValidSuppliers.DoesNotExist:
return None
def _validate_nc_cc_dep_combination_against_meo_sage_account(self, data: list[dict]) -> list[str]:
def _validate_nc_cc_dep_combination_against_meo_sage_account(self, row: dict, index: int) -> list:
"""Validate that all nominal analysis fields exist in MEO.
This includes 'NominalAnalysisNominalCostCentre/{N}', 'NominalAnalysisNominalDepartment/{N}',
and 'NominalAnalysisNominalAccountNumber/{N}'.
Args:
data (list[dict]): The rows of data to validate.
row (dict): The row of data to validate.
index (int): The index of the row in the CSV file.
Returns:
List[str]: A list of error messages, if any.
"""
errors = []
cost_centre_map = {
obj.cc: obj.cc_type for obj in MeoCostCentres.objects.using("meo").all()
}
xx_data_map = {
obj.xx_value: (obj.project, obj.overhead) for obj in XxData.objects.using("meo").all()
}
fieldnames = list(data[0].keys())
fieldnames = list(row.keys())
max_repeat = self._get_max_repeat(fieldnames, "NominalAnalysisNominalCostCentre")
claimant_name = self.get_account_name_from_code(row.get("AccountNumber"))
claim_number = row.get("SecondReference")
for repeat in range(1, max_repeat + 1):
cc_field = f"NominalAnalysisNominalCostCentre/{repeat}"
dep_field = f"NominalAnalysisNominalDepartment/{repeat}"
nom_field = f"NominalAnalysisNominalAccountNumber/{repeat}"
cc = row.get(cc_field)
dep = row.get(dep_field)
nom= row.get(nom_field)
if not cc and not dep and not nom:
continue
if not cc or not dep or not nom:
errors.append(
f"Row {index}: Missing values in '{cc_field}', '{dep_field}', or '{nom_field}'.")
continue
for index, row in enumerate(data, start=1):
claimant_name = self.get_account_name_from_code(row.get("AccountNumber"))
claim_number = row.get("SecondReference")
for repeat in range(1, max_repeat + 1):
cc_field = f"NominalAnalysisNominalCostCentre/{repeat}"
dep_field = f"NominalAnalysisNominalDepartment/{repeat}"
nominal_account_field = f"NominalAnalysisNominalAccountNumber/{repeat}"
cc = row.get(cc_field)
dep = row.get(dep_field)
nominal_account_name = row.get(nominal_account_field)
if not cc and not dep and not nominal_account_name:
continue
if not cc or not dep or not nominal_account_name:
errors.append(
f"Row {index}: Missing values in '{cc_field}', '{dep_field}', or '{nominal_account_field}'.")
continue
cc_type = cost_centre_map.get(cc)
if not cc_type:
errors.append(f"Row {index}: '{cc_field}' ({cc}) is not a valid cost centre.")
continue
xx_data = xx_data_map.get(nominal_account_name)
cc_type = self.cost_centre_map.get(cc)
if not cc_type:
errors.append(f"Row {index}: '{cc_field}' ({cc}) is not a valid cost centre.")
continue
if xx_data:
nc = xx_data[0] if cc_type == "Project" else xx_data[1]
elif MeoNominal.objects.using("meo").filter(nom=nominal_account_name).exists():
nc = nominal_account_name
else:
errors.append(f"Row {index}: '{nominal_account_field}' ({nominal_account_name}) is not valid.")
continue
xx_data = self.xx_data_map.get(nom)
if xx_data:
nc = xx_data[0] if cc_type == "Project" else xx_data[1]
elif MeoNominal.objects.using("meo").filter(nom=nom).exists():
nc = nom
else:
errors.append(f"Row {index}: '{nom_field}' ({nom}) is not valid.")
continue
if not MeoValidSageAccounts.objects.using("meo").filter(
account_cost_centre=cc, account_department=dep, account_number=nc
).exists():
errors.append(
f"Row {index}: The combination of '{cc_field}' ({cc}), "
f"'{dep_field}' ({dep}), and '{nominal_account_field}' "
f"({nc}) for claimant '{claimant_name}' and claim number '{claim_number}' "
f"does not exist in MEO valid Sage accounts."
)
if not MeoValidSageAccounts.objects.using("meo").filter(
account_cost_centre=cc, account_department=dep, account_number=nc
).exists():
errors.append(
f"Row {index}: The combination of '{cc_field}' ({cc}), "
f"'{dep_field}' ({dep}), and '{nom_field}' "
f"({nc}) for claimant '{claimant_name}' and claim number '{claim_number}' "
f"does not exist in MEO valid Sage accounts."
)
return errors
def _cheque_fields_must_be_empty(self, data: list[dict]) -> list[str]:
def _cheque_fields_must_be_empty(self, row: dict, index: int) -> list:
"""Validate that cheque fields are empty.
The cheque fields are 'ChequeCurrencyName', 'ChequeToBankExchangeRate', and 'ChequeValueInChequeCurrency'.
"""
errors = []
for index, row in enumerate(data, start=1):
cheque_currency_name = row.get("ChequeCurrencyName")
cheque_to_bank_exchange_rate = row.get("ChequeToBankExchangeRate")
cheque_value_in_cheque_currency = row.get("ChequeValueInChequeCurrency")
if any([
row.get("ChequeCurrencyName"),
row.get("ChequeToBankExchangeRate"),
row.get("ChequeValueInChequeCurrency")
]):
claimant_name = self.get_account_name_from_code(row.get("AccountNumber"))
claim_number = row.get("SecondReference")
if any([cheque_currency_name, cheque_to_bank_exchange_rate, cheque_value_in_cheque_currency]):
errors.append(
f"Row {index}: Unexpected values in the Cheque columns for {claimant_name} with claim number: "
f"{claim_number}. All cheque columns must be empty."
)
errors.append(
f"Row {index}: Unexpected values in the Cheque columns for {claimant_name} with claim number: "
f"{claim_number}. All cheque columns must be empty."
)
return errors
......@@ -51,20 +51,14 @@ class CSVUploadAPIView(APIView):
if not form.is_valid():
return Response({"status": "error", "errors": form.errors}, status=status.HTTP_400_BAD_REQUEST)
csv_file = form.cleaned_data["file"]
csv_file.seek(0)
decoded_file = csv_file.read().decode("utf-8-sig").strip()
if not decoded_file:
return Response({"status": "error", "message": "Uploaded file is empty."},
status=status.HTTP_400_BAD_REQUEST)
decoded_file = form.cleaned_data["csv_data"]
reader = csv.DictReader(io.StringIO(decoded_file))
csv_data: list[dict[str, str]] = list(reader)
updated_data = self.update_fields(csv_data)
request.session["validated_csv"] = updated_data
request.session["input_file_hash"] = UserActivityLog.generate_file_hash(csv_file)
request.session["input_file_hash"] = UserActivityLog.generate_file_hash(form.cleaned_data["csv_data"])
request.session.modified = True
return Response({
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment