bachelorarbeit_importer/cleanup_script.py

219 lines
9.8 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import csv
import logging
from rich import progress
from rich.logging import RichHandler
from rich.console import Console
from rich.traceback import install
# Install rich's traceback handler so uncaught exceptions are rendered by rich.
# show_locals=True additionally prints the local variables of each frame,
# abbreviated to 150 container items and 300 characters per string.
install(show_locals=True, locals_max_length=150, locals_max_string=300)
# Years for which per-year columns are read from the input and written to the
# export (2014 through 2024 inclusive).
YEARS = list(range(2014, 2025))

# Harmonised Index of Consumer Prices published by the German Federal
# Statistical Office, keyed by year and stored as a fraction
# (e.g. 0.087 for 2022).
INFLATION_RATES = {
    2014: 0.008,
    2015: 0.007,
    2016: 0.004,
    2017: 0.017,
    2018: 0.019,
    2019: 0.014,
    2020: 0.004,
    2021: 0.032,
    2022: 0.087,
    2023: 0.060,
    2024: 0.025,
}
class Company:
    """One input row (one company) reduced to the values the project needs.

    The constructor copies the identifying columns and, for every year in
    ``YEARS``, tries to parse earnings before/after tax, equity and tax into
    ``cleaned_data``. Values that are missing or not parseable as ``int`` are
    left out of ``cleaned_data`` and reported on the debug log of ``report``.

    Keys written to ``cleaned_data``:
        ``bvd_id``, ``name``, and per year ``gv`` (earnings before tax),
        ``gn`` (earnings after tax), ``st`` (tax), ``ek`` (equity),
        ``nomtax``, ``realtax`` and ``realefftax``.
    """

    def __init__(self, data, report, writer) -> None:
        """Parse the raw row.

        Args:
            data: mapping of input column name to raw string value; must
                contain "BvD ID Nummer" and "Unternehmensname".
            report: object exposing ``log`` (a logger) and the counters
                ``valid_data`` / ``invalid_data``.
            writer: object with a ``writerow`` method that receives
                ``cleaned_data`` (a ``csv.DictWriter`` in this file).
        """
        self.data = data
        self.writer = writer
        self.report = report
        self.cleaned_data = dict()
        self.cleaned_data["bvd_id"] = data["BvD ID Nummer"]
        self.cleaned_data["name"] = data["Unternehmensname"]
        for year in YEARS:
            # "vor" must be cleaned before "nach": clean_complex refuses to
            # write anything for a year once gn{year} holds a non-zero value.
            self.clean_complex(year, "vor")
            self.clean_complex(year, "nach")
            self.clean_simple(year, "Eigenkapital")
            self.clean_simple(year, "Steuern")

    def clean_simple(self, year: int, type: str) -> None:
        """Clean simple data. This means tax and capital.

        Reads the column ``"<type> EUR <year>"`` and stores it as ``int``
        under ``ek<year>`` (for "Eigenkapital") or ``st<year>`` (anything
        else). A column that is absent from the input or whose value is not
        an integer is skipped with a debug message instead of aborting the
        import, mirroring how clean_complex treats missing columns.

        The parameter name ``type`` shadows the builtin; it is kept so that
        keyword callers keep working.
        """
        target = f"{self.get_simple_suffix(type)}{year}"
        try:
            self.cleaned_data[target] = int(self.data[f"{type} EUR {year}"])
        except KeyError:
            # Column not present in this export at all.
            self.report.log.debug(f"{self.cleaned_data['name']}: {target} missing column")
        except ValueError:
            self.report.log.debug(f"{self.cleaned_data['name']}: {target} ValueError")

    def get_simple_suffix(self, type: str) -> str:
        """Get suffix for the simple cleaning process"""
        return "ek" if type == "Eigenkapital" else "st"

    def clean_complex(self, year: int, state: str) -> None:
        """Clean the complex data. This means earnings before/after tax.

        The input may name the column either ``Gewinn/(Verlust) ...`` or
        ``Gewinn/Verlust ...``; the first variant that exists with a
        non-empty value is parsed as ``int`` and stored under ``gv<year>``
        (state "vor") or ``gn<year>`` (state "nach").

        Args:
            year: year whose column is read.
            state: "vor" (before tax) or "nach" (after tax).
        """
        target = f"g{self.get_suffix(state)}{year}"
        try:
            # NOTE(review): the guard inspects gn{year} for both states, not
            # the key being written. It is kept as-is to preserve behaviour;
            # confirm whether the target key was meant instead.
            if not self.cleaned_data.get(f"gn{year}"):
                for label in ("Gewinn/(Verlust)", "Gewinn/Verlust"):
                    column = f"{label} {state} Steuern EUR {year}"
                    if column in self.data and self.data[column] != '':
                        # A ValueError here ends the search: the second
                        # spelling is not tried after a failed parse.
                        self.cleaned_data[target] = int(self.data[column])
                        return
            self.report.log.debug(f"{self.cleaned_data['name']}:{target} empty value")
        except ValueError:
            self.report.log.debug(f"{self.cleaned_data['name']}: {target} ValueError")

    def get_suffix(self, state: str) -> str:
        """Get suffix for the complex cleaning process."""
        return "n" if state == "nach" else "v"

    def calculate_all_tax(self) -> None:
        """Calculate tax for all relevant years."""
        for year in YEARS:
            self.calculate_tax(year)

    def calculate_tax(self, year: int) -> None:
        """Calculate simple tax from provided values.

        When no (non-zero) tax value exists for ``year`` but both earnings
        figures do, tax is derived as earnings before tax minus earnings
        after tax.
        """
        if self.cleaned_data.get(f"st{year}"):
            return
        before_tax = self.cleaned_data.get(f"gv{year}")
        after_tax = self.cleaned_data.get(f"gn{year}")
        if before_tax is not None and after_tax is not None:
            self.cleaned_data[f"st{year}"] = before_tax - after_tax

    def reporter(self) -> None:
        """Report valid and invalid data to the main import class.

        A year counts as valid when both tax and equity are present and
        non-zero; every other year counts as invalid.
        """
        for year in YEARS:
            if self.cleaned_data.get(f"st{year}") and self.cleaned_data.get(f"ek{year}"):
                self.report.valid_data += 1
            else:
                self.report.invalid_data += 1

    def calculate_data(self) -> None:
        """Calculate data relevant to the project.

        For every year in which tax, earnings before tax, earnings after tax
        and equity are all present and non-zero, three ratios are stored:

        * ``nomtax``     = st / gv
        * ``realtax``    = (st + inflation * gv) / gv
        * ``realefftax`` = (st + inflation * gv + inflation * ek) / gv
        """
        for year in YEARS:
            tax = self.cleaned_data.get(f"st{year}")
            before_tax = self.cleaned_data.get(f"gv{year}")
            after_tax = self.cleaned_data.get(f"gn{year}")
            equity = self.cleaned_data.get(f"ek{year}")
            # The truthiness test also guarantees before_tax != 0, so the
            # divisions below cannot raise ZeroDivisionError.
            if tax and before_tax and after_tax and equity:
                inflation = INFLATION_RATES[year]
                self.cleaned_data[f"nomtax{year}"] = tax / before_tax
                self.cleaned_data[f"realtax{year}"] = (tax + (inflation * before_tax)) / before_tax
                self.cleaned_data[f"realefftax{year}"] = (tax + (inflation * before_tax) + (inflation * equity)) / before_tax

    def write(self) -> None:
        """Write the current dataset to CSV"""
        self.writer.writerow(self.cleaned_data)
