#!/usr/bin/env python3
"""Clean ORBIS CSV exports and derive per-year tax ratios for each company.

For every input row the script extracts profit before/after tax, equity and
tax for each year in ``YEARS``, derives missing tax values, computes nominal
and inflation-adjusted tax ratios, and appends the result to an output CSV.
"""
import argparse
import csv
import sys
import json
import logging
import hashlib
import io
import datetime
from pathlib import Path

import pandas as pd
import requests
from rich import progress
from rich.console import Console
from rich.logging import RichHandler
from rich.traceback import install

install(show_locals=True, locals_max_length=150, locals_max_string=300)

# Years for which columns are read from the input and written to the output.
YEARS = [2020, 2021, 2022, 2023, 2024]

# Inflation per year. The values are interpreted as percent (8.7 == 8.7 %)
# and converted to a fraction where they are used.
# NOTE(review): confirm the unit with the data owner.
INFLATION_RATES = {
    2020: 0.4,
    2021: 3.2,
    2022: 8.7,
    2023: 6.0,
    2024: 2.5,
}


class Company:
    """A single input row, cleaned and enriched with derived tax figures.

    Cleaned values are stored in ``cleaned_data`` under the keys
    ``bvd_id``, ``name`` and, per year, ``gv`` (profit before tax),
    ``gn`` (profit after tax), ``ek`` (equity) and ``st`` (tax), each
    followed by the year, e.g. ``gv2021``.
    """

    def __init__(self, data, report, writer):
        """Clean all yearly values of *data*.

        Args:
            data: Mapping of input column header to raw cell value.
            report: The running ``dataimport``; provides ``log``,
                ``seen_ids`` and the ``valid_data``/``invalid_data`` counters.
            writer: ``csv.DictWriter`` used by :meth:`write`.

        Raises:
            KeyError: If the ID or name column is missing from *data*.
        """
        self.data = data
        self.writer = writer
        self.report = report
        self.cleaned_data = {
            "bvd_id": data["BvD ID Nummer"],
            "name": data["Unternehmensname"],
        }
        for year in YEARS:
            self.clean_complex(year, "vor")
            self.clean_complex(year, "nach")
            self.clean_simple(year, "Eigenkapital")
            self.clean_simple(year, "Steuern")

    def clean_simple(self, year: int, type: str):
        """Clean simple data. This means tax and capital.

        Reads the column ``"<type> EUR <year>"`` and stores it as an int.
        A missing column, a missing cell (short CSV row) or a non-integer
        value is logged at debug level and leaves the key unset.

        The parameter name ``type`` shadows the builtin; it is kept so that
        keyword callers continue to work.
        """
        target = f"{self.get_simple_suffix(type)}{year}"
        raw = self.data.get(f"{type} EUR {year}")
        try:
            self.cleaned_data[target] = int(raw)
        except (TypeError, ValueError):
            # TypeError covers raw being None (column or cell missing).
            self.report.log.debug(f"{self.cleaned_data['name']}: {target} ValueError")

    def get_simple_suffix(self, type: str) -> str:
        """Get suffix for the simple cleaning process."""
        return "ek" if type == "Eigenkapital" else "st"

    def clean_complex(self, year: int, state: str):
        """Clean the complex data. This means earnings before/after tax.

        The input uses one of two header spellings; the first one that holds
        a non-empty value is parsed. If that value is not an integer the
        second spelling is not tried and the key stays unset.
        """
        target = f"g{self.get_suffix(state)}{year}"
        headers = (
            f"Gewinn/(Verlust) {state} Steuern EUR {year}",
            f"Gewinn/Verlust {state} Steuern EUR {year}",
        )
        try:
            for header in headers:
                raw = self.data.get(header)
                if raw is not None and raw != '':
                    self.cleaned_data[target] = int(raw)
                    break
            else:
                self.report.log.debug(f"{self.cleaned_data['name']}:{target} empty value")
        except (TypeError, ValueError):
            self.report.log.debug(f"{self.cleaned_data['name']}: {target} ValueError")

    def get_suffix(self, state: str) -> str:
        """Get suffix for the complex cleaning process."""
        return "n" if state == "nach" else "v"

    def calculate_all_tax(self):
        """Calculate tax for all relevant years."""
        for year in YEARS:
            self.calculate_tax(year)

    def calculate_tax(self, year: int):
        """Calculate simple tax from provided values.

        If no (non-zero) tax was provided for *year* but both profit figures
        are known, tax is set to profit before tax minus profit after tax.
        """
        profit_before = self.cleaned_data.get(f"gv{year}")
        profit_after = self.cleaned_data.get(f"gn{year}")
        if (
            not self.cleaned_data.get(f"st{year}")
            and profit_before is not None
            and profit_after is not None
        ):
            self.cleaned_data[f"st{year}"] = profit_before - profit_after

    def reporter(self):
        """Report valid and invalid data to the main import class.

        A year counts as valid when both tax and equity are present and
        non-zero.
        """
        for year in YEARS:
            if self.cleaned_data.get(f"st{year}") and self.cleaned_data.get(f"ek{year}"):
                self.report.valid_data += 1
            else:
                self.report.invalid_data += 1

    def calculate_data(self):
        """Calculate data relevant to the project.

        For every year in which tax, both profit figures and equity are all
        present and non-zero, stores:

        * ``nomtax``: tax / profit before tax
        * ``realtax``: (tax + inflation * profit before tax) / profit before tax
        * ``realefftax``: as ``realtax`` with ``inflation * equity`` added to
          the numerator

        ``inflation`` is ``INFLATION_RATES[year] / 100``: the table holds
        percent values, whereas the ratios are fractions. Earlier versions
        multiplied by the percent value directly, which inflated the real
        ratios by a factor of 100 in the inflation term.
        """
        for year in YEARS:
            tax = self.cleaned_data.get(f"st{year}")
            profit_before = self.cleaned_data.get(f"gv{year}")
            profit_after = self.cleaned_data.get(f"gn{year}")
            equity = self.cleaned_data.get(f"ek{year}")
            # profit_before being truthy also rules out division by zero.
            if not (tax and profit_before and profit_after and equity):
                continue
            inflation = INFLATION_RATES[year] / 100
            self.cleaned_data[f"nomtax{year}"] = tax / profit_before
            self.cleaned_data[f"realtax{year}"] = (
                tax + (inflation * profit_before)
            ) / profit_before
            self.cleaned_data[f"realefftax{year}"] = (
                tax + (inflation * profit_before) + (inflation * equity)
            ) / profit_before

    def write(self):
        """Write the current dataset to CSV unless its ID was already written.

        IDs are tracked in ``report.seen_ids`` (pre-loaded from the existing
        output file). The previous implementation re-read the output with
        pandas for every row and tested ``id in series``, which checks the
        Series index rather than its values and therefore never matched.
        """
        bvd_id = self.cleaned_data.get("bvd_id")
        seen_ids = self.report.seen_ids
        if bvd_id in seen_ids:
            return
        self.writer.writerow(self.cleaned_data)
        seen_ids.add(bvd_id)


