bachelorarbeit_importer/cleanup_script.py

228 lines
8.3 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import csv
import secrets
import sys
import time
import json
import logging
import requests
import hashlib
import io
import psycopg
from rich import progress
from rich.logging import RichHandler
from rich.console import Console
from rich.traceback import install
# Install rich's traceback handler so uncaught exceptions are rendered by rich.
# NOTE(review): show_locals=True includes local variable values in the
# rendered traceback - confirm no secrets (e.g. auth tokens) can end up there.
install(show_locals=True, locals_max_length=150, locals_max_string=300)
class Company:
    """Financial figures of one company row for the years 2020-2024.

    Every figure is stored on the instance as ``<prefix><year>`` (for example
    ``gv2020`` or ``ek2024``), using these prefixes and CSV column labels:

    * ``gv`` - "Gewinn/Verlust vor Steuern EUR" (profit/loss before tax)
    * ``gn`` - "Gewinn/Verlust nach Steuern EUR" (profit/loss after tax)
    * ``st`` - "Steuern EUR" (taxes)
    * ``ek`` - "Eigenkapital EUR" (equity)

    A figure is ``None`` when its column is absent from the row or its value
    cannot be converted with ``int()``.
    """

    # Years for which each figure is read.
    _YEARS = (2020, 2021, 2022, 2023, 2024)

    # Attribute prefix -> CSV column label; the year is appended to both.
    _COLUMNS = {
        "gv": "Gewinn/Verlust vor Steuern EUR",
        "gn": "Gewinn/Verlust nach Steuern EUR",
        "st": "Steuern EUR",
        "ek": "Eigenkapital EUR",
    }

    def __init__(self, data, report):
        """Parse one row.

        Args:
            data: Mapping of CSV column name to raw cell value. Must contain
                "BvD-ID"; every financial column is optional.
            report: Object with ``valid_data`` and ``invalid_data`` counters,
                incremented by :meth:`validate`.

        Raises:
            KeyError: If "BvD-ID" is missing from ``data``.
        """
        self.bvdid = data["BvD-ID"]
        for prefix, label in self._COLUMNS.items():
            for year in self._YEARS:
                # .get(): the importer drops empty cells from the row dict, so
                # a blank cell shows up here as a missing key.
                raw = data.get(f"{label} {year}")
                setattr(self, f"{prefix}{year}", self._parse_int(raw))
        self.report = report

    @staticmethod
    def _parse_int(raw):
        """Return ``int(raw)``, or ``None`` if ``raw`` is missing or not an integer."""
        try:
            return int(raw)
        except (TypeError, ValueError):
            return None

    def calculate_tax(self):
        """Derive missing tax figures as profit before tax minus profit after tax.

        Only years whose tax figure is ``None`` are filled in, and only when
        both profit figures for that year are known. A reported tax of ``0``
        is kept as is.
        """
        for year in self._YEARS:
            if getattr(self, f"st{year}") is not None:
                continue
            before_tax = getattr(self, f"gv{year}")
            after_tax = getattr(self, f"gn{year}")
            if before_tax is not None and after_tax is not None:
                setattr(self, f"st{year}", before_tax - after_tax)

    def validate(self):
        """Check that every figure for every year is present.

        Runs :meth:`calculate_tax` first as a fallback, then increments
        ``report.valid_data`` or ``report.invalid_data`` accordingly.

        Returns:
            bool: ``True`` if no figure is ``None``, ``False`` otherwise.
        """
        # Fallback, in case tax wasn't already calculated.
        self.calculate_tax()
        # Compare against None explicitly: 0 is a legitimate figure.
        complete = all(
            getattr(self, f"{prefix}{year}") is not None
            for prefix in self._COLUMNS
            for year in self._YEARS
        )
        if complete:
            self.report.valid_data += 1
        else:
            self.report.invalid_data += 1
        return complete
