diff --git a/cleanup_script.py b/cleanup_script.py index ba7ec47..ba1b0ef 100755 --- a/cleanup_script.py +++ b/cleanup_script.py @@ -14,6 +14,16 @@ from rich.logging import RichHandler from rich.console import Console from rich.traceback import install install(show_locals=True, locals_max_length=150, locals_max_string=300) +global YEARS +YEARS = [2020, 2021, 2022, 2023, 2024] +global INFLATION_RATES +INFLATION_RATES = { + 2020: 0.4, + 2021: 3.2, + 2022: 8.7, + 2023: 6.0, + 2024: 2.5, + } class Company: def __init__(self, data, report, writer): @@ -23,208 +33,78 @@ class Company: self.cleaned_data = dict() self.cleaned_data["bvd_id"] = data["BvD ID Nummer"] self.cleaned_data["name"] = data["Unternehmensname"] + for year in YEARS: + self.clean_complex(year, "vor") + self.clean_complex(year, "nach") + self.clean_simple(year, "Eigenkapital") + self.clean_simple(year, "Steuern") + + + + def clean_simple(self, year: int, type: str): + """Clean simple data. This means tax and capital.""" try: - if "Gewinn/(Verlust) vor Steuern EUR 2020" in data.keys() and data["Gewinn/(Verlust) vor Steuern EUR 2020"]!= '' and not self.cleaned_data.get("gv2020"): - self.cleaned_data["gv2020"] = int(data["Gewinn/(Verlust) vor Steuern EUR 2020"]) - elif "Gewinn/Verlust vor Steuern EUR 2020" in data.keys() and data["Gewinn/Verlust vor Steuern EUR 2020"] != '' and not self.cleaned_data.get("gv2020"): - self.cleaned_data["gv2020"] = int(data["Gewinn/Verlust vor Steuern EUR 2020"]) + self.cleaned_data[f"{self.get_simple_suffix(type)}{year}"] = int(self.data[f"{type} EUR {year}"]) + except ValueError: + self.report.log.debug(f"{self.cleaned_data['name']}: {self.get_simple_suffix(type)}{year} ValueError") + + def get_simple_suffix(self, type: str) -> str: + """Get suffix for the simple cleaning process""" + return "ek" if type == "Eigenkapital" else "st" + + + def clean_complex(self, year: int, state: str): + """Clean the complex data. 
This means earnings before/after tax.""" + try: + if f"Gewinn/(Verlust) {state} Steuern EUR {year}" in self.data.keys() and self.data[f"Gewinn/(Verlust) {state} Steuern EUR {year}"] != '' and not self.cleaned_data.get(f"gn{year}"): + self.cleaned_data[f"g{self.get_suffix(state)}{year}"] = int(self.data[f"Gewinn/(Verlust) {state} Steuern EUR {year}"]) + elif f"Gewinn/Verlust {state} Steuern EUR {year}" in self.data.keys() and self.data[f"Gewinn/Verlust {state} Steuern EUR {year}"] != '' and not self.cleaned_data.get(f"gn{year}"): + self.cleaned_data[f"g{self.get_suffix(state)}{year}"] = int(self.data[f"Gewinn/Verlust {state} Steuern EUR {year}"]) else: - self.report.log.debug(f"{self.cleaned_data['name']}: GV2020 empty value") + self.report.log.debug(f"{self.cleaned_data['name']}:g{self.get_suffix(state)}{year} empty value") except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: GV2020 ValueError") + self.report.log.debug(f"{self.cleaned_data['name']}: g{self.get_suffix(state)}{year} ValueError") - try: - if "Gewinn/(Verlust) vor Steuern EUR 2021" in data.keys() and data["Gewinn/(Verlust) vor Steuern EUR 2021"] != '' and not self.cleaned_data.get("gv2021"): - self.cleaned_data["gv2021"] = int(data["Gewinn/(Verlust) vor Steuern EUR 2021"]) - elif "Gewinn/Verlust vor Steuern EUR 2021" in data.keys() and data["Gewinn/Verlust vor Steuern EUR 2021"] != '' and not self.cleaned_data.get("gv2021"): - self.cleaned_data["gv2021"] = int(data["Gewinn/Verlust vor Steuern EUR 2021"]) - else: - self.report.log.debug(f"{self.cleaned_data['name']}: GV2021 empty value") - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: GV2021 ValueError") - - try: - if "Gewinn/(Verlust) vor Steuern EUR 2022" in data.keys() and data["Gewinn/(Verlust) vor Steuern EUR 2022"] != '' and not self.cleaned_data.get("gv2022"): - self.cleaned_data["gv2022"] = int(data["Gewinn/(Verlust) vor Steuern EUR 2022"]) - elif "Gewinn/Verlust vor Steuern EUR 2022" in 
data.keys() and data["Gewinn/Verlust vor Steuern EUR 2022"] != '' and not self.cleaned_data.get("gv2022"): - self.cleaned_data["gv2022"] = int(data["Gewinn/Verlust vor Steuern EUR 2022"]) - else: - self.report.log.debug(f"{self.cleaned_data['name']}: GV2022 empty value") - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: GV2022 ValueError") - - try: - if "Gewinn/(Verlust) vor Steuern EUR 2023" in data.keys() and data["Gewinn/(Verlust) vor Steuern EUR 2023"] != '' and not self.cleaned_data.get("gv2023"): - self.cleaned_data["gv2023"] = int(data["Gewinn/(Verlust) vor Steuern EUR 2023"]) - elif "Gewinn/Verlust vor Steuern EUR 2023" in data.keys() and data["Gewinn/Verlust vor Steuern EUR 2023"] != '' and not self.cleaned_data.get("gv2023"): - self.cleaned_data["gv2023"] = int(data["Gewinn/Verlust vor Steuern EUR 2023"]) - else: - self.report.log.debug(f"{self.cleaned_data['name']}: GV2023 empty value") - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: GV2023 ValueError") - - try: - if "Gewinn/(Verlust) vor Steuern EUR 2024" in data.keys() and data["Gewinn/(Verlust) vor Steuern EUR 2024"] != '' and not self.cleaned_data.get("gv2024"): - self.cleaned_data["gv2024"] = int(data["Gewinn/(Verlust) vor Steuern EUR 2024"]) - elif "Gewinn/Verlust vor Steuern EUR 2024" in data.keys() and data["Gewinn/Verlust vor Steuern EUR 2024"] != '' and not self.cleaned_data.get("gv2024"): - self.cleaned_data["gv2024"] = int(data["Gewinn/Verlust vor Steuern EUR 2024"]) - else: - self.report.log.debug(f"{self.cleaned_data['name']}: GV2024 empty value") - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: GV2024 ValueError") + def get_suffix(self, state: str) -> str: + """Get suffix for the complex cleaning process.""" + return "n" if state == "nach" else "v" - try: - if "Gewinn/(Verlust) nach Steuern EUR 2020" in data.keys() and data["Gewinn/(Verlust) nach Steuern EUR 2020"] != '' and not 
self.cleaned_data.get("gn2020"): - self.cleaned_data["gn2020"] = int(data["Gewinn/(Verlust) nach Steuern EUR 2020"]) - elif "Gewinn/Verlust nach Steuern EUR 2020" in data.keys() and data["Gewinn/Verlust nach Steuern EUR 2020"] != '' and not self.cleaned_data.get("gn2020"): - self.cleaned_data["gn2020"] = int(data["Gewinn/Verlust nach Steuern EUR 2020"]) - else: - self.report.log.debug(f"{self.cleaned_data['name']}: GN2020 empty value") - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: GN2020 ValueError") + def calculate_all_tax(self): + """Calculate tax for all relevant years.""" + self.calculate_tax(2020) + self.calculate_tax(2021) + self.calculate_tax(2022) + self.calculate_tax(2023) + self.calculate_tax(2024) - try: - if "Gewinn/(Verlust) nach Steuern EUR 2021" in data.keys() and data["Gewinn/(Verlust) nach Steuern EUR 2021"] != '' and not self.cleaned_data.get("gn2021"): - self.cleaned_data["gn2021"] = int(data["Gewinn/(Verlust) nach Steuern EUR 2021"]) - elif "Gewinn/Verlust nach Steuern EUR 2021" in data.keys() and data["Gewinn/Verlust nach Steuern EUR 2021"] != '' and not self.cleaned_data.get("gn2021"): - self.cleaned_data["gn2021"] = int(data["Gewinn/Verlust nach Steuern EUR 2021"]) - else: - self.report.log.debug(f"{self.cleaned_data['name']}: GN2021 empty value") - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: GN2021 ValueError") - - try: - if "Gewinn/(Verlust) nach Steuern EUR 2022" in data.keys() and data["Gewinn/(Verlust) nach Steuern EUR 2022"] != '' and not self.cleaned_data.get("gn2022"): - self.cleaned_data["gn2022"] = int(data["Gewinn/(Verlust) nach Steuern EUR 2022"]) - elif "Gewinn/Verlust nach Steuern EUR 2022" in data.keys() and data["Gewinn/Verlust nach Steuern EUR 2022"] != '' and not self.cleaned_data.get("gn2022"): - self.cleaned_data["gn2022"] = int(data["Gewinn/Verlust nach Steuern EUR 2022"]) - else: - self.report.log.debug(f"{self.cleaned_data['name']}: GN2022 empty value") - 
except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: GN2022 ValueError") - - try: - if "Gewinn/(Verlust) nach Steuern EUR 2023" in data.keys() and data["Gewinn/(Verlust) nach Steuern EUR 2023"] != '' and not self.cleaned_data.get("gn2023"): - self.cleaned_data["gn2023"] = int(data["Gewinn/(Verlust) nach Steuern EUR 2023"]) - elif "Gewinn/Verlust nach Steuern EUR 2023" in data.keys() and data["Gewinn/Verlust nach Steuern EUR 2023"] != '' and not self.cleaned_data.get("gn2023"): - self.cleaned_data["gn2023"] = int(data["Gewinn/Verlust nach Steuern EUR 2023"]) - else: - self.report.log.debug(f"{self.cleaned_data['name']}: GN2023 empty value") - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: GN2023 ValueError") - - try: - if "Gewinn/(Verlust) nach Steuern EUR 2024" in data.keys() and data["Gewinn/(Verlust) nach Steuern EUR 2024"] != '' and not self.cleaned_data.get("gn2024"): - self.cleaned_data["gn2024"] = int(data["Gewinn/(Verlust) nach Steuern EUR 2024"]) - elif "Gewinn/Verlust nach Steuern EUR 2024" in data.keys() and data["Gewinn/Verlust nach Steuern EUR 2024"] != '' and not self.cleaned_data.get("gn2024"): - self.cleaned_data["gn2024"] = int(data["Gewinn/Verlust nach Steuern EUR 2024"]) - else: - self.report.log.debug(f"{self.cleaned_data['name']}: GN2024 empty value") - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: GN2024 ValueError") + def calculate_tax(self, year: int): + """Calculate simple tax from provided values.""" + if not self.cleaned_data.get(f"st{year}") and self.cleaned_data.get(f"gv{year}") != None and self.cleaned_data.get(f"gn{year}") != None: + self.cleaned_data[f"st{year}"] = self.cleaned_data.get(f"gv{year}") - self.cleaned_data.get(f"gn{year}") - try: - self.cleaned_data["st2020"] = int(data["Steuern EUR 2020"]) - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: ST2020 ValueError") - try: - self.cleaned_data["st2021"] = int(data["Steuern EUR 
2021"]) - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: ST2021 ValueError") - - try: - self.cleaned_data["st2022"] = int(data["Steuern EUR 2022"]) - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: ST2022 ValueError") - - try: - self.cleaned_data["st2023"] = int(data["Steuern EUR 2023"]) - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: ST2023 ValueError") - - try: - self.cleaned_data["st2024"] = int(data["Steuern EUR 2024"]) - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: ST2024 ValueError") - - - try: - self.cleaned_data["ek2020"] = int(data["Eigenkapital EUR 2020"]) - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: EK2020 ValueError") - - try: - self.cleaned_data["ek2021"] = int(data["Eigenkapital EUR 2021"]) - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: EK2021 ValueError") - - try: - self.cleaned_data["ek2022"] = int(data["Eigenkapital EUR 2022"]) - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: EK2022 ValueError") - - try: - self.cleaned_data["ek2023"] = int(data["Eigenkapital EUR 2023"]) - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: EK2023 ValueError") - - try: - self.cleaned_data["ek2024"] = int(data["Eigenkapital EUR 2024"]) - except ValueError: - self.report.log.debug(f"{self.cleaned_data['name']}: EK2024 ValueError") - - def calculate_tax(self): - if not self.cleaned_data.get("st2020") and self.cleaned_data.get("gv2020") != None and self.cleaned_data.get("gn2020") != None: - self.cleaned_data["st2020"] = self.cleaned_data.get("gv2020") - self.cleaned_data.get("gn2020") - if not self.cleaned_data.get("st2021") and self.cleaned_data.get("gv2021") != None and self.cleaned_data.get("gn2021") != None: - self.cleaned_data["st2021"] = self.cleaned_data.get("gv2021") - self.cleaned_data.get("gn2021") - if not self.cleaned_data.get("st2022") and 
self.cleaned_data.get("gv2022") != None and self.cleaned_data.get("gn2022") != None: - self.cleaned_data["st2022"] = self.cleaned_data.get("gv2022") - self.cleaned_data.get("gn2022") - if not self.cleaned_data.get("st2023") and self.cleaned_data.get("gv2023") != None and self.cleaned_data.get("gn2023") != None: - self.cleaned_data["st2023"] = self.cleaned_data.get("gv2023") - self.cleaned_data.get("gn2023") - if not self.cleaned_data.get("st2024") and self.cleaned_data.get("gv2024") != None and self.cleaned_data.get("gn2024") != None: - self.cleaned_data["st2024"] = self.cleaned_data.get("gv2024") - self.cleaned_data.get("gn2024") - - def validate(self): - #fallback, in case tax wasn't already calculated - self.calculate_tax() - if True: - if self.cleaned_data.get("st2020") and self.cleaned_data.get("ek2020"): - self.report.valid_data += 1 - else: - self.report.invalid_data +=1 - if self.cleaned_data.get("st2021") and self.cleaned_data.get("ek2021"): - self.report.valid_data += 1 - else: - self.report.invalid_data +=1 - if self.cleaned_data.get("st2022") and self.cleaned_data.get("ek2022"): - self.report.valid_data += 1 - else: - self.report.invalid_data +=1 - if self.cleaned_data.get("st2023") and self.cleaned_data.get("ek2023"): - self.report.valid_data += 1 - else: - self.report.invalid_data +=1 - if self.cleaned_data.get("st2024") and self.cleaned_data.get("ek2024"): + def reporter(self): + """Simple method to report valid and invalid data to the main import class.""" + for year in YEARS: + if self.cleaned_data.get(f"st{year}") and self.cleaned_data.get(f"ek{year}"): self.report.valid_data += 1 else: self.report.invalid_data +=1 + def calculate_data(self): - if self.cleaned_data.get("st2020") and self.cleaned_data.get("gv2020") and self.cleaned_data.get("gn2020") and self.cleaned_data.get("ek2020"): - self.cleaned_data["nomtax2020"] = self.cleaned_data.get("st2020") / self.cleaned_data.get("gv2020") - self.cleaned_data["realtax2020"] = 
(self.cleaned_data.get("st2020") + (0.4 * self.cleaned_data.get("gv2020"))) / self.cleaned_data.get("gv2020") - self.cleaned_data["realefftax2020"] = (self.cleaned_data.get("st2020") + (0.4 * self.cleaned_data.get("gv2020")) + (0.4 * self.cleaned_data.get("ek2020"))) / self.cleaned_data.get("gv2020") - print(self.cleaned_data.get("nomtax2020")) - print(self.cleaned_data.get("realtax2020")) - print(self.cleaned_data.get("realefftax2020")) + """Calculate data relevant to the project.""" + for year in YEARS: + if self.cleaned_data.get(f"st{year}") and self.cleaned_data.get(f"gv{year}") and self.cleaned_data.get(f"gn{year}") and self.cleaned_data.get(f"ek{year}"): + self.cleaned_data[f"nomtax{year}"] = self.cleaned_data.get(f"st{year}") / self.cleaned_data.get(f"gv{year}") + self.cleaned_data[f"realtax{year}"] = (self.cleaned_data.get(f"st{year}") + (INFLATION_RATES[year] * self.cleaned_data.get(f"gv{year}"))) / self.cleaned_data.get(f"gv{year}") + self.cleaned_data[f"realefftax{year}"] = (self.cleaned_data.get(f"st{year}") + (INFLATION_RATES[year] * self.cleaned_data.get(f"gv{year}")) + (INFLATION_RATES[year] * self.cleaned_data.get(f"ek{year}"))) / self.cleaned_data.get(f"gv{year}") def write(self): - """Write the current (validated!) dataset to CSV""" + """Write the current dataset to CSV""" with open(self.report.output) as out_csv: try: output_reader = pd.read_csv(out_csv) @@ -274,11 +154,12 @@ class dataimport: self.importer() def importer(self): + """Start the actual import process. 
Separates process and setup.""" with self.progress: with open(self.filename, mode='r', encoding='utf-8-sig', newline='') as csv_file: with open(self.output, mode='a+', encoding='utf-8-sig', newline='') as output_csv: csv_reader = csv.DictReader(csv_file, delimiter=',') - fieldnames = ['bvd_id', 'name', 'gv2020', 'gn2020', 'st2020', 'ek2020', 'gv2021', 'gn2021', 'st2021', 'ek2021', 'gv2022', 'gn2022', 'st2022', 'ek2022', 'gv2023', 'gn2023', 'st2023', 'ek2023', 'gv2024', 'gn2024', 'st2024', 'ek2024'] + fieldnames = self.generate_fieldnames() output_writer = csv.DictWriter(output_csv, fieldnames=fieldnames) if self.get_total(self.output) == -1: self.log.warning(f"WRITING HEADER FOR FILE {self.output}!") @@ -308,11 +189,27 @@ class dataimport: self.log.critical("ERROR CALCULATION EXCEPTION") def get_total(self, file): + """Get total data rows in the input file. Corrected for header row.""" return sum(1 for _ in open(file, mode='r')) - 1 + + def generate_fieldnames(self): + """Generate fieldnames for the export CSV.""" + fieldnames = ['bvd_id', 'name'] + for year in YEARS: + fieldnames.append(f"gv{year}") + fieldnames.append(f"gn{year}") + fieldnames.append(f"st{year}") + fieldnames.append(f"ek{year}") + fieldnames.append(f"nomtax{year}") + fieldnames.append(f"realtax{year}") + fieldnames.append(f"realefftax{year}") + return fieldnames def comp_import(self, data, writer): + """Import the active dataset as a company. Give write directive.""" current = Company(data, report=self, writer=writer) - current.validate() + current.reporter() + current.calculate_all_tax() current.calculate_data() current.write()