|
- #!/usr/bin/env python3
-
- import os
- import sys
- import json
- import platform
- import subprocess
- from datetime import datetime
- import tempfile
- import shutil
- import hashlib
- import argparse
-
-
- def load_config(config_file="config.json"):
- """Carica la configurazione dal file JSON."""
- try:
- with open(config_file, 'r') as f:
- return json.load(f)
- except FileNotFoundError:
- raise Exception(f"File di configurazione '{config_file}' non trovato")
- except json.JSONDecodeError:
- raise Exception(f"Errore nel parsing del file di configurazione '{config_file}'")
-
-
- def get_date_string():
- """Restituisce la data odierna nel formato giorno+mese+anno."""
- now = datetime.now()
- return f"{now.day:02d}{now.month:02d}{now.year}"
-
-
- def get_previous_day_string():
- """Restituisce la data di ieri nel formato giorno+mese+anno."""
- from datetime import timedelta
- yesterday = datetime.now() - timedelta(days=1)
- return f"{yesterday.day:02d}{yesterday.month:02d}{yesterday.year}"
-
-
- def parse_date_argument(date_str):
- """
- Converte una data in formato GG-MM-AAAA nel formato GGMMAAAA.
-
- Args:
- date_str: stringa nel formato "GG-MM-AAAA" (es: "01-01-2026")
-
- Returns:
- stringa nel formato "GGMMAAAA" (es: "01012026")
-
- Raises:
- ValueError: se il formato non è valido
- """
- try:
- date_obj = datetime.strptime(date_str, "%d-%m-%Y")
- return f"{date_obj.day:02d}{date_obj.month:02d}{date_obj.year}"
- except ValueError:
- raise ValueError(f"Formato data non valido: {date_str}. Usa il formato GG-MM-AAAA (es: 01-01-2026)")
-
-
- def calculate_content_hash(content):
- """Calcola l'hash MD5 del contenuto."""
- return hashlib.md5(content.encode('utf-8')).hexdigest()
-
-
- def get_state_filename(date_string):
- """Restituisce il nome del file di stato per una data specifica."""
- return f".state_{date_string}.json"
-
-
- def load_state(date_string):
- """
- Carica lo stato dell'ultima elaborazione per una data specifica.
-
- Returns:
- dict con 'last_hash' e 'last_check_time', oppure None se non esiste
- """
- state_file = get_state_filename(date_string)
-
- if not os.path.exists(state_file):
- return None
-
- try:
- with open(state_file, 'r') as f:
- return json.load(f)
- except:
- return None
-
-
- def save_state(date_string, content_hash):
- """Salva lo stato dell'elaborazione corrente."""
- state_file = get_state_filename(date_string)
-
- state = {
- 'last_hash': content_hash,
- 'last_check_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
- 'date': date_string
- }
-
- try:
- with open(state_file, 'w') as f:
- json.dump(state, f, indent=4)
- return True
- except Exception as e:
- print(f"ATTENZIONE: Impossibile salvare lo stato: {e}")
- return False
-
-
- def cleanup_old_state_files():
- """Elimina i file di stato di date precedenti."""
- today = get_date_string()
-
- for filename in os.listdir('.'):
- if filename.startswith('.state_') and filename.endswith('.json'):
- # Estrai la data dal nome del file
- try:
- state_date = filename.replace('.state_', '').replace('.json', '')
- if state_date != today:
- os.remove(filename)
- print(f"File di stato obsoleto eliminato: {filename}")
- except:
- pass
-
-
- def process_single_date(config, date_string, force=False, date_label="", skip_state=False):
- """
- Processa un singolo file per una data specifica.
-
- Args:
- config: configurazione caricata
- date_string: stringa data in formato GGMMAAAA
- force: forza elaborazione anche se non modificato
- date_label: etichetta descrittiva (es: "oggi", "ieri")
- skip_state: se True, non usa né salva i file di stato (per modalità daily)
-
- Returns:
- (success, skipped) - (True/False, True se saltato perché non modificato)
- """
- input_filename = f"{date_string}.txt"
- output_filename = f"{date_string}.txt"
-
- if date_label:
- print(f"\n--- Elaborazione file di {date_label}: {input_filename} ---")
- else:
- print(f"\n--- Elaborazione file: {input_filename} ---")
-
- # Leggi file di input dal PC remoto
- print(f"Lettura file da {config['input']['host']}...")
- try:
- input_content = read_remote_file(config['input'], input_filename)
- print(f"File letto con successo ({len(input_content)} caratteri)")
- except FileNotFoundError:
- print(f"File {input_filename} non trovato (normale se non esiste ancora)")
- return (True, True) # Non è un errore, semplicemente il file non esiste
- except Exception as e:
- error_msg = f"Impossibile leggere il file di input {input_filename}: {str(e)}"
- print(f"ERRORE: {error_msg}")
- write_error_file(error_msg, "input")
- return (False, False)
-
- # Calcola hash del contenuto
- content_hash = calculate_content_hash(input_content)
-
- # Verifica se il file è cambiato rispetto all'ultima elaborazione (solo se non skip_state)
- if not skip_state and not force:
- previous_state = load_state(date_string)
-
- if previous_state and previous_state.get('last_hash') == content_hash:
- print(f"File non modificato rispetto all'ultimo controllo ({previous_state.get('last_check_time')})")
- print("Elaborazione saltata.")
- return (True, True) # Success ma skipped
-
- if previous_state:
- print(f"File modificato rispetto all'ultimo controllo ({previous_state.get('last_check_time')})")
- else:
- print("Prima elaborazione per questo file")
- elif skip_state:
- print("Modalità DAILY: elaborazione senza controllo stato")
- else:
- print("Modalità --force: elaborazione forzata")
-
- # Processa il contenuto
- print("Elaborazione stringhe che iniziano con '01003'...")
- filtered_lines = process_file(input_content)
- print(f"Trovate {len(filtered_lines)} stringhe che iniziano con '01003'")
-
- # Crea file di output (anche se vuoto)
- output_content = '\n'.join(filtered_lines)
- if filtered_lines:
- output_content += '\n' # Aggiungi newline finale
-
- # Verifica se l'output deve essere locale o remoto
- use_local = config['output'].get('locale', 'no').lower() in ['yes', 'si', 'true', '1']
-
- if use_local:
- # Output locale
- print("Modalità output LOCALE")
- local_path = config['output'].get('local_path', './output')
-
- try:
- output_file_path = write_local_file(config['output'], output_content, output_filename)
- print(f"File {output_filename} scritto in: {output_file_path}")
-
- if len(filtered_lines) == 0:
- print("NOTA: File output vuoto (nessuna stringa '01003')")
-
- # Salva lo stato corrente (solo se non skip_state)
- if not skip_state:
- if save_state(date_string, content_hash):
- print(f"Stato salvato")
- else:
- print("Stato non salvato (modalità DAILY)")
-
- except Exception as e:
- error_msg = f"Impossibile scrivere il file di output locale: {str(e)}"
- print(f"ERRORE: {error_msg}")
- write_error_file(error_msg, "output")
- return (False, False)
- else:
- # Output remoto (comportamento originale)
- print("Modalità output REMOTO")
-
- # Scrivi su file temporaneo locale
- temp_output = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt')
- try:
- temp_output.write(output_content)
- temp_output.close()
-
- # Copia il file sul PC remoto di output
- print(f"Scrittura file su {config['output']['host']}...")
- try:
- copy_via_smb(config['output'], temp_output.name, output_filename)
- print(f"File {output_filename} creato con successo")
-
- if len(filtered_lines) == 0:
- print("NOTA: File output vuoto (nessuna stringa '01003')")
-
- # Salva lo stato corrente (solo se non skip_state)
- if not skip_state:
- if save_state(date_string, content_hash):
- print(f"Stato salvato")
- else:
- print("Stato non salvato (modalità DAILY)")
-
- except Exception as e:
- error_msg = f"Impossibile scrivere il file di output: {str(e)}"
- print(f"ERRORE: {error_msg}")
- write_error_file(error_msg, "output")
- return (False, False)
-
- finally:
- # Elimina file temporaneo
- try:
- os.unlink(temp_output.name)
- except:
- pass
-
- print("Elaborazione completata")
- return (True, False) # Success e non skipped
-
-
- def write_error_file(error_message, error_type="input"):
- """Scrive un file di errore con il numero del giorno e la descrizione."""
- day = datetime.now().day
- error_filename = f"error{day:02d}.txt"
-
- # Usa il percorso errori dalla configurazione (se disponibile) o default
- try:
- # Cerca di leggere error_path dalla configurazione
- if hasattr(write_error_file, 'error_path'):
- error_dir = write_error_file.error_path
- else:
- error_dir = "./errors" # Default
-
- # Crea la directory se non esiste
- os.makedirs(error_dir, exist_ok=True)
-
- error_filepath = os.path.join(error_dir, error_filename)
-
- with open(error_filepath, 'w') as f:
- f.write(f"Errore di {error_type}\n")
- f.write(f"Data: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}\n")
- f.write(f"Descrizione: {error_message}\n")
- print(f"File di errore creato: {error_filepath}")
- except Exception as e:
- # Fallback: prova a scrivere nella directory corrente
- try:
- with open(error_filename, 'w') as f:
- f.write(f"Errore di {error_type}\n")
- f.write(f"Data: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}\n")
- f.write(f"Descrizione: {error_message}\n")
- print(f"ATTENZIONE: File di errore creato nella directory corrente: {error_filename}")
- print(f"(Impossibile usare directory configurata: {e})")
- except Exception as e2:
- print(f"ERRORE CRITICO: Impossibile creare file di errore: {e2}")
-
-
- def mount_windows_share(config, temp_dir):
- """Monta una condivisione Windows usando net use (Windows) o mount (Linux)."""
- is_windows = platform.system() == "Windows"
-
- # Costruisci il percorso UNC
- unc_path = f"\\\\{config['host']}\\{config['share']}"
-
- if is_windows:
- # Windows: usa net use
- mount_point = unc_path
- cmd = ["net", "use", unc_path]
-
- if config['password']:
- cmd.append(config['password'])
- if config['username']:
- cmd.append(f"/user:{config['domain']}\\{config['username']}" if config['domain'] else f"/user:{config['username']}")
-
- else:
- # Linux: usa mount.cifs
- mount_point = os.path.join(temp_dir, "mount_" + config['host'].replace('.', '_'))
- os.makedirs(mount_point, exist_ok=True)
-
- creds = f"username={config['username']},password={config['password']}"
- if config['domain']:
- creds += f",domain={config['domain']}"
-
- cmd = ["mount", "-t", "cifs", f"//{config['host']}/{config['share']}",
- mount_point, "-o", creds]
-
- try:
- result = subprocess.run(cmd, capture_output=True, text=True, check=True)
- return mount_point
- except subprocess.CalledProcessError as e:
- raise Exception(f"Impossibile montare {unc_path}: {e.stderr}")
-
-
- def unmount_share(mount_point):
- """Smonta una condivisione."""
- is_windows = platform.system() == "Windows"
-
- try:
- if is_windows:
- subprocess.run(["net", "use", mount_point, "/delete"],
- capture_output=True, check=False)
- else:
- subprocess.run(["umount", mount_point],
- capture_output=True, check=False)
- except Exception:
- pass # Ignora errori di smontaggio
-
-
- def copy_via_smb(config, local_file, remote_filename):
- """Copia un file locale su una condivisione remota usando smbclient."""
- is_windows = platform.system() == "Windows"
-
- if is_windows:
- # Su Windows usa net use + copy
- temp_dir = tempfile.mkdtemp()
- try:
- mount_point = mount_windows_share(config, temp_dir)
- remote_path = os.path.join(mount_point, config['path'], remote_filename)
-
- # Crea la directory se non esiste
- remote_dir = os.path.join(mount_point, config['path'])
- os.makedirs(remote_dir, exist_ok=True)
-
- # Copia il file
- shutil.copy2(local_file, remote_path)
-
- unmount_share(mount_point)
- finally:
- shutil.rmtree(temp_dir, ignore_errors=True)
- else:
- # Su Linux usa smbclient
- unc_path = f"//{config['host']}/{config['share']}/{config['path']}/{remote_filename}"
-
- cmd = [
- "smbclient",
- f"//{config['host']}/{config['share']}",
- "-U", f"{config['domain']}\\{config['username']}" if config['domain'] else config['username'],
- "-c", f"cd {config['path']}; put {local_file} {remote_filename}"
- ]
-
- env = os.environ.copy()
- if config['password']:
- env['PASSWD'] = config['password']
- cmd.insert(2, config['password'])
-
- try:
- result = subprocess.run(cmd, capture_output=True, text=True, env=env)
- if result.returncode != 0:
- raise Exception(f"Errore smbclient: {result.stderr}")
- except FileNotFoundError:
- raise Exception("smbclient non trovato. Installare samba-client o cifs-utils")
-
-
- def write_local_file(config, content, filename):
- """
- Scrive il file di output in locale invece che su una condivisione remota.
-
- Args:
- config: configurazione output (deve contenere 'local_path')
- content: contenuto da scrivere
- filename: nome del file
-
- Returns:
- path completo del file creato
- """
- local_path = config.get('local_path', './output')
-
- # Crea la directory se non esiste
- os.makedirs(local_path, exist_ok=True)
-
- # Percorso completo del file
- output_file = os.path.join(local_path, filename)
-
- # Scrivi il file
- with open(output_file, 'w') as f:
- f.write(content)
-
- return output_file
-
-
- def read_remote_file(config, filename):
- """Legge un file da una condivisione remota."""
- is_windows = platform.system() == "Windows"
- temp_dir = tempfile.mkdtemp()
-
- try:
- if is_windows:
- # Windows: monta e leggi
- mount_point = mount_windows_share(config, temp_dir)
- file_path = os.path.join(mount_point, config['path'], filename)
-
- if not os.path.exists(file_path):
- unmount_share(mount_point)
- raise FileNotFoundError(f"File {filename} non trovato su {config['host']}")
-
- with open(file_path, 'r') as f:
- content = f.read()
-
- unmount_share(mount_point)
- return content
-
- else:
- # Linux: usa smbclient per scaricare
- local_temp = os.path.join(temp_dir, filename)
-
- cmd = [
- "smbclient",
- f"//{config['host']}/{config['share']}",
- "-U", f"{config['domain']}\\{config['username']}" if config['domain'] else config['username'],
- "-c", f"cd {config['path']}; get {filename} {local_temp}"
- ]
-
- if config['password']:
- cmd.insert(2, config['password'])
-
- result = subprocess.run(cmd, capture_output=True, text=True)
-
- if result.returncode != 0 or not os.path.exists(local_temp):
- raise FileNotFoundError(f"File {filename} non trovato su {config['host']}")
-
- with open(local_temp, 'r') as f:
- content = f.read()
-
- return content
- finally:
- shutil.rmtree(temp_dir, ignore_errors=True)
-
-
- def process_file(input_content):
- """Processa il contenuto del file di input e restituisce le righe filtrate."""
- filtered_lines = []
-
- for line in input_content.splitlines():
- line = line.strip()
- if line.startswith('01003'):
- filtered_lines.append(line)
-
- return filtered_lines
-
-
- def main():
- """Funzione principale."""
- # Default per modalità daily
- DEFAULT_DAILY_HOUR = 3
-
- # Parse argomenti da riga di comando
- parser = argparse.ArgumentParser(
- description='Processa file remoti filtrando stringhe che iniziano con 01003',
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog="""
- Esempi:
- python file_processor.py # Usa modalità da config (hourly/daily)
- python file_processor.py 01-01-2026 # Processa data specifica
- python file_processor.py --force # Forza elaborazione
- """
- )
- parser.add_argument(
- 'date',
- nargs='?',
- help='Data nel formato GG-MM-AAAA (opzionale, default: oggi)'
- )
- parser.add_argument(
- '--force',
- action='store_true',
- help='Forza l\'elaborazione anche se il file non è cambiato'
- )
- parser.add_argument(
- '--config',
- default='config.json',
- help='Percorso del file di configurazione (default: config.json)'
- )
- parser.add_argument(
- '--no-previous-day-check',
- action='store_true',
- help='Non controllare il file del giorno precedente (solo modalità hourly)'
- )
-
- args = parser.parse_args()
-
- print("=== Avvio elaborazione ===")
- print(f"Ora corrente: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")
-
- try:
- # Carica configurazione
- print("\nCaricamento configurazione...")
- config = load_config(args.config)
-
- # Imposta il percorso errori dalla configurazione
- error_path = config.get('error_path', './errors')
- write_error_file.error_path = error_path
- print(f"Percorso errori: {error_path}")
-
- # Leggi modalità di esecuzione
- execution_mode = config.get('execution_mode', 'hourly').lower()
- daily_hour = config.get('daily_execution_hour', DEFAULT_DAILY_HOUR)
-
- print(f"Modalità esecuzione: {execution_mode.upper()}")
- if execution_mode == 'daily':
- print(f"Ora configurata per esecuzione daily: {daily_hour}:00")
-
- # Determina la data da processare
- if args.date:
- # Data specifica fornita dall'utente - ignora la modalità
- try:
- date_string = parse_date_argument(args.date)
- print(f"Data specificata: {args.date} -> {date_string}")
- except ValueError as e:
- print(f"ERRORE: {e}")
- return 1
-
- print("\nModalità: DATA SPECIFICA (ignora execution_mode)")
- # Processa solo la data specificata con stato (comportamento standard)
- success, skipped = process_single_date(config, date_string, args.force, skip_state=False)
-
- if success:
- print("\n=== Elaborazione completata con successo ===")
- return 0
- else:
- return 1
-
- # Nessuna data specifica: usa la modalità configurata
- if execution_mode == 'daily':
- # ============================================================
- # MODALITÀ DAILY: Processa SOLO il giorno precedente
- # ============================================================
- print("\n" + "="*60)
- print("MODALITÀ DAILY: Consolidamento giorno precedente")
- print("="*60)
-
- yesterday_string = get_previous_day_string()
-
- # Processa solo ieri, senza file di stato
- success, skipped = process_single_date(
- config,
- yesterday_string,
- force=True, # In daily forziamo sempre (no controllo modifiche)
- date_label="IERI (consolidamento finale)",
- skip_state=True # Non usa file di stato
- )
-
- if success:
- if not skipped:
- print("\n=== Consolidamento giornaliero completato con successo ===")
- else:
- print("\n=== File del giorno precedente non trovato ===")
- return 0
- else:
- print("\n=== Consolidamento giornaliero completato con errori ===")
- return 1
-
- elif execution_mode == 'hourly':
- # ============================================================
- # MODALITÀ HOURLY: Comportamento originale (ieri + oggi)
- # ============================================================
- today_string = get_date_string()
- yesterday_string = get_previous_day_string()
-
- print(f"Data odierna: {datetime.now().strftime('%d/%m/%Y')}")
-
- # Pulizia file di stato obsoleti (solo in modalità automatica)
- cleanup_old_state_files()
-
- overall_success = True
-
- # FASE 1: Controlla il file di IERI per consolidare eventuali modifiche tardive
- if not args.no_previous_day_check:
- print("\n" + "="*60)
- print("FASE 1: Consolidamento file del giorno precedente")
- print("="*60)
-
- success, skipped = process_single_date(
- config,
- yesterday_string,
- args.force,
- "IERI",
- skip_state=False # Usa file di stato in hourly
- )
-
- if not success:
- overall_success = False
- print("ATTENZIONE: Errore nel consolidamento del file di ieri")
- elif skipped:
- print("File di ieri già aggiornato o non esistente")
-
- # FASE 2: Processa il file di OGGI
- print("\n" + "="*60)
- print("FASE 2: Elaborazione file del giorno corrente")
- print("="*60)
-
- success, skipped = process_single_date(
- config,
- today_string,
- args.force,
- "OGGI",
- skip_state=False # Usa file di stato in hourly
- )
-
- if not success:
- overall_success = False
-
- if overall_success:
- print("\n=== Elaborazione completata con successo ===")
- return 0
- else:
- print("\n=== Elaborazione completata con errori ===")
- return 1
- else:
- # Modalità non riconosciuta
- print(f"\nERRORE: Modalità '{execution_mode}' non valida. Usa 'daily' o 'hourly'.")
- return 1
-
- except Exception as e:
- error_msg = f"Errore generale: {str(e)}"
- print(f"ERRORE: {error_msg}")
- write_error_file(error_msg, "input")
- return 1
-
-
- if __name__ == "__main__":
- sys.exit(main())
|