bachelorarbeit_importer/cleanup_script.py

332 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import csv
import sys
import json
import logging
import requests
import hashlib
import io
import datetime
import pandas as pd
from rich import progress
from rich.logging import RichHandler
from rich.console import Console
from rich.traceback import install
# Route uncaught exceptions through rich's traceback renderer. show_locals=True
# also prints each frame's local variables, abbreviated by the locals_max_* limits.
install(show_locals=True, locals_max_length=150, locals_max_string=300)
class Company:
    """Clean, validate and export the financial figures of one ORBIS row.

    Args:
        data: One input row, mapping the export's column headers to raw
            string cells. Must contain "BvD ID Nummer" and
            "Unternehmensname".
        report: The running import. It must expose ``log`` (a logger),
            ``output`` (path of the cleaned CSV) and the integer counters
            ``valid_data`` / ``invalid_data``.
        writer: The ``csv.DictWriter`` used to append the cleaned row.

    Cleaned values are stored in ``cleaned_data`` under ``bvd_id``, ``name``
    and, for every year in ``YEARS``, ``gv<year>`` (profit/loss before tax),
    ``gn<year>`` (profit/loss after tax), ``st<year>`` (taxes) and
    ``ek<year>`` (equity). A key is absent when its source cell is missing,
    empty or not an integer.
    """

    YEARS = (2020, 2021, 2022, 2023, 2024)

    # Cleaned-key prefix -> header spellings to try, in order. The profit
    # columns occur both with and without parentheses around "Verlust".
    _PROFIT_COLUMNS = (
        ("gv", ("Gewinn/(Verlust) vor Steuern EUR {year}",
                "Gewinn/Verlust vor Steuern EUR {year}")),
        ("gn", ("Gewinn/(Verlust) nach Steuern EUR {year}",
                "Gewinn/Verlust nach Steuern EUR {year}")),
    )
    _PLAIN_COLUMNS = (
        ("st", "Steuern EUR {year}"),
        ("ek", "Eigenkapital EUR {year}"),
    )

    # Output path -> BvD IDs already present in that file. Shared by all
    # instances so the output CSV is read once per process, not once per row.
    _written_ids = {}

    def __init__(self, data, report, writer):
        self.data = data
        self.writer = writer
        self.report = report
        self.cleaned_data = dict()
        self.cleaned_data["bvd_id"] = data["BvD ID Nummer"]
        self.cleaned_data["name"] = data["Unternehmensname"]
        for prefix, templates in self._PROFIT_COLUMNS:
            for year in self.YEARS:
                self._read_profit(prefix, year, templates)
        for prefix, template in self._PLAIN_COLUMNS:
            for year in self.YEARS:
                self._read_plain(prefix, year, template)

    def _read_profit(self, prefix, year, templates):
        """Parse a profit figure from the first header variant that has a value."""
        key = f"{prefix}{year}"
        name = self.cleaned_data["name"]
        for template in templates:
            raw = self.data.get(template.format(year=year))
            if raw is None or raw == '':
                continue
            try:
                self.cleaned_data[key] = int(raw)
            except ValueError:
                # An unparsable value is final: the other spelling is not tried.
                self.report.log.debug(f"{name}: {key.upper()} ValueError")
            return
        self.report.log.debug(f"{name}: {key.upper()} empty value")

    def _read_plain(self, prefix, year, template):
        """Parse a tax/equity figure; a missing column is logged, not fatal."""
        key = f"{prefix}{year}"
        name = self.cleaned_data["name"]
        try:
            self.cleaned_data[key] = int(self.data[template.format(year=year)])
        except KeyError:
            # Tolerate exports lacking a year, as the profit columns already do.
            self.report.log.debug(f"{name}: {key.upper()} missing column")
        except (TypeError, ValueError):
            self.report.log.debug(f"{name}: {key.upper()} ValueError")

    def calculate_tax(self):
        """Fill in missing taxes as profit before tax minus profit after tax.

        A year's ``st`` value is derived only when it is absent or zero and
        both ``gv`` and ``gn`` were parsed for that year.
        """
        for year in self.YEARS:
            if self.cleaned_data.get(f"st{year}"):
                continue
            before = self.cleaned_data.get(f"gv{year}")
            after = self.cleaned_data.get(f"gn{year}")
            if before is not None and after is not None:
                self.cleaned_data[f"st{year}"] = before - after

    def validate(self):
        """Count each year as valid or invalid on the report.

        A year is valid when both taxes and equity are present and non-zero;
        missing taxes are derived first via ``calculate_tax``.
        """
        # fallback, in case tax wasn't already calculated
        self.calculate_tax()
        for year in self.YEARS:
            if self.cleaned_data.get(f"st{year}") and self.cleaned_data.get(f"ek{year}"):
                self.report.valid_data += 1
            else:
                self.report.invalid_data += 1

    def calculate_data(self, years=(2020,)):
        """Derive the tax ratios for ``years`` (only 2020 by default).

        For each year whose ``st``, ``gv``, ``gn`` and ``ek`` are all present
        and non-zero, stores ``nomtax<year>``, ``realtax<year>`` and
        ``realefftax<year>`` in ``cleaned_data`` and logs them at debug level.
        """
        for year in years:
            tax = self.cleaned_data.get(f"st{year}")
            before = self.cleaned_data.get(f"gv{year}")
            after = self.cleaned_data.get(f"gn{year}")
            equity = self.cleaned_data.get(f"ek{year}")
            if not (tax and before and after and equity):
                continue
            nominal = tax / before
            real = (tax + (0.4 * before)) / before
            real_effective = (tax + (0.4 * before) + (0.4 * equity)) / before
            self.cleaned_data[f"nomtax{year}"] = nominal
            self.cleaned_data[f"realtax{year}"] = real
            self.cleaned_data[f"realefftax{year}"] = real_effective
            # Logged rather than printed so the output does not tear through
            # the progress display that shares the terminal.
            self.report.log.debug(
                f"{self.cleaned_data['name']}: {year} "
                f"nomtax={nominal} realtax={real} realefftax={real_effective}"
            )

    @staticmethod
    def _load_written_ids(path):
        """Return the set of ``bvd_id`` values stored in the CSV at ``path``."""
        with open(path, mode='r', encoding='utf-8-sig', newline='') as handle:
            return {row["bvd_id"] for row in csv.DictReader(handle)}

    def write(self):
        """Append the cleaned row to the output CSV unless its BvD ID is known.

        The IDs already in ``report.output`` are loaded on the first call for
        that path and tracked in memory afterwards, so rows still sitting in
        the writer's buffer count as written too.
        """
        output = self.report.output
        known = Company._written_ids.get(output)
        if known is None:
            known = self._load_written_ids(output)
            Company._written_ids[output] = known
        bvd_id = self.cleaned_data.get("bvd_id")
        if bvd_id in known:
            return
        fieldnames = getattr(self.writer, "fieldnames", None)
        if fieldnames is None:
            row = self.cleaned_data
        else:
            # DictWriter raises ValueError on keys outside its fieldnames,
            # e.g. the ratios added by calculate_data.
            row = {key: value for key, value in self.cleaned_data.items()
                   if key in fieldnames}
        self.writer.writerow(row)
        known.add(bvd_id)
