bachelorarbeit_importer/cleanup_script.py

297 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import csv
import secrets
import sys
import json
import logging
import requests
import hashlib
import io
import datetime
from rich import progress
from rich.logging import RichHandler
from rich.console import Console
from rich.traceback import install
# Install Rich's traceback handler so uncaught exceptions are rendered by Rich
# together with the local variables of each frame (show_locals=True); the two
# locals_max_* arguments cap how much of each container/string is printed.
install(show_locals=True, locals_max_length=150, locals_max_string=300)
class Company:
    """Financial figures of one company row for the years 2020-2024.

    For every year in ``YEARS`` the instance exposes four integer-or-``None``
    attributes, named ``<prefix><year>``:

    * ``gvYYYY`` - profit/loss before tax
    * ``gnYYYY`` - profit/loss after tax
    * ``stYYYY`` - taxes
    * ``ekYYYY`` - equity

    A value is ``None`` when its column is missing, empty, ``None`` or not
    parseable as an integer.
    """

    # Reporting years read from each row.
    YEARS = (2020, 2021, 2022, 2023, 2024)

    # Attribute prefix -> candidate column names, tried in order. The profit
    # columns appear under two spellings (with and without parentheses).
    _COLUMNS = {
        "gv": ("Gewinn/(Verlust) vor Steuern EUR {year}",
               "Gewinn/Verlust vor Steuern EUR {year}"),
        "gn": ("Gewinn/(Verlust) nach Steuern EUR {year}",
               "Gewinn/Verlust nach Steuern EUR {year}"),
        "st": ("Steuern EUR {year}",),
        "ek": ("Eigenkapital EUR {year}",),
    }

    def __init__(self, data, report, out):
        """Parse one row.

        Args:
            data: Mapping of column name to raw cell value. Must contain
                "BvD ID Nummer" and "Unternehmensname".
            report: Object with integer ``valid_data`` / ``invalid_data``
                counters; ``validate`` increments them.
            out: Stored on the instance as ``self.out``; not used by this
                class itself.

        Raises:
            KeyError: If one of the two identifying columns is missing.
        """
        self.data = data
        self.out = out
        self.report = report
        self.bvdid = data["BvD ID Nummer"]
        self.name = data["Unternehmensname"]
        for year in self.YEARS:
            for prefix, templates in self._COLUMNS.items():
                columns = [template.format(year=year) for template in templates]
                setattr(self, f"{prefix}{year}", self._parse_int(data, *columns))

    @staticmethod
    def _parse_int(data, *columns):
        """Return the integer stored in the first populated column, else None.

        Columns that are absent, empty or ``None`` are skipped. The first
        populated column decides the result: if its content is not an
        integer the result is ``None`` and later columns are not consulted.
        """
        for column in columns:
            raw = data.get(column)
            if raw is None or raw == '':
                continue
            try:
                return int(raw)
            except (TypeError, ValueError):
                return None
        return None

    def calculate_tax(self):
        """Fill in missing tax figures as profit before tax minus profit after tax.

        A tax value that is ``None`` or ``0`` is treated as missing and is
        replaced whenever both profit figures of that year are known.
        """
        for year in self.YEARS:
            if getattr(self, f"st{year}"):
                continue
            before_tax = getattr(self, f"gv{year}")
            after_tax = getattr(self, f"gn{year}")
            if before_tax is not None and after_tax is not None:
                setattr(self, f"st{year}", before_tax - after_tax)

    def validate(self):
        """Count each year as valid or invalid on ``self.report``.

        A year is valid when both its tax and its equity value are truthy,
        so ``None`` and ``0`` both make the year invalid.
        """
        # Fallback, in case tax wasn't already calculated.
        self.calculate_tax()
        for year in self.YEARS:
            if getattr(self, f"st{year}") and getattr(self, f"ek{year}"):
                self.report.valid_data += 1
            else:
                self.report.invalid_data += 1
