"""Data-access helpers for users, VM assignments, subdomains, IP assignments and action logs.

Every public method opens its own database connection (autocommit enabled),
runs a single parameterized statement and closes the connection again.
"""

from contextlib import contextmanager
from datetime import datetime

import bcrypt
import pymysql

from config import Config


def get_db_connection():
    """Open a new database connection.

    Connection settings are read from ``Config``. Rows are returned as
    dictionaries (``DictCursor``) and autocommit is enabled, so every
    statement is committed as soon as it is executed.
    """
    return pymysql.connect(
        host=Config.DB_HOST,
        user=Config.DB_USER,
        password=Config.DB_PASSWORD,
        database=Config.DB_NAME,
        cursorclass=pymysql.cursors.DictCursor,
        autocommit=True,
    )


@contextmanager
def _db_cursor():
    """Yield a cursor on a fresh connection; always close cursor, then connection."""
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            yield cursor
    finally:
        conn.close()


class User:
    """Queries against the ``users`` table."""

    @staticmethod
    def create(username, email, password, is_admin=False):
        """Create a new user and return ``cursor.lastrowid`` for the insert.

        The plain-text ``password`` is hashed with bcrypt (fresh salt) before
        it is stored in ``password_hash``.
        """
        password_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
        sql = """INSERT INTO users (username, email, password_hash, is_admin)
                 VALUES (%s, %s, %s, %s)"""
        with _db_cursor() as cursor:
            cursor.execute(sql, (username, email, password_hash, is_admin))
            return cursor.lastrowid

    @staticmethod
    def get_by_username(username):
        """Return the active user row with this username, or ``None``."""
        sql = "SELECT * FROM users WHERE username = %s AND active = TRUE"
        with _db_cursor() as cursor:
            cursor.execute(sql, (username,))
            return cursor.fetchone()

    @staticmethod
    def get_by_id(user_id):
        """Return the active user row with this ID, or ``None``."""
        sql = "SELECT * FROM users WHERE id = %s AND active = TRUE"
        with _db_cursor() as cursor:
            cursor.execute(sql, (user_id,))
            return cursor.fetchone()

    @staticmethod
    def verify_password(username, password):
        """Return ``True`` if ``password`` matches the stored hash of an active user.

        Returns ``False`` when no active user with that username exists.
        """
        user = User.get_by_username(username)
        if not user:
            return False
        stored_hash = user['password_hash']
        # bcrypt needs bytes; the driver may hand the hash back as str.
        if isinstance(stored_hash, str):
            stored_hash = stored_hash.encode('utf-8')
        return bcrypt.checkpw(password.encode('utf-8'), stored_hash)

    @staticmethod
    def update_last_login(user_id):
        """Set ``last_login`` of the given user to the database's ``NOW()``."""
        sql = "UPDATE users SET last_login = NOW() WHERE id = %s"
        with _db_cursor() as cursor:
            cursor.execute(sql, (user_id,))

    @staticmethod
    def get_all_users():
        """Return all users (active or not), newest first; intended for admins.

        The password hash is not part of the selected columns.
        """
        sql = (
            "SELECT id, username, email, is_admin, created_at, last_login, active "
            "FROM users ORDER BY created_at DESC"
        )
        with _db_cursor() as cursor:
            cursor.execute(sql)
            return cursor.fetchall()

    @staticmethod
    def update_active_status(user_id, active):
        """Enable or disable a user; return ``True`` if the statement affected a row."""
        sql = "UPDATE users SET active = %s WHERE id = %s"
        with _db_cursor() as cursor:
            cursor.execute(sql, (active, user_id))
            return cursor.rowcount > 0

    @staticmethod
    def update_password(user_id, new_password):
        """Store a new bcrypt hash for the user; return ``True`` if a row was affected."""
        password_hash = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())
        sql = "UPDATE users SET password_hash = %s WHERE id = %s"
        with _db_cursor() as cursor:
            cursor.execute(sql, (password_hash, user_id))
            return cursor.rowcount > 0

    @staticmethod
    def update_email(user_id, new_email):
        """Change the user's e-mail address; return ``True`` if a row was affected."""
        sql = "UPDATE users SET email = %s WHERE id = %s"
        with _db_cursor() as cursor:
            cursor.execute(sql, (new_email, user_id))
            return cursor.rowcount > 0