class dataimport:
    """Import one ORBIS CSV export into the cleaned output CSV.

    Constructing an instance runs the whole import: it sets up the progress
    display and logging, then calls ``importer``.

    Args:
        filename: Path of the input CSV (read as UTF-8 with optional BOM).
        logfile: Path of a file that WARNING-and-above records are appended
            to. ``None`` or the string ``"NONE"`` disables the log file.
        output: Path of the cleaned CSV that rows are appended to.
        seek: Number of leading data rows of the input to skip.
    """

    def __init__(self, filename, logfile, output, seek=0):
        self.seek = seek
        self.progress = progress.Progress(
            *progress.Progress.get_default_columns(),
            progress.MofNCompleteColumn(),
            progress.TimeElapsedColumn(),
            expand=True
        )
        self.filename = filename
        self.output = output
        FORMAT = "%(message)s"
        handlers = [
            RichHandler(rich_tracebacks=True, console=self.progress.console,
                        show_path=False, show_time=False, level="NOTSET"),
        ]
        # Test the argument, not the opened handle: a file object never equals
        # "NONE", so the sentinel used to create a file literally named NONE.
        if logfile is None or logfile == "NONE":
            self.logfile = None
            self.logconsole = None
        else:
            self.logfile = open(logfile, 'a')
            self.logconsole = Console(file=self.logfile)
            handlers.append(
                RichHandler(rich_tracebacks=True, console=self.logconsole,
                            show_path=False, level="WARNING"))
        # logging.basicConfig() does nothing once the root logger has handlers,
        # so a second import in the same process would keep logging to the
        # first one's console and file. Attach this run's handlers explicitly
        # and detach them when the run is over.
        formatter = logging.Formatter(FORMAT, datefmt="[%X]")
        root = logging.getLogger()
        root.setLevel("INFO")
        for handler in handlers:
            handler.setFormatter(formatter)
            root.addHandler(handler)
        self.log = logging.getLogger("import")
        try:
            self.total_rows = self.get_total(self.filename)
            self.errors = 0
            self.data = {}
            self.duplicate_database_id = None
            self.task = self.progress.add_task(
                f"Importing {self.filename.split('/')[-1]}", total=self.total_rows)
            self.progress.update(self.task, advance=self.seek)
            self.valid_data = 0
            self.invalid_data = 0
            self.importer()
        finally:
            for handler in handlers:
                root.removeHandler(handler)
                handler.close()
            if self.logfile is not None:
                self.logfile.close()

    def importer(self):
        """Stream the input rows through ``comp_import`` and log a summary.

        Writes the CSV header first when the output file is still empty and
        skips the first ``seek`` data rows of the input.
        """
        fieldnames = ['bvd_id', 'name', 'gv2020', 'gn2020', 'st2020', 'ek2020', 'gv2021', 'gn2021', 'st2021', 'ek2021', 'gv2022', 'gn2022', 'st2022', 'ek2022', 'gv2023', 'gn2023', 'st2023', 'ek2023', 'gv2024', 'gn2024', 'st2024', 'ek2024']
        with self.progress:
            with open(self.filename, mode='r', encoding='utf-8-sig', newline='') as csv_file:
                with open(self.output, mode='a+', encoding='utf-8-sig', newline='') as output_csv:
                    csv_reader = csv.DictReader(csv_file, delimiter=',')
                    # Company.calculate_data stores derived ratios that are not
                    # output columns; ignore them instead of raising ValueError.
                    output_writer = csv.DictWriter(
                        output_csv, fieldnames=fieldnames, extrasaction='ignore')
                    if self.get_total(self.output) == -1:
                        self.log.warning(f"WRITING HEADER FOR FILE {self.output}!")
                        output_writer.writeheader()
                    columns = csv_reader.fieldnames
                    for rownum, row in enumerate(csv_reader):
                        if rownum < self.seek:
                            continue
                        # Only the header's columns are passed on, not any
                        # overflow cells DictReader collects under a None key.
                        self.data = {key: row[key] for key in columns}
                        self.comp_import(self.data, output_writer)
                        self.data = {}
                        self.progress.update(self.task, advance=1)
            self._log_summary()

    def _log_summary(self):
        """Log row, validity and error counts between horizontal rules."""
        self.progress.console.rule()
        self.log.info(f"Rows: {self.total_rows}")
        self.log.info(f"Valid: {self.valid_data}")
        self.log.info(f"Invalid: {self.invalid_data}")
        if self.errors == 0:
            self.log.info(f"Errors: {self.errors}")
            self.progress.console.rule()
        elif self.errors > 0:
            self.log.error(f"Errors: {self.errors}")
            self.progress.console.rule()
        else:
            self.log.critical("ERROR CALCULATION EXCEPTION")

    def get_total(self, file):
        """Return the number of lines in ``file`` minus one for the header.

        An empty file therefore yields -1. Lines are counted physically, so
        quoted cells containing line breaks count more than once.
        """
        # Same encoding as the importer uses, and the handle is closed again.
        with open(file, mode='r', encoding='utf-8-sig') as handle:
            return sum(1 for _ in handle) - 1

    def comp_import(self, data, writer):
        """Clean, validate, enrich and write a single input row."""
        current = Company(data, report=self, writer=writer)
        current.validate()
        current.calculate_data()
        current.write()