class dataimport:
    """Read one CSV file and count rows with complete financial data.

    Constructing an instance runs the whole import: logging is configured,
    the file is read row by row, each row is validated through ``Company``,
    and a summary (rows / valid / invalid / errors) is logged at the end.
    """

    def __init__(self, filename, logfile, seek=0):
        """Configure logging and progress display, then run the import.

        Args:
            filename: Path of the CSV file to read.
            logfile: Path of a file that additionally receives WARNING and
                higher log records (opened in append mode). ``"NONE"`` or
                ``None`` disables the log file.
            seek: Number of leading data rows to skip.
        """
        self.seek = seek
        self.progress = progress.Progress(
            *progress.Progress.get_default_columns(),
            progress.MofNCompleteColumn(),
            progress.TimeElapsedColumn(),
            expand=True,
        )
        self.filename = filename
        FORMAT = "%(message)s"
        handlers = [
            RichHandler(rich_tracebacks=True, console=self.progress.console,
                        show_path=False, show_time=False, level="NOTSET"),
        ]
        # Decide on the sentinel BEFORE opening anything: comparing an opened
        # file object with "NONE" is always unequal and would create a file
        # that is literally named "NONE".
        if logfile is None or logfile == "NONE":
            self.logfile = None
            self.logconsole = None
        else:
            # Deliberately left open: the logging handler below writes to it
            # for as long as logging is in use.
            self.logfile = open(logfile, 'a')
            self.logconsole = Console(file=self.logfile)
            handlers.append(
                RichHandler(rich_tracebacks=True, console=self.logconsole,
                            show_path=False, level="WARNING"))
        # NOTE: logging.basicConfig() does nothing when the root logger
        # already has handlers, so only the first instance created in a
        # process actually installs its handlers.
        logging.basicConfig(
            level="NOTSET", format=FORMAT, datefmt="[%X]", handlers=handlers)
        self.log = logging.getLogger("import")
        self.total_rows = self.get_total()
        self.errors = 0
        self.data = {}
        self.duplicate_database_id = None
        self.task = self.progress.add_task(
            f"Importing {self.filename.split('/')[-1]}", total=self.total_rows)
        # Rows skipped via ``seek`` count as already processed.
        self.progress.update(self.task, advance=self.seek)
        # The module-level token is reset on every construction, so
        # importer() only sees a token if something sets it in between.
        global AUTHTOKEN
        AUTHTOKEN = None
        self.valid_data = 0
        self.invalid_data = 0
        self.importer()

    def importer(self):
        """Read the CSV file, validate every row and log a summary."""
        with self.progress:
            if AUTHTOKEN is not None:
                self.authtoken = AUTHTOKEN
                self.log.info('AUTHTOKEN obtained!')
            with open(self.filename, mode='r', encoding='utf-8-sig', newline='') as csv_file:
                csv_reader = csv.DictReader(csv_file, delimiter=',')
                for rownum, row in enumerate(csv_reader):
                    if rownum < self.seek:
                        continue
                    # Only non-empty cells are passed on; empty ones are
                    # simply absent from the dict.
                    self.data = {
                        key: row[key]
                        for key in csv_reader.fieldnames
                        if row[key] != ''
                    }
                    self.comp_import(self.data)
                    self.data = {}
                    self.progress.update(self.task, advance=1)
            self.progress.console.rule()
            self.log.info(f"Rows: {self.total_rows}")
            self.log.info(f"Valid: {self.valid_data}")
            self.log.info(f"Invalid: {self.invalid_data}")
            if self.errors == 0:
                self.log.info(f"Errors: {self.errors}")
                self.progress.console.rule()
            elif self.errors > 0:
                self.log.error(f"Errors: {self.errors}")
                self.progress.console.rule()
            else:
                # A negative error count should be impossible.
                self.log.critical("ERROR CALCULATION EXCEPTION")

    def get_total(self):
        """Return the number of data lines in the file (all lines minus the header).

        The file is decoded the same way ``importer`` reads it. An empty file
        yields 0 rather than -1.
        """
        with open(self.filename, mode='r', encoding='utf-8-sig', newline='') as handle:
            line_count = sum(1 for _ in handle)
        return max(line_count - 1, 0)

    def comp_import(self, data):
        """Build a ``Company`` from one row and validate it.

        The company updates ``self.valid_data`` / ``self.invalid_data``
        because this instance is passed as its report.
        """
        current = Company(data, report=self)
        # Company defines validate(); there is no all_valid() method.
        current.validate()
def _build_parser():
    """Create the command-line parser for the importer."""
    parser = argparse.ArgumentParser(description='Import data from ORBIS', epilog='Copyright Denkena Consulting')
    parser.add_argument('filename', nargs="+")
    parser.add_argument('-l', '--logfile', default="log_importer", nargs="?")
    parser.add_argument('-s', '--seek', type=int, default=0)
    return parser


def main(argv=None):
    """Parse the command line and import every given file in order.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Raises:
        SystemExit: On invalid arguments, including ``--seek`` greater than 0
            combined with more than one file.
    """
    parser = _build_parser()
    args = parser.parse_args(argv)
    # The same seek value would be applied to every file, so refuse that.
    if len(args.filename) > 1 and args.seek > 0:
        parser.error("Seek combined with multiple files is a bad idea!")
    for filename in args.filename:
        dataimport(filename, args.logfile, args.seek)


# Only run when executed as a script, so the module can be imported safely.
if __name__ == "__main__":
    main()