class UserVM:
    """Queries against the ``user_vms`` table (VM-to-user assignments)."""

    @staticmethod
    def assign_vm(user_id, vm_id, vm_name=None, notes=None):
        """Assign a VM to a user, updating name/notes if the key already exists.

        Returns ``cursor.lastrowid`` as reported by the driver.
        NOTE(review): for the ON DUPLICATE KEY UPDATE path this may not be the
        ID of a newly inserted row - confirm before relying on it.
        """
        sql = """INSERT INTO user_vms (user_id, vm_id, vm_name, notes)
                 VALUES (%s, %s, %s, %s)
                 ON DUPLICATE KEY UPDATE vm_name = VALUES(vm_name), notes = VALUES(notes)"""
        with _db_cursor() as cursor:
            cursor.execute(sql, (user_id, vm_id, vm_name, notes))
            return cursor.lastrowid

    @staticmethod
    def get_user_vms(user_id):
        """Return all VM assignments of a user, ordered by ``vm_id``."""
        sql = "SELECT * FROM user_vms WHERE user_id = %s ORDER BY vm_id"
        with _db_cursor() as cursor:
            cursor.execute(sql, (user_id,))
            return cursor.fetchall()

    @staticmethod
    def remove_vm(user_id, vm_id):
        """Delete a VM-user assignment; return ``True`` if a row was deleted."""
        sql = "DELETE FROM user_vms WHERE user_id = %s AND vm_id = %s"
        with _db_cursor() as cursor:
            cursor.execute(sql, (user_id, vm_id))
            return cursor.rowcount > 0

    @staticmethod
    def user_has_vm(user_id, vm_id):
        """Return ``True`` if an assignment row exists for this user and VM."""
        sql = "SELECT COUNT(*) as count FROM user_vms WHERE user_id = %s AND vm_id = %s"
        with _db_cursor() as cursor:
            cursor.execute(sql, (user_id, vm_id))
            row = cursor.fetchone()
            return row['count'] > 0

    @staticmethod
    def get_all_assignments():
        """Return every VM-user assignment joined with the username; intended for admins."""
        sql = """SELECT uv.*, u.username
                 FROM user_vms uv
                 JOIN users u ON uv.user_id = u.id
                 ORDER BY u.username, uv.vm_id"""
        with _db_cursor() as cursor:
            cursor.execute(sql)
            return cursor.fetchall()


class Subdomain:
    """Queries against the ``subdomains`` table."""

    @staticmethod
    def create(user_id, subdomain, ip_address, vm_id=None, proxied=True,
               cloudflare_record_id=None):
        """Insert a new subdomain row and return ``cursor.lastrowid``."""
        sql = """INSERT INTO subdomains
                 (user_id, subdomain, ip_address, vm_id, proxied, cloudflare_record_id)
                 VALUES (%s, %s, %s, %s, %s, %s)"""
        params = (user_id, subdomain, ip_address, vm_id, proxied, cloudflare_record_id)
        with _db_cursor() as cursor:
            cursor.execute(sql, params)
            return cursor.lastrowid

    @staticmethod
    def get_user_subdomains(user_id):
        """Return all subdomains owned by a user, newest first."""
        sql = "SELECT * FROM subdomains WHERE user_id = %s ORDER BY created_at DESC"
        with _db_cursor() as cursor:
            cursor.execute(sql, (user_id,))
            return cursor.fetchall()

    @staticmethod
    def get_by_id(subdomain_id):
        """Return the subdomain row with this ID, or ``None``."""
        sql = "SELECT * FROM subdomains WHERE id = %s"
        with _db_cursor() as cursor:
            cursor.execute(sql, (subdomain_id,))
            return cursor.fetchone()

    @staticmethod
    def delete(subdomain_id, user_id):
        """Delete a subdomain only if it belongs to ``user_id``.

        Returns ``True`` if a row was deleted.
        """
        sql = "DELETE FROM subdomains WHERE id = %s AND user_id = %s"
        with _db_cursor() as cursor:
            cursor.execute(sql, (subdomain_id, user_id))
            return cursor.rowcount > 0

    @staticmethod
    def subdomain_exists(subdomain):
        """Return ``True`` if a row with this subdomain name already exists."""
        sql = "SELECT COUNT(*) as count FROM subdomains WHERE subdomain = %s"
        with _db_cursor() as cursor:
            cursor.execute(sql, (subdomain,))
            row = cursor.fetchone()
            return row['count'] > 0

    @staticmethod
    def update_ip(subdomain_id, user_id, new_ip):
        """Change the IP of a subdomain owned by ``user_id``.

        Returns ``True`` if the statement affected a row.
        """
        sql = "UPDATE subdomains SET ip_address = %s WHERE id = %s AND user_id = %s"
        with _db_cursor() as cursor:
            cursor.execute(sql, (new_ip, subdomain_id, user_id))
            return cursor.rowcount > 0