class dataimport:
    """Import one ORBIS CSV file and append the cleaned rows to the output.

    Constructing an instance configures logging and a progress bar and then
    runs the whole import.
    """

    def __init__(self, filename, logfile, output, seek=0):
        """Set up progress display and logging, then run the import.

        Args:
            filename: Path of the input CSV.
            logfile: Path of a log file that receives warnings and above
                (opened in append mode); ``"NONE"`` or ``None`` disables
                file logging.
            output: Path of the output CSV (appended to).
            seek: Number of leading data rows to skip.
        """
        self.seek = seek
        self.progress = progress.Progress(
            *progress.Progress.get_default_columns(),
            progress.MofNCompleteColumn(),
            progress.TimeElapsedColumn(),
            expand=True,
        )
        self.filename = filename
        self.output = output
        self.logfile = None
        self.logconsole = None
        self.seen_ids = set()
        FORMAT = "%(message)s"

        handlers = [
            RichHandler(rich_tracebacks=True, console=self.progress.console,
                        show_path=False, show_time=False, level="NOTSET"),
        ]
        file_handler = None
        # Decide on the *argument*: comparing the opened file object to
        # "NONE" (as before) was always true.
        if logfile is not None and logfile != "NONE":
            self.logfile = open(logfile, 'a', encoding='utf-8')
            self.logconsole = Console(file=self.logfile)
            file_handler = RichHandler(rich_tracebacks=True, console=self.logconsole,
                                       show_path=False, level="WARNING")
            handlers.append(file_handler)

        try:
            # force=True (Python 3.8+) replaces handlers left over from a
            # previously imported file; without it basicConfig is a no-op on
            # the second call and logs go to the first file's console.
            logging.basicConfig(level="INFO", format=FORMAT, datefmt="[%X]",
                                handlers=handlers, force=True)
            self.log = logging.getLogger("import")
            self.total_rows = self.get_total(self.filename)
            self.errors = 0
            self.data = {}
            self.duplicate_database_id = None
            self.task = self.progress.add_task(
                f"Importing {Path(self.filename).name}", total=self.total_rows
            )
            self.progress.update(self.task, advance=self.seek)
            self.valid_data = 0
            self.invalid_data = 0
            self.importer()
        finally:
            # Detach the file handler before closing its stream so no later
            # log call writes to a closed file.
            if file_handler is not None:
                logging.getLogger().removeHandler(file_handler)
            if self.logfile is not None:
                self.logfile.close()

    def importer(self):
        """Start the actual import process. Separates process and setup."""
        # Load already-exported IDs once, before the output is opened for
        # appending, so duplicate detection does not depend on unflushed data.
        self.seen_ids = self._load_existing_ids()
        fieldnames = self.generate_fieldnames()
        with self.progress:
            with open(self.filename, mode='r', encoding='utf-8-sig', newline='') as csv_file, \
                    open(self.output, mode='a+', encoding='utf-8-sig', newline='') as output_csv:
                csv_reader = csv.DictReader(csv_file, delimiter=',')
                output_writer = csv.DictWriter(output_csv, fieldnames=fieldnames)
                # An empty output file has no header yet.
                if Path(self.output).stat().st_size == 0:
                    self.log.warning(f"WRITING HEADER FOR FILE {self.output}!")
                    output_writer.writeheader()
                for rownum, row in enumerate(csv_reader):
                    if rownum < self.seek:
                        continue
                    # Copy only the declared columns (drops DictReader's
                    # overflow entry for rows with too many cells).
                    self.data = {key: row[key] for key in csv_reader.fieldnames}
                    self.comp_import(self.data, output_writer)
                    self.data = {}
                    self.progress.update(self.task, advance=1)
            self._log_summary()

    def _load_existing_ids(self):
        """Return the set of ``bvd_id`` values already in the output file."""
        existing_ids = set()
        try:
            with open(self.output, mode='r', encoding='utf-8-sig', newline='') as existing:
                for row in csv.DictReader(existing):
                    existing_ids.add(row.get("bvd_id"))
        except FileNotFoundError:
            pass  # First run: nothing has been exported yet.
        return existing_ids

    def _log_summary(self):
        """Log row, validity and error counters after an import."""
        self.progress.console.rule()
        self.log.info(f"Rows: {self.total_rows}")
        self.log.info(f"Valid: {self.valid_data}")
        self.log.info(f"Invalid: {self.invalid_data}")
        if self.errors == 0:
            self.log.info(f"Errors: {self.errors}")
            self.progress.console.rule()
        elif self.errors > 0:
            self.log.error(f"Errors: {self.errors}")
            self.progress.console.rule()
        else:
            self.log.critical("ERROR CALCULATION EXCEPTION")

    def get_total(self, file):
        """Get total data rows in the input file. Corrected for header row.

        Counts physical lines, so an empty file yields -1.
        """
        with open(file, mode='r', encoding='utf-8-sig') as handle:
            return sum(1 for _ in handle) - 1

    def generate_fieldnames(self):
        """Generate fieldnames for the export CSV."""
        fieldnames = ['bvd_id', 'name']
        for year in YEARS:
            fieldnames.extend(
                f"{prefix}{year}"
                for prefix in ("gv", "gn", "st", "ek", "nomtax", "realtax", "realefftax")
            )
        return fieldnames

    def comp_import(self, data, writer):
        """Import the active dataset as a company. Give write directive."""
        current = Company(data, report=self, writer=writer)
        # NOTE(review): reporter() runs before calculate_all_tax(), so a tax
        # value that is only derived from the profit figures is counted as
        # invalid. Order kept as-is; confirm whether that is intended.
        current.reporter()
        current.calculate_all_tax()
        current.calculate_data()
        current.write()


def main():
    """Parse the command line and import every given file in turn."""
    parser = argparse.ArgumentParser(description='Import data from ORBIS',
                                     epilog='Copyright Denkena Consulting')
    parser.add_argument('filename', nargs="+")
    parser.add_argument('-l', '--logfile', default="log_importer", nargs="?")
    parser.add_argument('-o', '--output', default="export_cleaned.csv", nargs="?")
    parser.add_argument('-s', '--seek', type=int, default=0)
    args = parser.parse_args()
    if len(args.filename) > 1 and args.seek > 0:
        parser.error("Seek combined with multiple files is a bad idea!")
    if args.output is None:
        # A bare "-o" yields None, which would crash in open().
        parser.error("--output requires a file name")
    for filename in args.filename:
        dataimport(filename, args.logfile, args.output, args.seek)


if __name__ == "__main__":
    main()