#!/usr/bin/env python3
"""Importer for ORBIS CSV exports (company financials)."""

import argparse
import csv
import hashlib
import io
import json
import logging
import secrets
import sys
import time

import psycopg
import requests
from rich import progress
from rich.console import Console
from rich.logging import RichHandler
from rich.traceback import install

# Rich tracebacks with local variables for easier debugging of import runs.
install(show_locals=True, locals_max_length=150, locals_max_string=300)
class Company:
    """One company row from an ORBIS CSV export.

    Parses the yearly financial figures (2020-2024) into integer attributes
    named ``<prefix><year>`` (e.g. ``gv2021``, ``ek2024``); a figure that is
    missing or not parseable as an integer becomes ``None``.

    Attribute prefixes:
        gv  -- profit/loss before taxes ("Gewinn/Verlust vor Steuern")
        gn  -- profit/loss after taxes ("Gewinn/Verlust nach Steuern")
        st  -- taxes ("Steuern")
        ek  -- equity ("Eigenkapital")
    """

    # (attribute prefix, CSV column-name template) for every imported figure.
    _FIELDS = (
        ("gv", "Gewinn/Verlust vor Steuern EUR {year}"),
        ("gn", "Gewinn/Verlust nach Steuern EUR {year}"),
        ("st", "Steuern EUR {year}"),
        ("ek", "Eigenkapital EUR {year}"),
    )
    _YEARS = range(2020, 2025)

    def __init__(self, data, report):
        """Extract figures from *data* (a row dict) and remember *report*.

        Args:
            data: mapping of CSV column names to cell strings; must contain
                "BvD-ID". Empty cells may be absent entirely (the importer
                strips them), so missing keys are treated like bad values.
            report: object with ``valid_data``/``invalid_data`` counters that
                ``validate()`` increments.
        """
        self.bvdid = data["BvD-ID"]
        for prefix, template in self._FIELDS:
            for year in self._YEARS:
                # KeyError is caught too: the importer omits empty cells from
                # the row dict, which previously crashed this constructor.
                try:
                    value = int(data[template.format(year=year)])
                except (KeyError, ValueError):
                    value = None
                setattr(self, f"{prefix}{year}", value)
        self.report = report

    def calculate_tax(self):
        """Fill in missing tax figures as ``gv - gn`` where possible.

        A tax value of ``None`` (or 0, kept from the original logic) is
        recomputed, but only when both profit figures for that year are
        present -- previously a missing operand raised ``TypeError``.
        """
        for year in self._YEARS:
            if not getattr(self, f"st{year}"):
                gv = getattr(self, f"gv{year}")
                gn = getattr(self, f"gn{year}")
                if gv is not None and gn is not None:
                    setattr(self, f"st{year}", gv - gn)

    def validate(self):
        """Check that every figure is present and update the report counters.

        Returns True and bumps ``report.valid_data`` when all 20 figures are
        truthy; otherwise bumps ``report.invalid_data`` and returns False.
        NOTE: truthiness means a legitimate value of 0 also counts as
        invalid -- preserved from the original logic.
        """
        # Fallback, in case tax wasn't already calculated.
        self.calculate_tax()
        figures = [
            getattr(self, f"{prefix}{year}")
            for prefix, _ in self._FIELDS
            for year in self._YEARS
        ]
        if all(figures):
            self.report.valid_data += 1
            return True
        self.report.invalid_data += 1
        return False
