#!/usr/bin/env python3
"""Import ORBIS CSV exports and report how many companies carry usable data.

Each CSV row is turned into a :class:`Company`; a company counts as valid
when at least one reporting year has both a tax figure and an equity figure.
"""
import argparse
import csv
import hashlib
import io
import json
import logging
import os
import secrets
import sys
import time

import psycopg as ps
import psycopg_pool as ps_pool
import requests
from rich import progress
from rich.console import Console
from rich.logging import RichHandler
from rich.traceback import install

install(show_locals=True, locals_max_length=150, locals_max_string=300)

# Shared API token placeholder. Nothing in this file assigns a real value yet;
# code that wants to publish a token must declare ``global AUTHTOKEN`` first.
AUTHTOKEN = None


class Company:
    """One CSV row, with its yearly figures parsed into integer attributes.

    For every year in ``YEARS`` the instance gets four attributes, each an
    ``int`` or ``None``:

    * ``gv<year>`` - "Gewinn/(Verlust) vor Steuern" (profit/loss before tax)
    * ``gn<year>`` - "Gewinn/(Verlust) nach Steuern" (profit/loss after tax)
    * ``st<year>`` - "Steuern" (taxes)
    * ``ek<year>`` - "Eigenkapital" (equity)
    """

    YEARS = (2020, 2021, 2022, 2023, 2024)

    # attribute prefix -> candidate column-name templates, tried in order.
    # The profit columns appear with and without parentheses in the exports.
    _FIELDS = (
        ("gv", ("Gewinn/(Verlust) vor Steuern EUR {year}",
                "Gewinn/Verlust vor Steuern EUR {year}")),
        ("gn", ("Gewinn/(Verlust) nach Steuern EUR {year}",
                "Gewinn/Verlust nach Steuern EUR {year}")),
        ("st", ("Steuern EUR {year}",)),
        ("ek", ("Eigenkapital EUR {year}",)),
    )

    def __init__(self, data, report):
        """Parse *data* into attributes.

        Args:
            data: Mapping of CSV column name to cell text.
            report: Object with ``valid_data`` and ``invalid_data`` integer
                counters; :meth:`validate` increments exactly one of them.

        Raises:
            KeyError: If "BvD ID Nummer" or "Unternehmensname" is missing.
        """
        self.data = data
        self.bvdid = data["BvD ID Nummer"]
        self.name = data["Unternehmensname"]
        for year in self.YEARS:
            for prefix, templates in self._FIELDS:
                columns = [template.format(year=year) for template in templates]
                setattr(self, f"{prefix}{year}", self._parse_int(data, *columns))
        self.report = report

    @staticmethod
    def _parse_int(data, *columns):
        """Return the first present, non-empty column among *columns* as int.

        A column that is absent, ``None`` or ``''`` is skipped. The first
        column that does hold text decides the result: its integer value, or
        ``None`` when the text is not a valid integer (later columns are not
        consulted in that case).
        """
        for column in columns:
            raw = data.get(column)
            if raw is None or raw == "":
                continue
            try:
                return int(raw)
            except (TypeError, ValueError):
                return None
        return None

    def calculate_tax(self):
        """Fill in missing tax figures as profit before tax minus after tax.

        A tax value of ``0`` is treated like a missing one and is recomputed
        whenever both profit figures for that year are available.
        """
        for year in self.YEARS:
            if getattr(self, f"st{year}"):
                continue
            before_tax = getattr(self, f"gv{year}")
            after_tax = getattr(self, f"gn{year}")
            if before_tax is not None and after_tax is not None:
                setattr(self, f"st{year}", before_tax - after_tax)

    def validate(self):
        """Classify this company and bump the matching report counter once.

        The company is valid when any year in ``YEARS`` has both a non-zero
        tax figure and a non-zero equity figure. (A stricter variant that
        also required both profit figures exists only as disabled code in
        the original and is not applied.)

        Returns:
            bool: ``True`` if valid, ``False`` otherwise.
        """
        # Fallback, in case tax wasn't already calculated.
        self.calculate_tax()
        is_valid = any(
            getattr(self, f"st{year}") and getattr(self, f"ek{year}")
            for year in self.YEARS
        )
        if is_valid:
            self.report.valid_data += 1
        else:
            self.report.invalid_data += 1
        return is_valid


