#!/usr/bin/env python3
"""Clean ORBIS CSV exports and derive per-year tax ratios.

Each input row is reduced to a fixed set of columns (earnings before/after
tax, tax, equity) for every year in ``YEARS``; nominal and inflation-adjusted
tax ratios are computed from them and the result is appended to an output CSV.
"""
import argparse
import csv
import logging

from rich import progress
from rich.console import Console
from rich.logging import RichHandler
from rich.traceback import install

install(show_locals=True, locals_max_length=150, locals_max_string=300)

# Years for which columns are read from the input and written to the output.
YEARS = [2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024]

# Harmonised Index of Consumer Prices as published by the German Federal
# Statistical Office, keyed by year.
INFLATION_RATES = {
    2014: 0.008,
    2015: 0.007,
    2016: 0.004,
    2017: 0.017,
    2018: 0.019,
    2019: 0.014,
    2020: 0.004,
    2021: 0.032,
    2022: 0.087,
    2023: 0.060,
    2024: 0.025,
}


class Company:
    """One input row, cleaned into the short column names of the export.

    ``cleaned_data`` maps ``bvd_id``, ``name`` and, per year, ``gv`` (earnings
    before tax), ``gn`` (earnings after tax), ``st`` (tax) and ``ek`` (equity)
    to integers. Values that are missing or not parseable are simply absent.
    """

    def __init__(self, data, report, writer) -> None:
        """Clean *data* immediately.

        Args:
            data: Mapping of input column header to raw string value.
            report: Object providing ``log``, ``valid_data`` and
                ``invalid_data`` (the running ``dataimport``).
            writer: ``csv.DictWriter``-like object used by :meth:`write`.

        Raises:
            KeyError: If the ID or company-name column is missing.
        """
        self.data = data
        self.writer = writer
        self.report = report
        self.cleaned_data = dict()
        self.cleaned_data["bvd_id"] = data["BvD ID Nummer"]
        self.cleaned_data["name"] = data["Unternehmensname"]
        for year in YEARS:
            self.clean_complex(year, "vor")
            self.clean_complex(year, "nach")
            self.clean_simple(year, "Eigenkapital")
            self.clean_simple(year, "Steuern")

    def clean_simple(self, year: int, type: str) -> None:
        """Clean simple data. This means tax and capital.

        Reads the ``"<type> EUR <year>"`` column and stores it as an int.
        A missing column or a non-integer value is logged at debug level and
        skipped, so one incomplete row cannot abort the whole import.
        """
        target = f"{self.get_simple_suffix(type)}{year}"
        try:
            self.cleaned_data[target] = int(self.data[f"{type} EUR {year}"])
        except KeyError:
            # Not every export carries every year; treat like an empty cell.
            self.report.log.debug(
                f"{self.cleaned_data['name']}: {target} missing column"
            )
        except ValueError:
            self.report.log.debug(
                f"{self.cleaned_data['name']}: {target} ValueError"
            )

    def get_simple_suffix(self, type: str) -> str:
        """Get suffix for the simple cleaning process."""
        return "ek" if type == "Eigenkapital" else "st"

    def clean_complex(self, year: int, state: str) -> None:
        """Clean the complex data. This means earnings before/after tax.

        The column header exists in two spellings (with and without
        parentheses around "Verlust"); the first one that is present and
        non-empty wins. *state* is ``"vor"`` (before) or ``"nach"`` (after).
        """
        target = f"g{self.get_suffix(state)}{year}"
        headers = (
            f"Gewinn/(Verlust) {state} Steuern EUR {year}",
            f"Gewinn/Verlust {state} Steuern EUR {year}",
        )
        try:
            # NOTE(review): the guard looks at gn<year> for both states; with
            # the call order used in __init__ it is never set at this point.
            if not self.cleaned_data.get(f"gn{year}"):
                for header in headers:
                    if header in self.data and self.data[header] != '':
                        self.cleaned_data[target] = int(self.data[header])
                        return
            self.report.log.debug(
                f"{self.cleaned_data['name']}:{target} empty value"
            )
        except ValueError:
            self.report.log.debug(
                f"{self.cleaned_data['name']}: {target} ValueError"
            )

    def get_suffix(self, state: str) -> str:
        """Get suffix for the complex cleaning process."""
        return "n" if state == "nach" else "v"

    def calculate_all_tax(self) -> None:
        """Calculate tax for all relevant years."""
        for year in YEARS:
            self.calculate_tax(year)

    def calculate_tax(self, year: int) -> None:
        """Derive tax as earnings before minus earnings after tax.

        Only fills ``st<year>`` when it is absent (or zero) and both earnings
        figures are available.
        """
        if self.cleaned_data.get(f"st{year}"):
            return
        before = self.cleaned_data.get(f"gv{year}")
        after = self.cleaned_data.get(f"gn{year}")
        if before is not None and after is not None:
            self.cleaned_data[f"st{year}"] = before - after

    def reporter(self) -> None:
        """Count, per year, whether tax and equity are both present.

        Increments ``valid_data`` or ``invalid_data`` on the report object.
        """
        for year in YEARS:
            has_tax = self.cleaned_data.get(f"st{year}")
            has_equity = self.cleaned_data.get(f"ek{year}")
            if has_tax and has_equity:
                self.report.valid_data += 1
            else:
                self.report.invalid_data += 1

    def calculate_data(self) -> None:
        """Calculate data relevant to the project.

        For every year in which tax, both earnings figures and equity are all
        present and non-zero, stores ``nomtax``, ``realtax`` and
        ``realefftax``; all three are ratios over earnings before tax.
        """
        for year in YEARS:
            tax = self.cleaned_data.get(f"st{year}")
            before = self.cleaned_data.get(f"gv{year}")
            after = self.cleaned_data.get(f"gn{year}")
            equity = self.cleaned_data.get(f"ek{year}")
            # The truthiness test also rules out before == 0, the divisor.
            if not (tax and before and after and equity):
                continue
            rate = INFLATION_RATES[year]
            self.cleaned_data[f"nomtax{year}"] = tax / before
            self.cleaned_data[f"realtax{year}"] = (
                tax + (rate * before)
            ) / before
            self.cleaned_data[f"realefftax{year}"] = (
                tax + (rate * before) + (rate * equity)
            ) / before

    def write(self) -> None:
        """Write the current dataset to CSV."""
        self.writer.writerow(self.cleaned_data)