class dataimport:
    """Import one input CSV, clean every row and append it to the output CSV.

    Constructing an instance runs the complete import: logging and the
    progress display are set up, then importer() processes the file.
    """

    def __init__(self, filename, logfile, output, seek=0) -> None:
        """Set up progress display and logging, then run the import.

        Args:
            filename: path of the input CSV.
            logfile: path of a file that receives WARNING and higher log
                records (opened in append mode), or the string "NONE" to
                log to the console only.
            output: path of the CSV the cleaned rows are appended to.
            seek: number of leading data rows to skip.
        """
        self.seek = seek
        self.progress = progress.Progress(
            *progress.Progress.get_default_columns(),
            progress.MofNCompleteColumn(),
            progress.TimeElapsedColumn(),
            expand=True
        )
        self.filename = filename
        FORMAT = "%(message)s"
        # The sentinel has to be checked on the path, before opening: an
        # opened file object never compares equal to the string "NONE".
        self.logfile = self._open_logfile(logfile)
        self.output = output
        handlers = [
            RichHandler(rich_tracebacks=True, console=self.progress.console,
                        show_path=False, show_time=False, level="NOTSET")]
        if self.logfile is not None:
            # The handle is intentionally left open: the handler registered
            # on the root logger below keeps writing to it after __init__.
            self.logconsole = Console(file=self.logfile)
            handlers.append(
                RichHandler(rich_tracebacks=True, console=self.logconsole,
                            show_path=False, level="WARNING"))
        # NOTE(review): basicConfig is a no-op once the root logger has
        # handlers, so with several input files only the handlers of the
        # first instance are in effect.
        logging.basicConfig(level="INFO", format=FORMAT, datefmt="[%X]", handlers=handlers)
        self.log = logging.getLogger("import")
        self.total_rows = self.get_total(self.filename)
        self.errors = 0
        self.data = {}
        self.duplicate_database_id = None
        # Reuse the count taken above instead of reading the file a second time.
        self.task = self.progress.add_task(f"Importing {self.filename.split('/')[-1]}", total=self.total_rows)
        self.progress.update(self.task, advance=self.seek)
        self.valid_data = 0
        self.invalid_data = 0
        self.importer()

    @staticmethod
    def _open_logfile(logfile):
        """Return an append-mode handle for ``logfile``, or None for "NONE"."""
        if logfile == "NONE":
            return None
        return open(logfile, 'a')

    def importer(self) -> None:
        """Start the actual import process. Separates process and setup.

        Streams the input CSV row by row, skips the first ``seek`` data
        rows, hands every remaining row to comp_import() and finally logs a
        summary. The header is written only when the output file is empty.
        """
        with self.progress:
            with open(self.filename, mode='r', encoding='utf-8-sig', newline='') as csv_file:
                with open(self.output, mode='a+', encoding='utf-8-sig', newline='') as output_csv:
                    csv_reader = csv.DictReader(csv_file, delimiter=',')
                    fieldnames = self.generate_fieldnames()
                    output_writer = csv.DictWriter(output_csv, fieldnames=fieldnames)
                    # get_total() yields -1 for a file without any line,
                    # i.e. a freshly created output file.
                    if self.get_total(self.output) == -1:
                        self.log.warning(f"WRITING HEADER FOR FILE {self.output}!")
                        output_writer.writeheader()
                    for rownum, row in enumerate(csv_reader):
                        if rownum < self.seek:
                            continue
                        # Copy only the columns named in the header row.
                        self.data = {key: row[key] for key in csv_reader.fieldnames}
                        self.comp_import(self.data, output_writer)
                        self.data = {}
                        self.progress.update(self.task, advance=1)
            self.progress.console.rule()
            self.log.info(f"Rows: {self.total_rows}")
            self.log.info(f"Valid: {self.valid_data}")
            self.log.info(f"Invalid: {self.invalid_data}")
            if self.errors == 0:
                self.log.info(f"Errors: {self.errors}")
                self.progress.console.rule()
            elif self.errors > 0:
                self.log.error(f"Errors: {self.errors}")
                self.progress.console.rule()
            else:
                self.log.critical("ERROR CALCULATION EXCEPTION")

    def get_total(self, file) -> int:
        """Get total data rows in the input file. Corrected for header row.

        Returns the number of text lines in ``file`` minus one, so an empty
        file yields -1.
        """
        # NOTE(review): the file is read with the platform default encoding
        # while importer() uses utf-8-sig; confirm this is acceptable.
        with open(file, mode='r') as handle:
            return sum(1 for _ in handle) - 1

    def generate_fieldnames(self) -> list:
        """Generate fieldnames for the export CSV."""
        fieldnames = ['bvd_id', 'name']
        for year in YEARS:
            fieldnames.extend(
                f"{prefix}{year}"
                for prefix in ("gv", "gn", "st", "ek", "nomtax", "realtax", "realefftax")
            )
        return fieldnames

    def comp_import(self, data: dict, writer) -> None:
        """Import the active dataset as a company. Give write directive."""
        current = Company(data, report=self, writer=writer)
        # NOTE(review): validity is counted before missing tax values are
        # derived, so years whose tax is only computed below are counted as
        # invalid. Confirm that this order is intended.
        current.reporter()
        current.calculate_all_tax()
        current.calculate_data()
        current.write()
# Command-line entry point: every positional argument is one input CSV; all
# of them are appended to the same output file.
parser = argparse.ArgumentParser(description='Import data from ORBIS', epilog='Copyright Denkena Consulting')
parser.add_argument('filename', nargs="+")
# With nargs="?" a bare "-l" / "-o" (flag without a value) yields `const`.
# Without an explicit const that is None, which later fails in open();
# fall back to the default file name instead.
parser.add_argument('-l', '--logfile', default="log_importer", const="log_importer", nargs="?")
parser.add_argument('-o', '--output', default="export_cleaned.csv", const="export_cleaned.csv", nargs="?")
# Number of leading data rows to skip in the input file.
parser.add_argument('-s', '--seek', type=int, default=0)
args = parser.parse_args()
# The same seek offset would be applied to every input file, so it is only
# accepted for a single file.
if len(args.filename) > 1 and args.seek > 0:
    parser.error("Seek combined with multiple files is a bad idea!")
for filename in args.filename:
    dataimport(filename, args.logfile, args.output, args.seek)