class dataimport:
    """Run the import of one CSV file; the work happens in the constructor."""

    def __init__(self, filename, logfile, seek=0):
        """Configure progress display and logging, then import *filename*.

        Args:
            filename: Path of the CSV file to import.
            logfile: Path of a file that receives WARNING and above, opened
                in append mode. ``None`` or the string ``"NONE"`` disables
                file logging.
            seek: Number of leading data rows to skip.
        """
        self.seek = seek
        self.progress = progress.Progress(
            *progress.Progress.get_default_columns(),
            progress.MofNCompleteColumn(),
            progress.TimeElapsedColumn(),
            expand=True,
        )
        self.filename = filename
        FORMAT = "%(message)s"

        handlers = [
            RichHandler(rich_tracebacks=True, console=self.progress.console,
                        show_path=False, show_time=False, level="NOTSET"),
        ]
        # Decide on the raw argument *before* opening anything: a file object
        # never compares equal to "NONE", and open(None) raises TypeError.
        if logfile is not None and logfile != "NONE":
            # The handle is deliberately kept open for the process lifetime
            # because the root logging handler keeps writing to it.
            self.logfile = open(logfile, 'a')
            self.logconsole = Console(file=self.logfile)
            handlers.append(
                RichHandler(rich_tracebacks=True, console=self.logconsole,
                            show_path=False, level="WARNING"))
        else:
            self.logfile = None
            self.logconsole = None
        # NOTE: basicConfig only takes effect on its first call, so later
        # instances in the same process reuse the first instance's handlers.
        logging.basicConfig(level="NOTSET", format=FORMAT, datefmt="[%X]",
                            handlers=handlers)
        self.log = logging.getLogger("import")

        self.total_rows = self.get_total()
        self.errors = 0
        self.data = {}
        self.duplicate_database_id = None
        self.task = self.progress.add_task(
            f"Importing {os.path.basename(self.filename)}",
            total=self.total_rows,
        )
        # Skipped rows count as already done on the progress bar.
        self.progress.update(self.task, advance=self.seek)
        self.valid_data = 0
        self.invalid_data = 0
        # Database persistence is not wired up yet:
        #with ps_pool.ConnectionPool(conninfo="postgresql:///bachelorarbeit?sslmode=require&port=5432&host=denkena-consulting.com&passfile=/home/user/bachelorarbeit_importer/pgpass&user=bachelorarbeit_w&hostaddr=94.16.116.86", min_size=4, max_size=10, open=True, ) as pool:
        #    with pool.connection() as conn:
        #self.db_setup()
        self.importer()
        #AUTHTOKEN = self.authtoken
        #self.log.info('AUTHTOKEN SET!')

    def db_setup(self, conn):
        """Create the ``test`` table on *conn* if it does not exist yet."""
        with conn.cursor() as cur:
            cur.execute("CREATE TABLE IF NOT EXISTS test( bvd_id serial PRIMARY KEY)")

    def importer(self):
        """Feed every data row after ``self.seek`` to :meth:`comp_import`.

        Afterwards the row, valid, invalid and error counts are logged.
        """
        with self.progress:
            if AUTHTOKEN is not None:
                self.authtoken = AUTHTOKEN
                self.log.info('AUTHTOKEN obtained!')
            with open(self.filename, mode='r', encoding='utf-8-sig', newline='') as csv_file:
                csv_reader = csv.DictReader(csv_file, delimiter=',')
                for rownum, row in enumerate(csv_reader):
                    if rownum < self.seek:
                        continue
                    # Keep only the declared header columns of this row.
                    self.data = {key: row[key] for key in csv_reader.fieldnames}
                    self.comp_import(self.data)
                    #if self.check_duplicate(data):
                    #    self.patch_record(data)
                    #    self.duplicate_database_id = None
                    #else:
                    #    self.create_record(data)
                    self.data = {}
                    self.progress.update(self.task, advance=1)
            self.progress.console.rule()
            self.log.info("Rows: %s", self.total_rows)
            self.log.info("Valid: %s", self.valid_data)
            self.log.info("Invalid: %s", self.invalid_data)
            if self.errors == 0:
                self.log.info("Errors: %s", self.errors)
                self.progress.console.rule()
            elif self.errors > 0:
                self.log.error("Errors: %s", self.errors)
                self.progress.console.rule()
            else:
                # A negative error count can only come from a counting bug.
                self.log.critical("ERROR CALCULATION EXCEPTION")

    def get_total(self):
        """Return the number of data records in the CSV file, never negative.

        Records are counted with the ``csv`` module rather than by physical
        line, so quoted multi-line cells count once and blank lines (which
        ``csv.DictReader`` skips) are ignored. The header is not counted.
        """
        with open(self.filename, mode='r', encoding='utf-8-sig', newline='') as csv_file:
            records = sum(1 for record in csv.reader(csv_file, delimiter=',') if record)
        return max(records - 1, 0)

    def comp_import(self, data):
        """Build a :class:`Company` from *data* and validate it.

        This instance is passed as the report, so validation updates
        ``self.valid_data`` / ``self.invalid_data``.
        """
        current = Company(data, report=self)
        current.validate()


def main():
    """Parse the command line and import every given file in order."""
    parser = argparse.ArgumentParser(description='Import data from ORBIS', epilog='Copyright Denkena Consulting')
    parser.add_argument('filename', nargs="+")
    # A bare "-l" yields None, which disables file logging (as does "NONE").
    parser.add_argument('-l', '--logfile', default="log_importer", nargs="?")
    parser.add_argument('-s', '--seek', type=int, default=0)
    args = parser.parse_args()

    if args.seek < 0:
        parser.error("Seek must not be negative!")
    if len(args.filename) > 1 and args.seek > 0:
        parser.error("Seek combined with multiple files is a bad idea!")

    for filename in args.filename:
        dataimport(filename, args.logfile, args.seek)


if __name__ == "__main__":
    main()