class dataimport:
    """Import one input CSV and append the cleaned rows to the output CSV.

    Constructing an instance runs the complete import.
    """

    def __init__(self, filename, logfile, output, seek=0) -> None:
        """Configure logging and progress display, then run the import.

        Args:
            filename: Path of the input CSV.
            logfile: Path of a file that receives WARNING and above, or
                ``"NONE"`` / ``None`` to log to the console only.
            output: Path of the CSV to append to.
            seek: Number of leading data rows to skip.
        """
        self.seek = seek
        self.progress = progress.Progress(
            *progress.Progress.get_default_columns(),
            progress.MofNCompleteColumn(),
            progress.TimeElapsedColumn(),
            expand=True,
        )
        self.filename = filename
        self.output = output

        log_format = "%(message)s"
        handlers = [
            RichHandler(
                rich_tracebacks=True,
                console=self.progress.console,
                show_path=False,
                show_time=False,
                level="NOTSET",
            )
        ]
        # Decide BEFORE opening: comparing an open file object with "NONE"
        # is always unequal, which made the console-only mode unreachable.
        if logfile is None or logfile == "NONE":
            self.logfile = None
            self.logconsole = None
        else:
            # Stays open for the life of the process: the logging handler
            # below keeps writing to it.
            self.logfile = open(logfile, 'a')
            self.logconsole = Console(file=self.logfile)
            handlers.append(
                RichHandler(
                    rich_tracebacks=True,
                    console=self.logconsole,
                    show_path=False,
                    level="WARNING",
                )
            )
        logging.basicConfig(
            level="INFO",
            format=log_format,
            datefmt="[%X]",
            handlers=handlers,
        )

        self.log = logging.getLogger("import")
        self.total_rows = self.get_total(self.filename)
        self.errors = 0
        self.data = {}
        self.duplicate_database_id = None
        # Reuse the count instead of scanning the input a second time.
        self.task = self.progress.add_task(
            f"Importing {self.filename.split('/')[-1]}",
            total=self.total_rows,
        )
        self.progress.update(self.task, advance=self.seek)
        self.valid_data = 0
        self.invalid_data = 0
        self.importer()

    def importer(self) -> None:
        """Start the actual import process. Separates process and setup."""
        with self.progress:
            with open(self.filename, mode='r', encoding='utf-8-sig',
                      newline='') as csv_file:
                with open(self.output, mode='a+', encoding='utf-8-sig',
                          newline='') as output_csv:
                    csv_reader = csv.DictReader(csv_file, delimiter=',')
                    output_writer = csv.DictWriter(
                        output_csv, fieldnames=self.generate_fieldnames()
                    )
                    # -1 means the output has no lines at all (zero lines
                    # minus the header correction), i.e. it is new or empty.
                    if self.get_total(self.output) == -1:
                        self.log.warning(
                            f"WRITING HEADER FOR FILE {self.output}!"
                        )
                        output_writer.writeheader()
                    for rownum, row in enumerate(csv_reader):
                        # Skipped rows were already added to the progress
                        # bar in __init__.
                        if rownum < self.seek:
                            continue
                        self.data = {
                            key: row[key] for key in csv_reader.fieldnames
                        }
                        self.comp_import(self.data, output_writer)
                        self.data = {}
                        self.progress.update(self.task, advance=1)

        self.progress.console.rule()
        self.log.info(f"Rows: {self.total_rows}")
        self.log.info(f"Valid: {self.valid_data}")
        self.log.info(f"Invalid: {self.invalid_data}")
        if self.errors == 0:
            self.log.info(f"Errors: {self.errors}")
            self.progress.console.rule()
        elif self.errors > 0:
            self.log.error(f"Errors: {self.errors}")
            self.progress.console.rule()
        else:
            self.log.critical("ERROR CALCULATION EXCEPTION")

    def get_total(self, file) -> int:
        """Get total data rows in the input file. Corrected for header row.

        Returns the number of text lines minus one, so an empty file
        yields ``-1``.
        """
        # Same encoding the importer uses for these files; the context
        # manager closes the handle instead of leaking it.
        with open(file, mode='r', encoding='utf-8-sig') as handle:
            return sum(1 for _ in handle) - 1

    def generate_fieldnames(self) -> list:
        """Generate fieldnames for the export CSV."""
        per_year = ("gv", "gn", "st", "ek", "nomtax", "realtax", "realefftax")
        fieldnames = ['bvd_id', 'name']
        for year in YEARS:
            fieldnames.extend(f"{prefix}{year}" for prefix in per_year)
        return fieldnames

    def comp_import(self, data: dict, writer) -> None:
        """Import the active dataset as a company. Give write directive."""
        current = Company(data, report=self, writer=writer)
        # NOTE(review): validity is counted before tax is derived, so years
        # whose tax only exists after calculate_all_tax() count as invalid.
        # Kept as-is; confirm whether that is intended.
        current.reporter()
        current.calculate_all_tax()
        current.calculate_data()
        current.write()


def main() -> None:
    """Parse the command line and import every given file in order."""
    parser = argparse.ArgumentParser(
        description='Import data from ORBIS',
        epilog='Copyright Denkena Consulting',
    )
    parser.add_argument('filename', nargs="+")
    parser.add_argument('-l', '--logfile', default="log_importer", nargs="?")
    parser.add_argument('-o', '--output', default="export_cleaned.csv",
                        nargs="?")
    parser.add_argument('-s', '--seek', type=int, default=0)
    args = parser.parse_args()
    if len(args.filename) > 1 and args.seek > 0:
        parser.error("Seek combined with multiple files is a bad idea!")
    for filename in args.filename:
        dataimport(filename, args.logfile, args.output, args.seek)


if __name__ == "__main__":
    main()