def build_parser():
    """Build the command-line parser of the ORBIS import script."""
    parser = argparse.ArgumentParser(description='Import data from ORBIS', epilog='Copyright Denkena Consulting')
    parser.add_argument('filename', nargs="+", help="input CSV file(s) to import")
    # const mirrors default so a bare "-l" / "-o" falls back to the default
    # path instead of yielding None, which cannot be opened as a file.
    parser.add_argument('-l', '--logfile', default="log_importer", const="log_importer", nargs="?",
                        help="log file to append to (default: %(default)s)")
    parser.add_argument('-o', '--output', default="export_cleaned.csv", const="export_cleaned.csv", nargs="?",
                        help="cleaned CSV to append to (default: %(default)s)")
    parser.add_argument('-s', '--seek', type=int, default=0,
                        help="number of leading data rows to skip (default: %(default)s)")
    return parser


def main(argv=None):
    """Parse ``argv`` (``sys.argv[1:]`` when None) and import every file given.

    Exits through ``parser.error`` when ``--seek`` is negative or is combined
    with more than one input file.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    if args.seek < 0:
        parser.error("Seek must not be negative!")
    if len(args.filename) > 1 and args.seek > 0:
        parser.error("Seek combined with multiple files is a bad idea!")
    for filename in args.filename:
        dataimport(filename, args.logfile, args.output, args.seek)


# Guarded so that importing this module no longer parses sys.argv or starts
# an import as a side effect.
if __name__ == "__main__":
    main()