class dataimport:
    """Read one CSV export and tally how many company-years are usable.

    Constructing an instance runs the complete import immediately: every row
    after the first ``seek`` rows is turned into a ``Company``, validated, and
    counted in ``valid_data`` / ``invalid_data``. A summary is logged at the
    end.

    NOTE(review): the output writer is created with an empty field list and
    nothing in this class writes data rows to it; only an (empty) header line
    may be emitted. Presumably writing is still to be implemented - confirm.
    """

    def __init__(self, filename, logfile, output, seek=0):
        """Set up progress display and logging, then run the import.

        Args:
            filename: Path of the CSV file to read.
            logfile: Path of a log file that receives WARNING and above, or
                "NONE" (or ``None``) to log to the console only.
            output: Path of the CSV file opened for appending.
            seek: Number of leading data rows to skip.
        """
        self.seek = seek
        self.progress = progress.Progress(
            *progress.Progress.get_default_columns(),
            progress.MofNCompleteColumn(),
            progress.TimeElapsedColumn(),
            expand=True
        )
        self.filename = filename
        self.output = output
        log_format = "%(message)s"
        handlers = [
            RichHandler(rich_tracebacks=True, console=self.progress.console,
                        show_path=False, show_time=False, level="NOTSET")]
        # Decide on the *argument* before opening anything: comparing the
        # opened file object with "NONE" is always unequal and would also
        # create a file literally named "NONE".
        if logfile and logfile != "NONE":
            # Deliberately left open: the logging handler keeps writing to it
            # for the rest of the process.
            self.logfile = open(logfile, 'a')
            self.logconsole = Console(file=self.logfile)
            handlers.append(
                RichHandler(rich_tracebacks=True, console=self.logconsole,
                            show_path=False, level="WARNING"))
        else:
            self.logfile = None
        logging.basicConfig(
            level="NOTSET", format=log_format, datefmt="[%X]", handlers=handlers)
        self.log = logging.getLogger("import")
        self.total_rows = self.get_total(self.filename)
        self.errors = 0
        self.data = {}
        self.duplicate_database_id = None
        self.task = self.progress.add_task(
            f"Importing {self.filename.split('/')[-1]}", total=self.total_rows)
        # Rows skipped via seek count as already done on the progress bar.
        self.progress.update(self.task, advance=self.seek)
        self.valid_data = 0
        self.invalid_data = 0
        self.importer()

    def importer(self):
        """Stream the input CSV, validate every row and log a summary."""
        with self.progress:
            with open(self.filename, mode='r', encoding='utf-8-sig', newline='') as csv_file:
                with open(self.output, mode='a+', encoding='utf-8-sig', newline='') as output_csv:
                    csv_reader = csv.DictReader(csv_file, delimiter=',')
                    out_names = []
                    output_writer = csv.DictWriter(output_csv, fieldnames=out_names)
                    output_rows = self.get_total(self.output)
                    self.log.warning(output_rows)
                    if output_rows <= 0:
                        self.log.warning(f"WRITING HEADER FOR FILE {self.output}!")
                        output_writer.writeheader()
                    for rownum, row in enumerate(csv_reader):
                        if rownum < self.seek:
                            continue
                        # Keep only the named header columns of this row.
                        self.data = {key: row[key] for key in csv_reader.fieldnames}
                        self.comp_import(self.data, output_writer)
                        self.data = {}
                        self.progress.update(self.task, advance=1)
            self._log_summary()

    def _log_summary(self):
        """Log row, validity and error counters framed by console rules."""
        self.progress.console.rule()
        self.log.info(f"Rows: {self.total_rows}")
        self.log.info(f"Valid: {self.valid_data}")
        self.log.info(f"Invalid: {self.invalid_data}")
        if self.errors == 0:
            self.log.info(f"Errors: {self.errors}")
            self.progress.console.rule()
        elif self.errors > 0:
            self.log.error(f"Errors: {self.errors}")
            self.progress.console.rule()
        else:
            # A negative error count can only result from a bookkeeping bug.
            self.log.critical("ERROR CALCULATION EXCEPTION")

    def get_total(self, file):
        """Return the number of lines in ``file`` minus one (the header line).

        An empty file therefore yields -1.
        """
        with open(file, mode='r') as handle:
            return sum(1 for _ in handle) - 1

    def comp_import(self, data, out):
        """Build a ``Company`` from one row and add its result to the counters."""
        current = Company(data, report=self, out=out)
        current.validate()
# Command-line entry point: parse the arguments, then import every given file
# in turn with the same log file, output file and seek offset.
parser = argparse.ArgumentParser(
    description='Import data from ORBIS',
    epilog='Copyright Denkena Consulting',
)
parser.add_argument('filename', nargs="+")
parser.add_argument('-l', '--logfile', nargs="?", default="log_importer")
parser.add_argument('-o', '--output', nargs="?", default="export_cleaned.csv")
parser.add_argument('-s', '--seek', default=0, type=int)
args = parser.parse_args()

# The same seek offset would be applied to every file, so reject the
# combination of a non-zero seek with more than one input file.
if args.seek > 0 and len(args.filename) > 1:
    parser.error("Seek combined with multiple files is a bad idea!")

for filename in args.filename:
    dataimport(filename, args.logfile, args.output, seek=args.seek)