bachelorarbeit_importer/cleanup_script.py

235 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import csv
import sys
import json
import logging
import requests
import hashlib
import io
import datetime
import pandas as pd
from rich import progress
from rich.logging import RichHandler
from rich.console import Console
from rich.traceback import install
install(show_locals=True, locals_max_length=150, locals_max_string=300)
global YEARS
YEARS = [2020, 2021, 2022, 2023, 2024]
global INFLATION_RATES
INFLATION_RATES = {
2014: 0.8,
2015: 0.7,
2016: 0.4,
2017: 1.7,
2018: 1.9,
2019: 1.4,
2020: 0.4,
2021: 3.2,
2022: 8.7,
2023: 6.0,
2024: 2.5,
}
class Company:
def __init__(self, data, report, writer) -> None:
self.data = data
self.writer = writer
self.report = report
self.cleaned_data = dict()
self.cleaned_data["bvd_id"] = data["BvD ID Nummer"]
self.cleaned_data["name"] = data["Unternehmensname"]
for year in YEARS:
self.clean_complex(year, "vor")
self.clean_complex(year, "nach")
self.clean_simple(year, "Eigenkapital")
self.clean_simple(year, "Steuern")
def clean_simple(self, year: int, type: str) -> None:
"""Clean simple data. This means tax and capital."""
try:
self.cleaned_data[f"{self.get_simple_suffix(type)}{year}"] = int(self.data[f"{type} EUR {year}"])
except ValueError:
self.report.log.debug(f"{self.cleaned_data['name']}: {self.get_simple_suffix(type)}{year} ValueError")
def get_simple_suffix(self, type: str) -> str:
"""Get suffix for the simple cleaning process"""
return "ek" if type == "Eigenkapital" else "st"
def clean_complex(self, year: int, state: str) -> None:
"""Clean the complex data. This means earnings before/after tax."""
try:
if f"Gewinn/(Verlust) {state} Steuern EUR {year}" in self.data.keys() and self.data[f"Gewinn/(Verlust) {state} Steuern EUR {year}"] != '' and not self.cleaned_data.get(f"gn{year}"):
self.cleaned_data[f"g{self.get_suffix(state)}{year}"] = int(self.data[f"Gewinn/(Verlust) {state} Steuern EUR {year}"])
elif f"Gewinn/Verlust {state} Steuern EUR {year}" in self.data.keys() and self.data[f"Gewinn/Verlust {state} Steuern EUR {year}"] != '' and not self.cleaned_data.get(f"gn{year}"):
self.cleaned_data[f"g{self.get_suffix(state)}{year}"] = int(self.data[f"Gewinn/Verlust {state} Steuern EUR {year}"])
else:
self.report.log.debug(f"{self.cleaned_data['name']}:g{self.get_suffix(state)}{year} empty value")
except ValueError:
self.report.log.debug(f"{self.cleaned_data['name']}: g{self.get_suffix(state)}{year} ValueError")
def get_suffix(self, state: str) -> str:
"""Get suffix for the complex cleaning process."""
return "n" if state == "nach" else "v"
def calculate_all_tax(self) -> None:
"""Calculate tax for all relevant years."""
self.calculate_tax(2020)
self.calculate_tax(2021)
self.calculate_tax(2022)
self.calculate_tax(2023)
self.calculate_tax(2024)
def calculate_tax(self, year: int) -> None:
"""Calculate simple tax from provided values."""
if not self.cleaned_data.get(f"st{year}") and self.cleaned_data.get(f"gv{year}") != None and self.cleaned_data.get(f"gn{year}") != None:
self.cleaned_data[f"st{year}"] = self.cleaned_data.get(f"gv{year}") - self.cleaned_data.get(f"gn{year}")
def reporter(self) -> None:
"""Simple class to report valid and invalid data to the main import class."""
for year in YEARS:
if self.cleaned_data.get(f"st{year}") and self.cleaned_data.get(f"ek{year}"):
self.report.valid_data += 1
else:
self.report.invalid_data +=1
def calculate_data(self) -> None:
"""Calculate data relevant to the project."""
for year in YEARS:
if self.cleaned_data.get(f"st{year}") and self.cleaned_data.get(f"gv{year}") and self.cleaned_data.get(f"gn{year}") and self.cleaned_data.get(f"ek{year}"):
self.cleaned_data[f"nomtax{year}"] = self.cleaned_data.get(f"st{year}") / self.cleaned_data.get(f"gv{year}")
self.cleaned_data[f"realtax{year}"] = (self.cleaned_data.get(f"st{year}") + (INFLATION_RATES[year] * self.cleaned_data.get(f"gv{year}"))) / self.cleaned_data.get(f"gv{year}")
self.cleaned_data[f"realefftax{year}"] = (self.cleaned_data.get(f"st{year}") + (INFLATION_RATES[year] * self.cleaned_data.get(f"gv{year}")) + (INFLATION_RATES[year] * self.cleaned_data.get(f"ek{year}"))) / self.cleaned_data.get(f"gv{year}")
def write(self) -> None:
"""Write the current dataset to CSV"""
with open(self.report.output) as out_csv:
try:
output_reader = pd.read_csv(out_csv)
bvd_id = output_reader["bvd_id"]
if not self.cleaned_data.get("bvd_id") in bvd_id:
self.writer.writerow(self.cleaned_data)
except pd.errors.EmptyDataError:
self.writer.writerow(self.cleaned_data)
class dataimport:
def __init__(self, filename, logfile, output, seek=0) -> None:
self.seek = seek
self.progress = progress.Progress(
*progress.Progress.get_default_columns(),
progress.MofNCompleteColumn(),
progress.TimeElapsedColumn(),
expand=True
)
self.filename = filename
FORMAT = "%(message)s"
self.logfile = open(logfile, 'a')
self.output = output
if self.logfile != "NONE":
self.logconsole = Console(file=self.logfile)
logging.basicConfig(
level="INFO", format=FORMAT, datefmt="[%X]", handlers=[
RichHandler(rich_tracebacks=True, console=self.progress.console,
show_path=False, show_time=False, level="NOTSET"),
RichHandler(rich_tracebacks=True, console=self.logconsole,
show_path=False, level="WARNING")])
else:
logging.basicConfig(
level="INFO", format=FORMAT, datefmt="[%X]", handlers=[
RichHandler(rich_tracebacks=True, console=self.progress.console,
show_path=False, show_time=False, level="NOTSET")])
self.log = logging.getLogger("import")
self.total_rows = self.get_total(self.filename)
self.errors = 0
self.data = {}
self.duplicate_database_id = None
self.task = self.progress.add_task(f"Importing {self.filename.split('/')[-1]}", total=self.get_total(self.filename))
self.progress.update(self.task, advance=self.seek)
self.valid_data = 0
self.invalid_data = 0
self.importer()
def importer(self) -> None:
"""Start the actual import process. Seperates process and setup."""
with self.progress:
with open(self.filename, mode='r', encoding='utf-8-sig', newline='') as csv_file:
with open(self.output, mode='a+', encoding='utf-8-sig', newline='') as output_csv:
csv_reader = csv.DictReader(csv_file, delimiter=',')
fieldnames = self.generate_fieldnames()
output_writer = csv.DictWriter(output_csv, fieldnames=fieldnames)
if self.get_total(self.output) == -1:
self.log.warning(f"WRITING HEADER FOR FILE {self.output}!")
output_writer.writeheader()
rownum = 0
for row in csv_reader:
if rownum < self.seek:
rownum += 1
continue
for key in csv_reader.fieldnames:
self.data[key] = row[key]
self.comp_import(self.data, output_writer)
self.data = {}
rownum += 1
self.progress.update(self.task, advance=1)
self.progress.console.rule()
self.log.info(f"Rows: {self.total_rows}")
self.log.info(f"Valid: {self.valid_data}")
self.log.info(f"Invalid: {self.invalid_data}")
if self.errors == 0:
self.log.info(f"Errors: {self.errors}")
self.progress.console.rule()
elif self.errors > 0:
self.log.error(f"Errors: {self.errors}")
self.progress.console.rule()
else:
self.log.critical("ERROR CALCULATION EXCEPTION")
def get_total(self, file) -> int:
"""Get total data rows in the input file. Corrected for header row."""
return sum(1 for _ in open(file, mode='r')) - 1
def generate_fieldnames(self) -> list:
"""Generate fieldnames for the export CSV."""
fieldnames = ['bvd_id', 'name']
for year in YEARS:
fieldnames.append(f"gv{year}")
fieldnames.append(f"gn{year}")
fieldnames.append(f"st{year}")
fieldnames.append(f"ek{year}")
fieldnames.append(f"nomtax{year}")
fieldnames.append(f"realtax{year}")
fieldnames.append(f"realefftax{year}")
return fieldnames
def comp_import(self, data: dict, writer) -> None:
"""Import the active dataset as a company. Give write directive."""
current = Company(data, report=self, writer=writer)
current.reporter()
current.calculate_all_tax()
current.calculate_data()
current.write()
parser = argparse.ArgumentParser(description='Import data from ORBIS', epilog='Copyright Denkena Consulting')
parser.add_argument('filename', nargs="+")
parser.add_argument('-l', '--logfile', default="log_importer", nargs="?")
parser.add_argument('-o', '--output', default="export_cleaned.csv", nargs="?")
parser.add_argument('-s', '--seek', type=int, default=0)
args = parser.parse_args()
if len(args.filename) > 1 and args.seek > 0:
parser.error("Seek combined with multiple files is a bad idea!")
for filename in args.filename:
dataimport(filename, args.logfile, args.output, args.seek)