class dataimport:
    """Runs one import of an ORBIS CSV file with rich progress/log output.

    Each data row is turned into a ``Company`` and validated; this object
    doubles as the ``report`` passed to ``Company``, accumulating the
    ``valid_data``/``invalid_data`` counters.
    """

    def __init__(self, filename, logfile, seek=0):
        """Set up progress bar and logging, then run the import.

        Args:
            filename: path of the CSV file to import.
            logfile: path of an append-mode logfile for warnings and above,
                or the literal string "NONE" to disable file logging.
            seek: number of data rows to skip (resume after a partial run).
        """
        self.seek = seek
        self.progress = progress.Progress(
            *progress.Progress.get_default_columns(),
            progress.MofNCompleteColumn(),
            progress.TimeElapsedColumn(),
            expand=True
        )
        self.filename = filename
        FORMAT = "%(message)s"
        # BUG FIX: the original opened the logfile first and then compared the
        # resulting *file object* to the string "NONE" (always unequal), so
        # file logging could never actually be disabled. Compare the name.
        if logfile != "NONE":
            self.logfile = open(logfile, 'a')
            self.logconsole = Console(file=self.logfile)
            logging.basicConfig(
                level="NOTSET", format=FORMAT, datefmt="[%X]", handlers=[
                    RichHandler(rich_tracebacks=True, console=self.progress.console,
                                show_path=False, show_time=False, level="NOTSET"),
                    RichHandler(rich_tracebacks=True, console=self.logconsole,
                                show_path=False, level="WARNING")])
        else:
            self.logfile = None
            logging.basicConfig(
                level="NOTSET", format=FORMAT, datefmt="[%X]", handlers=[
                    RichHandler(rich_tracebacks=True, console=self.progress.console,
                                show_path=False, show_time=False, level="NOTSET")])

        self.log = logging.getLogger("import")
        self.total_rows = self.get_total()
        self.errors = 0
        self.data = {}
        self.duplicate_database_id = None
        # Reuse the row count computed above instead of re-reading the file.
        self.task = self.progress.add_task(
            f"Importing {self.filename.split('/')[-1]}", total=self.total_rows)
        self.progress.update(self.task, advance=self.seek)
        global AUTHTOKEN
        AUTHTOKEN = None
        self.valid_data = 0
        self.invalid_data = 0
        self.importer()

    def importer(self):
        """Iterate the CSV rows, importing each one and logging a summary."""
        with self.progress:
            # AUTHTOKEN is reserved for a future remote API; it is reset to
            # None in __init__, so this branch is currently never taken.
            if AUTHTOKEN is not None:
                self.authtoken = AUTHTOKEN
                self.log.info('AUTHTOKEN obtained!')
            with open(self.filename, mode='r', encoding='utf-8-sig', newline='') as csv_file:
                csv_reader = csv.DictReader(csv_file, delimiter=',')
                for rownum, row in enumerate(csv_reader):
                    # Skip already-imported rows when resuming.
                    if rownum < self.seek:
                        continue
                    # Keep only non-empty cells; Company treats missing keys
                    # as missing values.
                    self.data = {key: row[key]
                                 for key in csv_reader.fieldnames
                                 if row[key] != ''}
                    self.comp_import(self.data)
                    self.data = {}
                    self.progress.update(self.task, advance=1)
            self.progress.console.rule()
            self.log.info(f"Rows: {self.total_rows}")
            self.log.info(f"Valid: {self.valid_data}")
            self.log.info(f"Invalid: {self.invalid_data}")
            # self.errors is a non-negative int, so two branches suffice; the
            # original third "calculation exception" branch was unreachable.
            if self.errors == 0:
                self.log.info(f"Errors: {self.errors}")
            else:
                self.log.error(f"Errors: {self.errors}")
            self.progress.console.rule()

    def get_total(self):
        """Return the number of data rows (line count minus the header)."""
        # Context manager so the file handle is not leaked.
        with open(self.filename, mode='r') as f:
            return sum(1 for _ in f) - 1

    def comp_import(self, data):
        """Build a Company from one row dict and validate it.

        Validation updates the valid/invalid counters on this object.
        """
        current = Company(data, report=self)
        # BUG FIX: the original called the nonexistent all_valid(), which
        # raised AttributeError on every row; validate() is the real method.
        current.validate()
def _main():
    """Parse command-line arguments and run one dataimport per input file."""
    parser = argparse.ArgumentParser(
        description='Import data from ORBIS',
        epilog='Copyright Denkena Consulting')
    parser.add_argument('filename', nargs="+")
    parser.add_argument('-l', '--logfile', default="log_importer", nargs="?")
    parser.add_argument('-s', '--seek', type=int, default=0)
    args = parser.parse_args()
    # A single seek offset cannot sensibly apply to several files: it would
    # skip rows in every one of them.
    if len(args.filename) > 1 and args.seek > 0:
        parser.error("Seek combined with multiple files is a bad idea!")
    for filename in args.filename:
        dataimport(filename, args.logfile, args.seek)


# Guarded entry point so importing this module has no side effects.
if __name__ == "__main__":
    _main()