#!/usr/bin/env python3 import os import sys import json import platform import subprocess from datetime import datetime import tempfile import shutil import hashlib import argparse def load_config(config_file="config.json"): """Carica la configurazione dal file JSON.""" try: with open(config_file, 'r') as f: return json.load(f) except FileNotFoundError: raise Exception(f"File di configurazione '{config_file}' non trovato") except json.JSONDecodeError: raise Exception(f"Errore nel parsing del file di configurazione '{config_file}'") def get_date_string(): """Restituisce la data odierna nel formato giorno+mese+anno.""" now = datetime.now() return f"{now.day:02d}{now.month:02d}{now.year}" def get_previous_day_string(): """Restituisce la data di ieri nel formato giorno+mese+anno.""" from datetime import timedelta yesterday = datetime.now() - timedelta(days=1) return f"{yesterday.day:02d}{yesterday.month:02d}{yesterday.year}" def parse_date_argument(date_str): """ Converte una data in formato GG-MM-AAAA nel formato GGMMAAAA. Args: date_str: stringa nel formato "GG-MM-AAAA" (es: "01-01-2026") Returns: stringa nel formato "GGMMAAAA" (es: "01012026") Raises: ValueError: se il formato non è valido """ try: date_obj = datetime.strptime(date_str, "%d-%m-%Y") return f"{date_obj.day:02d}{date_obj.month:02d}{date_obj.year}" except ValueError: raise ValueError(f"Formato data non valido: {date_str}. Usa il formato GG-MM-AAAA (es: 01-01-2026)") def calculate_content_hash(content): """Calcola l'hash MD5 del contenuto.""" return hashlib.md5(content.encode('utf-8')).hexdigest() def get_state_filename(date_string): """Restituisce il nome del file di stato per una data specifica.""" return f".state_{date_string}.json" def load_state(date_string): """ Carica lo stato dell'ultima elaborazione per una data specifica. Returns: dict con 'last_hash' e 'last_check_time', oppure None se non esiste """ state_file = get_state_filename(date_string) if not os.path.exists(state_file): return None try: with open(state_file, 'r') as f: return json.load(f) except: return None def save_state(date_string, content_hash): """Salva lo stato dell'elaborazione corrente.""" state_file = get_state_filename(date_string) state = { 'last_hash': content_hash, 'last_check_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'date': date_string } try: with open(state_file, 'w') as f: json.dump(state, f, indent=4) return True except Exception as e: print(f"ATTENZIONE: Impossibile salvare lo stato: {e}") return False def cleanup_old_state_files(): """Elimina i file di stato di date precedenti.""" today = get_date_string() for filename in os.listdir('.'): if filename.startswith('.state_') and filename.endswith('.json'): # Estrai la data dal nome del file try: state_date = filename.replace('.state_', '').replace('.json', '') if state_date != today: os.remove(filename) print(f"File di stato obsoleto eliminato: {filename}") except: pass def process_single_date(config, date_string, force=False, date_label="", skip_state=False): """ Processa un singolo file per una data specifica. Args: config: configurazione caricata date_string: stringa data in formato GGMMAAAA force: forza elaborazione anche se non modificato date_label: etichetta descrittiva (es: "oggi", "ieri") skip_state: se True, non usa né salva i file di stato (per modalità daily) Returns: (success, skipped) - (True/False, True se saltato perché non modificato) """ input_filename = f"{date_string}.txt" output_filename = f"{date_string}.txt" if date_label: print(f"\n--- Elaborazione file di {date_label}: {input_filename} ---") else: print(f"\n--- Elaborazione file: {input_filename} ---") # Leggi file di input dal PC remoto print(f"Lettura file da {config['input']['host']}...") try: input_content = read_remote_file(config['input'], input_filename) print(f"File letto con successo ({len(input_content)} caratteri)") except FileNotFoundError: print(f"File {input_filename} non trovato (normale se non esiste ancora)") return (True, True) # Non è un errore, semplicemente il file non esiste except Exception as e: error_msg = f"Impossibile leggere il file di input {input_filename}: {str(e)}" print(f"ERRORE: {error_msg}") write_error_file(error_msg, "input") return (False, False) # Calcola hash del contenuto content_hash = calculate_content_hash(input_content) # Verifica se il file è cambiato rispetto all'ultima elaborazione (solo se non skip_state) if not skip_state and not force: previous_state = load_state(date_string) if previous_state and previous_state.get('last_hash') == content_hash: print(f"File non modificato rispetto all'ultimo controllo ({previous_state.get('last_check_time')})") print("Elaborazione saltata.") return (True, True) # Success ma skipped if previous_state: print(f"File modificato rispetto all'ultimo controllo ({previous_state.get('last_check_time')})") else: print("Prima elaborazione per questo file") elif skip_state: print("Modalità DAILY: elaborazione senza controllo stato") else: print("Modalità --force: elaborazione forzata") # Processa il contenuto print("Elaborazione stringhe che iniziano con '01003'...") filtered_lines = process_file(input_content) print(f"Trovate {len(filtered_lines)} stringhe che iniziano con '01003'") # Crea file di output (anche se vuoto) output_content = '\n'.join(filtered_lines) if filtered_lines: output_content += '\n' # Aggiungi newline finale # Verifica se l'output deve essere locale o remoto use_local = config['output'].get('locale', 'no').lower() in ['yes', 'si', 'true', '1'] if use_local: # Output locale print("Modalità output LOCALE") local_path = config['output'].get('local_path', './output') try: output_file_path = write_local_file(config['output'], output_content, output_filename) print(f"File {output_filename} scritto in: {output_file_path}") if len(filtered_lines) == 0: print("NOTA: File output vuoto (nessuna stringa '01003')") # Salva lo stato corrente (solo se non skip_state) if not skip_state: if save_state(date_string, content_hash): print(f"Stato salvato") else: print("Stato non salvato (modalità DAILY)") except Exception as e: error_msg = f"Impossibile scrivere il file di output locale: {str(e)}" print(f"ERRORE: {error_msg}") write_error_file(error_msg, "output") return (False, False) else: # Output remoto (comportamento originale) print("Modalità output REMOTO") # Scrivi su file temporaneo locale temp_output = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') try: temp_output.write(output_content) temp_output.close() # Copia il file sul PC remoto di output print(f"Scrittura file su {config['output']['host']}...") try: copy_via_smb(config['output'], temp_output.name, output_filename) print(f"File {output_filename} creato con successo") if len(filtered_lines) == 0: print("NOTA: File output vuoto (nessuna stringa '01003')") # Salva lo stato corrente (solo se non skip_state) if not skip_state: if save_state(date_string, content_hash): print(f"Stato salvato") else: print("Stato non salvato (modalità DAILY)") except Exception as e: error_msg = f"Impossibile scrivere il file di output: {str(e)}" print(f"ERRORE: {error_msg}") write_error_file(error_msg, "output") return (False, False) finally: # Elimina file temporaneo try: os.unlink(temp_output.name) except: pass print("Elaborazione completata") return (True, False) # Success e non skipped def write_error_file(error_message, error_type="input"): """Scrive un file di errore con il numero del giorno e la descrizione.""" day = datetime.now().day error_filename = f"error{day:02d}.txt" # Usa il percorso errori dalla configurazione (se disponibile) o default try: # Cerca di leggere error_path dalla configurazione if hasattr(write_error_file, 'error_path'): error_dir = write_error_file.error_path else: error_dir = "./errors" # Default # Crea la directory se non esiste os.makedirs(error_dir, exist_ok=True) error_filepath = os.path.join(error_dir, error_filename) with open(error_filepath, 'w') as f: f.write(f"Errore di {error_type}\n") f.write(f"Data: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}\n") f.write(f"Descrizione: {error_message}\n") print(f"File di errore creato: {error_filepath}") except Exception as e: # Fallback: prova a scrivere nella directory corrente try: with open(error_filename, 'w') as f: f.write(f"Errore di {error_type}\n") f.write(f"Data: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}\n") f.write(f"Descrizione: {error_message}\n") print(f"ATTENZIONE: File di errore creato nella directory corrente: {error_filename}") print(f"(Impossibile usare directory configurata: {e})") except Exception as e2: print(f"ERRORE CRITICO: Impossibile creare file di errore: {e2}") def mount_windows_share(config, temp_dir): """Monta una condivisione Windows usando net use (Windows) o mount (Linux).""" is_windows = platform.system() == "Windows" # Costruisci il percorso UNC unc_path = f"\\\\{config['host']}\\{config['share']}" if is_windows: # Windows: usa net use mount_point = unc_path cmd = ["net", "use", unc_path] if config['password']: cmd.append(config['password']) if config['username']: cmd.append(f"/user:{config['domain']}\\{config['username']}" if config['domain'] else f"/user:{config['username']}") else: # Linux: usa mount.cifs mount_point = os.path.join(temp_dir, "mount_" + config['host'].replace('.', '_')) os.makedirs(mount_point, exist_ok=True) creds = f"username={config['username']},password={config['password']}" if config['domain']: creds += f",domain={config['domain']}" cmd = ["mount", "-t", "cifs", f"//{config['host']}/{config['share']}", mount_point, "-o", creds] try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) return mount_point except subprocess.CalledProcessError as e: raise Exception(f"Impossibile montare {unc_path}: {e.stderr}") def unmount_share(mount_point): """Smonta una condivisione.""" is_windows = platform.system() == "Windows" try: if is_windows: subprocess.run(["net", "use", mount_point, "/delete"], capture_output=True, check=False) else: subprocess.run(["umount", mount_point], capture_output=True, check=False) except Exception: pass # Ignora errori di smontaggio def copy_via_smb(config, local_file, remote_filename): """Copia un file locale su una condivisione remota usando smbclient.""" is_windows = platform.system() == "Windows" if is_windows: # Su Windows usa net use + copy temp_dir = tempfile.mkdtemp() try: mount_point = mount_windows_share(config, temp_dir) remote_path = os.path.join(mount_point, config['path'], remote_filename) # Crea la directory se non esiste remote_dir = os.path.join(mount_point, config['path']) os.makedirs(remote_dir, exist_ok=True) # Copia il file shutil.copy2(local_file, remote_path) unmount_share(mount_point) finally: shutil.rmtree(temp_dir, ignore_errors=True) else: # Su Linux usa smbclient unc_path = f"//{config['host']}/{config['share']}/{config['path']}/{remote_filename}" cmd = [ "smbclient", f"//{config['host']}/{config['share']}", "-U", f"{config['domain']}\\{config['username']}" if config['domain'] else config['username'], "-c", f"cd {config['path']}; put {local_file} {remote_filename}" ] env = os.environ.copy() if config['password']: env['PASSWD'] = config['password'] cmd.insert(2, config['password']) try: result = subprocess.run(cmd, capture_output=True, text=True, env=env) if result.returncode != 0: raise Exception(f"Errore smbclient: {result.stderr}") except FileNotFoundError: raise Exception("smbclient non trovato. Installare samba-client o cifs-utils") def write_local_file(config, content, filename): """ Scrive il file di output in locale invece che su una condivisione remota. Args: config: configurazione output (deve contenere 'local_path') content: contenuto da scrivere filename: nome del file Returns: path completo del file creato """ local_path = config.get('local_path', './output') # Crea la directory se non esiste os.makedirs(local_path, exist_ok=True) # Percorso completo del file output_file = os.path.join(local_path, filename) # Scrivi il file with open(output_file, 'w') as f: f.write(content) return output_file def read_remote_file(config, filename): """Legge un file da una condivisione remota.""" is_windows = platform.system() == "Windows" temp_dir = tempfile.mkdtemp() try: if is_windows: # Windows: monta e leggi mount_point = mount_windows_share(config, temp_dir) file_path = os.path.join(mount_point, config['path'], filename) if not os.path.exists(file_path): unmount_share(mount_point) raise FileNotFoundError(f"File {filename} non trovato su {config['host']}") with open(file_path, 'r') as f: content = f.read() unmount_share(mount_point) return content else: # Linux: usa smbclient per scaricare local_temp = os.path.join(temp_dir, filename) cmd = [ "smbclient", f"//{config['host']}/{config['share']}", "-U", f"{config['domain']}\\{config['username']}" if config['domain'] else config['username'], "-c", f"cd {config['path']}; get {filename} {local_temp}" ] if config['password']: cmd.insert(2, config['password']) result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0 or not os.path.exists(local_temp): raise FileNotFoundError(f"File {filename} non trovato su {config['host']}") with open(local_temp, 'r') as f: content = f.read() return content finally: shutil.rmtree(temp_dir, ignore_errors=True) def process_file(input_content): """Processa il contenuto del file di input e restituisce le righe filtrate.""" filtered_lines = [] for line in input_content.splitlines(): line = line.strip() if line.startswith('01003'): filtered_lines.append(line) return filtered_lines def main(): """Funzione principale.""" # Default per modalità daily DEFAULT_DAILY_HOUR = 3 # Parse argomenti da riga di comando parser = argparse.ArgumentParser( description='Processa file remoti filtrando stringhe che iniziano con 01003', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Esempi: python file_processor.py # Usa modalità da config (hourly/daily) python file_processor.py 01-01-2026 # Processa data specifica python file_processor.py --force # Forza elaborazione """ ) parser.add_argument( 'date', nargs='?', help='Data nel formato GG-MM-AAAA (opzionale, default: oggi)' ) parser.add_argument( '--force', action='store_true', help='Forza l\'elaborazione anche se il file non è cambiato' ) parser.add_argument( '--config', default='config.json', help='Percorso del file di configurazione (default: config.json)' ) parser.add_argument( '--no-previous-day-check', action='store_true', help='Non controllare il file del giorno precedente (solo modalità hourly)' ) args = parser.parse_args() print("=== Avvio elaborazione ===") print(f"Ora corrente: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}") try: # Carica configurazione print("\nCaricamento configurazione...") config = load_config(args.config) # Imposta il percorso errori dalla configurazione error_path = config.get('error_path', './errors') write_error_file.error_path = error_path print(f"Percorso errori: {error_path}") # Leggi modalità di esecuzione execution_mode = config.get('execution_mode', 'hourly').lower() daily_hour = config.get('daily_execution_hour', DEFAULT_DAILY_HOUR) print(f"Modalità esecuzione: {execution_mode.upper()}") if execution_mode == 'daily': print(f"Ora configurata per esecuzione daily: {daily_hour}:00") # Determina la data da processare if args.date: # Data specifica fornita dall'utente - ignora la modalità try: date_string = parse_date_argument(args.date) print(f"Data specificata: {args.date} -> {date_string}") except ValueError as e: print(f"ERRORE: {e}") return 1 print("\nModalità: DATA SPECIFICA (ignora execution_mode)") # Processa solo la data specificata con stato (comportamento standard) success, skipped = process_single_date(config, date_string, args.force, skip_state=False) if success: print("\n=== Elaborazione completata con successo ===") return 0 else: return 1 # Nessuna data specifica: usa la modalità configurata if execution_mode == 'daily': # ============================================================ # MODALITÀ DAILY: Processa SOLO il giorno precedente # ============================================================ print("\n" + "="*60) print("MODALITÀ DAILY: Consolidamento giorno precedente") print("="*60) yesterday_string = get_previous_day_string() # Processa solo ieri, senza file di stato success, skipped = process_single_date( config, yesterday_string, force=True, # In daily forziamo sempre (no controllo modifiche) date_label="IERI (consolidamento finale)", skip_state=True # Non usa file di stato ) if success: if not skipped: print("\n=== Consolidamento giornaliero completato con successo ===") else: print("\n=== File del giorno precedente non trovato ===") return 0 else: print("\n=== Consolidamento giornaliero completato con errori ===") return 1 elif execution_mode == 'hourly': # ============================================================ # MODALITÀ HOURLY: Comportamento originale (ieri + oggi) # ============================================================ today_string = get_date_string() yesterday_string = get_previous_day_string() print(f"Data odierna: {datetime.now().strftime('%d/%m/%Y')}") # Pulizia file di stato obsoleti (solo in modalità automatica) cleanup_old_state_files() overall_success = True # FASE 1: Controlla il file di IERI per consolidare eventuali modifiche tardive if not args.no_previous_day_check: print("\n" + "="*60) print("FASE 1: Consolidamento file del giorno precedente") print("="*60) success, skipped = process_single_date( config, yesterday_string, args.force, "IERI", skip_state=False # Usa file di stato in hourly ) if not success: overall_success = False print("ATTENZIONE: Errore nel consolidamento del file di ieri") elif skipped: print("File di ieri già aggiornato o non esistente") # FASE 2: Processa il file di OGGI print("\n" + "="*60) print("FASE 2: Elaborazione file del giorno corrente") print("="*60) success, skipped = process_single_date( config, today_string, args.force, "OGGI", skip_state=False # Usa file di stato in hourly ) if not success: overall_success = False if overall_success: print("\n=== Elaborazione completata con successo ===") return 0 else: print("\n=== Elaborazione completata con errori ===") return 1 else: # Modalità non riconosciuta print(f"\nERRORE: Modalità '{execution_mode}' non valida. Usa 'daily' o 'hourly'.") return 1 except Exception as e: error_msg = f"Errore generale: {str(e)}" print(f"ERRORE: {error_msg}") write_error_file(error_msg, "input") return 1 if __name__ == "__main__": sys.exit(main())