import sys
import pandas as pd
from PyQt5.QtWidgets import (QApplication, QWidget, QVBoxLayout, QHBoxLayout, QPushButton,
QFileDialog, QTableView, QHeaderView, QLabel,
QMessageBox, QGroupBox, QFormLayout, QTableWidget,
QTableWidgetItem, QSpinBox, QProgressBar, QCheckBox,
QRadioButton, QSplitter)
from PyQt5.QtCore import QAbstractTableModel, Qt, QVariant
class PandasModel(QAbstractTableModel):
"""A model to interface a pandas DataFrame with a QTableView."""
def __init__(self, data):
super().__init__()
self._data = data
def rowCount(self, parent=None):
return self._data.shape[0]
def columnCount(self, parent=None):
return self._data.shape[1]
def data(self, index, role=Qt.DisplayRole):
if index.isValid() and role == Qt.DisplayRole:
try:
value = self._data.iloc[index.row(), index.column()]
if isinstance(value, (int, float)):
return f"{value:,.2f}"
return str(value)
except (ValueError, TypeError):
return str(self._data.iloc[index.row(), index.column()])
return QVariant()
def headerData(self, section, orientation, role=Qt.DisplayRole):
if role == Qt.DisplayRole:
if orientation == Qt.Horizontal:
return str(self._data.columns[section])
if orientation == Qt.Vertical:
return str(self._data.index[section])
return QVariant()
class AgeingApp(QWidget):
def __init__(self):
super().__init__()
self.df = None
self.processed_df = None
self.setWindowTitle('Ageing Adjustment Tool')
self.setGeometry(100, 100, 1400, 900)
self.setAcceptDrops(True) # Enable Drag and Drop
# --- Main Layout ---
main_layout = QVBoxLayout(self)
top_splitter = QSplitter(Qt.Horizontal)
# --- Left Panel (Controls) ---
left_panel = QWidget()
controls_layout = QVBoxLayout(left_panel)
# File Operations
file_group = QGroupBox("1. Load Data (Upload or Drag & Drop)")
file_layout = QVBoxLayout()
self.upload_button = QPushButton('Upload Excel/CSV File')
self.upload_button.clicked.connect(self.upload_file)
file_layout.addWidget(self.upload_button)
file_group.setLayout(file_layout)
controls_layout.addWidget(file_group)
# Column Mapping
mapping_group = QGroupBox("2. Define Ageing Hierarchy")
mapping_layout = QVBoxLayout()
mapping_layout.addWidget(QLabel("Set a unique priority for each bucket (1 = Oldest)."))
self.hierarchy_table = QTableWidget()
self.hierarchy_table.setColumnCount(2)
self.hierarchy_table.setHorizontalHeaderLabels(["Ageing Column", "Priority (1=Oldest)"])
self.hierarchy_table.horizontalHeader().setSectionResizeMode(0, QHeaderView.Stretch)
mapping_layout.addWidget(self.hierarchy_table)
mapping_group.setLayout(mapping_layout)
controls_layout.addWidget(mapping_group)
# Process Controls
process_group = QGroupBox("3. Process Data")
process_layout = QVBoxLayout()
self.negate_checkbox = QCheckBox("Negate all ageing figures before processing")
self.process_button = QPushButton('Process Ageing Adjustment')
self.process_button.clicked.connect(self.process_data)
self.process_button.setEnabled(False)
self.progress_bar = QProgressBar()
process_layout.addWidget(self.negate_checkbox)
process_layout.addWidget(self.process_button)
process_layout.addWidget(self.progress_bar)
process_group.setLayout(process_layout)
controls_layout.addWidget(process_group)
# Export Options
export_group = QGroupBox("4. Export Results")
export_layout = QVBoxLayout()
self.export_button = QPushButton('Export to Excel')
self.export_button.clicked.connect(self.export_file)
self.export_button.setEnabled(False)
self.export_compare_checkbox = QCheckBox("Compare original and adjusted data in export")
self.export_compare_checkbox.toggled.connect(self.toggle_export_options)
self.export_options_group = QGroupBox("Comparison Format")
export_options_layout = QVBoxLayout()
self.radio_side_by_side = QRadioButton("Side-by-side columns")
self.radio_separate_sheets = QRadioButton("Separate sheets")
self.radio_side_by_side.setChecked(True)
export_options_layout.addWidget(self.radio_side_by_side)
export_options_layout.addWidget(self.radio_separate_sheets)
self.export_options_group.setLayout(export_options_layout)
self.export_options_group.setEnabled(False)
export_layout.addWidget(self.export_button)
export_layout.addWidget(self.export_compare_checkbox)
export_layout.addWidget(self.export_options_group)
export_group.setLayout(export_layout)
controls_layout.addWidget(export_group)
controls_layout.addStretch()
top_splitter.addWidget(left_panel)
# --- Right Panel (Data Views) ---
right_panel = QWidget()
data_layout = QVBoxLayout(right_panel)
data_splitter = QSplitter(Qt.Vertical)
self.original_table = QTableView()
self.processed_table = QTableView()
original_group = QGroupBox("Original Data")
original_layout = QVBoxLayout()
original_layout.addWidget(self.original_table)
original_group.setLayout(original_layout)
processed_group = QGroupBox("Adjusted Data")
processed_layout = QVBoxLayout()
processed_layout.addWidget(self.processed_table)
processed_group.setLayout(processed_layout)
data_splitter.addWidget(original_group)
data_splitter.addWidget(processed_group)
data_layout.addWidget(data_splitter)
top_splitter.addWidget(right_panel)
top_splitter.setSizes([400, 1000]) # Initial size split
main_layout.addWidget(top_splitter)
def dragEnterEvent(self, event):
if event.mimeData().hasUrls():
event.accept()
else:
event.ignore()
def dropEvent(self, event):
files = [u.toLocalFile() for u in event.mimeData().urls()]
if files:
self.load_data_from_path(files[0])
def upload_file(self):
file_path, _ = QFileDialog.getOpenFileName(self, "Open File", "", "Data Files (*.xlsx *.xls *.csv)")
if file_path:
self.load_data_from_path(file_path)
def load_data_from_path(self, path):
try:
if path.endswith('.csv'):
self.df = pd.read_csv(path)
else:
self.df = pd.read_excel(path)
self.original_table.setModel(PandasModel(self.df))
self.populate_hierarchy_table()
self.process_button.setEnabled(True)
self.export_button.setEnabled(False)
self.processed_table.setModel(None)
self.progress_bar.setValue(0)
self.show_message("Success", f"Successfully loaded {path.split('/')[-1]}")
except Exception as e:
self.show_message("Error", f"Failed to load file: {e}")
def populate_hierarchy_table(self):
self.hierarchy_table.setRowCount(0)
numeric_cols = self.df.select_dtypes(include='number').columns
for col in numeric_cols:
row_position = self.hierarchy_table.rowCount()
self.hierarchy_table.insertRow(row_position)
self.hierarchy_table.setItem(row_position, 0, QTableWidgetItem(col))
spin_box = QSpinBox()
spin_box.setRange(0, 100)
spin_box.setSpecialValueText("Not in hierarchy")
self.hierarchy_table.setCellWidget(row_position, 1, spin_box)
def process_data(self):
try:
self.progress_bar.setValue(5)
# 1. Get hierarchy from UI
hierarchy = {}
for i in range(self.hierarchy_table.rowCount()):
priority = self.hierarchy_table.cellWidget(i, 1).value()
if priority > 0:
col_name = self.hierarchy_table.item(i, 0).text()
if priority in hierarchy:
self.show_message("Error", "Priorities must be unique.")
return
hierarchy[priority] = col_name
if len(hierarchy) < 2:
self.show_message("Error", "Please define a hierarchy for at least two columns.")
return
adjustment_order = [hierarchy[p] for p in sorted(hierarchy.keys())]
ageing_cols = list(hierarchy.values())
self.progress_bar.setValue(15)
# 2. Prepare DataFrame for vectorized operation
data_to_process = self.df.copy()
for col in ageing_cols:
data_to_process[col] = pd.to_numeric(data_to_process[col], errors='coerce').fillna(0)
# 2a. Negate figures if checkbox is ticked
if self.negate_checkbox.isChecked():
data_to_process[ageing_cols] *= -1
self.progress_bar.setValue(25)
# 3. Separate debits (positive) and credits (negative)
debits = data_to_process[ageing_cols].clip(lower=0)
credits = data_to_process[ageing_cols].clip(upper=0).abs()
credit_pool = credits.sum(axis=1) # Total credit available per row
self.progress_bar.setValue(40)
# 4. Apply credit pool to knock down debits, starting from oldest
final_debits = debits.copy()
progress_step = 30 / len(adjustment_order)
current_progress = 40
for col in adjustment_order:
adjustment = pd.concat([final_debits[col], credit_pool], axis=1).min(axis=1)
final_debits[col] -= adjustment
credit_pool -= adjustment # Reduce the available credit pool
current_progress += progress_step
self.progress_bar.setValue(int(current_progress))
# `credit_pool` now contains the total *unused* credit (overpayment) for each row.
self.progress_bar.setValue(75)
# 5. Determine how much of the original credit remains.
# We assume newest credits are used first (LIFO).
credit_usage_order = adjustment_order[::-1]
credit_used = credits.sum(axis=1) - credit_pool # Total credit that was applied to debits
final_credits = credits.copy()
for col in credit_usage_order:
adjustment = pd.concat([final_credits[col], credit_used], axis=1).min(axis=1)
final_credits[col] -= adjustment
credit_used -= adjustment
self.progress_bar.setValue(90)
# 6. Combine final debits and final credits
# Start with a copy of the original data to preserve non-ageing columns
self.processed_df = self.df.copy()
# Combine the adjusted debits and remaining credits (as negative numbers)
final_ageing_data = final_debits - final_credits
# Update the ageing columns in the final dataframe
self.processed_df[ageing_cols] = final_ageing_data[ageing_cols]
self.processed_table.setModel(PandasModel(self.processed_df))
self.export_button.setEnabled(True)
self.progress_bar.setValue(100)
self.show_message("Success", "Ageing data has been successfully adjusted.")
except Exception as e:
self.show_message("Processing Error", f"An error occurred: {e}")
self.progress_bar.setValue(0)
def toggle_export_options(self, checked):
self.export_options_group.setEnabled(checked)
def export_file(self):
if self.processed_df is None:
self.show_message("Warning", "No processed data to export.")
return
save_path, _ = QFileDialog.getSaveFileName(self, "Save Excel File", "", "Excel Files (*.xlsx)")
if not save_path:
return
try:
if not self.export_compare_checkbox.isChecked():
self.processed_df.to_excel(save_path, index=False)
else:
if self.radio_separate_sheets.isChecked():
with pd.ExcelWriter(save_path) as writer:
self.df.to_excel(writer, sheet_name='Original Data', index=False)
self.processed_df.to_excel(writer, sheet_name='Adjusted Data', index=False)
else:
original_renamed = self.df.add_suffix('_orig')
processed_renamed = self.processed_df.add_suffix('_adj')
non_ageing_cols = [c for c in self.df.columns if c not in self.processed_df.columns]
combined = pd.concat([original_renamed, processed_renamed.drop(columns=[c+'_adj' for c in non_ageing_cols if c+'_adj' in processed_renamed.columns], errors='ignore')], axis=1)
combined.to_excel(save_path, index=False)
self.show_message("Success", f"File successfully exported to {save_path}")
except Exception as e:
self.show_message("Export Error", f"Failed to export file: {e}")
def show_message(self, title, message):
msg_box = QMessageBox()
msg_box.setWindowTitle(title)
msg_box.setText(message)
msg_box.exec_()
if __name__ == '__main__':
app = QApplication(sys.argv)
ex = AgeingApp()
ex.show()
sys.exit(app.exec_())