class IPAssignment:
    """Queries against the ``ip_assignments`` table."""

    @staticmethod
    def create(vm_id, ip_address, assigned_by, mac_address=None, notes=None):
        """Insert an IP assignment, or overwrite the existing one on a duplicate key.

        Returns ``cursor.lastrowid`` as reported by the driver.
        NOTE(review): for the ON DUPLICATE KEY UPDATE path this may not be the
        ID of a newly inserted row - confirm before relying on it.
        """
        sql = """INSERT INTO ip_assignments (vm_id, ip_address, mac_address, notes, assigned_by)
                 VALUES (%s, %s, %s, %s, %s)
                 ON DUPLICATE KEY UPDATE
                     ip_address = VALUES(ip_address),
                     mac_address = VALUES(mac_address),
                     notes = VALUES(notes),
                     assigned_by = VALUES(assigned_by),
                     updated_at = CURRENT_TIMESTAMP"""
        with _db_cursor() as cursor:
            cursor.execute(sql, (vm_id, ip_address, mac_address, notes, assigned_by))
            return cursor.lastrowid

    @staticmethod
    def get_by_vm_id(vm_id):
        """Return the IP assignment row for this VM, or ``None``."""
        sql = "SELECT * FROM ip_assignments WHERE vm_id = %s"
        with _db_cursor() as cursor:
            cursor.execute(sql, (vm_id,))
            return cursor.fetchone()

    @staticmethod
    def get_all():
        """Return all IP assignments, most recently updated first.

        Each row also carries ``assigned_by_username``; it is ``NULL`` when the
        assigning user no longer matches a ``users`` row (LEFT JOIN).
        """
        sql = """SELECT ia.*, u.username as assigned_by_username
                 FROM ip_assignments ia
                 LEFT JOIN users u ON ia.assigned_by = u.id
                 ORDER BY ia.updated_at DESC"""
        with _db_cursor() as cursor:
            cursor.execute(sql)
            return cursor.fetchall()

    @staticmethod
    def delete(vm_id):
        """Delete the IP assignment of a VM; return ``True`` if a row was deleted."""
        sql = "DELETE FROM ip_assignments WHERE vm_id = %s"
        with _db_cursor() as cursor:
            cursor.execute(sql, (vm_id,))
            return cursor.rowcount > 0

    @staticmethod
    def check_ip_conflict(ip_address, exclude_vm_id=None):
        """Return the ``vm_id`` already using ``ip_address``, or ``None``.

        When ``exclude_vm_id`` is given, assignments of that VM are ignored so
        a VM does not conflict with itself.
        """
        # Compare against None explicitly: a VM ID of 0 is falsy but must
        # still be excluded from the lookup.
        if exclude_vm_id is not None:
            sql = "SELECT vm_id FROM ip_assignments WHERE ip_address = %s AND vm_id != %s"
            params = (ip_address, exclude_vm_id)
        else:
            sql = "SELECT vm_id FROM ip_assignments WHERE ip_address = %s"
            params = (ip_address,)
        with _db_cursor() as cursor:
            cursor.execute(sql, params)
            row = cursor.fetchone()
            return row['vm_id'] if row else None


class ActionLog:
    """Queries against the ``action_logs`` table."""

    @staticmethod
    def log_action(user_id, vm_id, action_type, status, error_message=None,
                   ip_address=None):
        """Record a user action and return ``cursor.lastrowid``."""
        sql = """INSERT INTO action_logs
                 (user_id, vm_id, action_type, status, error_message, ip_address)
                 VALUES (%s, %s, %s, %s, %s, %s)"""
        params = (user_id, vm_id, action_type, status, error_message, ip_address)
        with _db_cursor() as cursor:
            cursor.execute(sql, params)
            return cursor.lastrowid

    @staticmethod
    def get_user_logs(user_id, limit=50):
        """Return up to ``limit`` log rows of a user, newest first."""
        sql = """SELECT * FROM action_logs
                 WHERE user_id = %s
                 ORDER BY created_at DESC
                 LIMIT %s"""
        # Coerce to int so a numeric string (e.g. from a query parameter) is
        # not sent as a quoted string, which MySQL rejects in LIMIT.
        with _db_cursor() as cursor:
            cursor.execute(sql, (user_id, int(limit)))
            return cursor.fetchall()

    @staticmethod
    def get_all_logs(limit=100):
        """Return up to ``limit`` log rows with usernames, newest first; intended for admins."""
        sql = """SELECT al.*, u.username
                 FROM action_logs al
                 JOIN users u ON al.user_id = u.id
                 ORDER BY al.created_at DESC
                 LIMIT %s"""
        # See get_user_logs for why limit is coerced to int.
        with _db_cursor() as cursor:
            cursor.execute(sql, (int(limit),))
            return cursor.fetchall()

    @staticmethod
    def get_vm_logs(vm_id, limit=50):
        """Return up to ``limit`` log rows for one VM with usernames, newest first."""
        sql = """SELECT al.*, u.username
                 FROM action_logs al
                 JOIN users u ON al.user_id = u.id
                 WHERE al.vm_id = %s
                 ORDER BY al.created_at DESC
                 LIMIT %s"""
        # See get_user_logs for why limit is coerced to int.
        with _db_cursor() as cursor:
            cursor.execute(sql, (vm_id, int(limit)))
            return cursor.fetchall()