Files
proxmox_manager/models.py
2026-02-17 12:43:27 +01:00

409 lines
15 KiB
Python

import pymysql
from datetime import datetime
import bcrypt
from config import Config
def get_db_connection():
    """Open and return a new PyMySQL connection to the application database.

    Host, credentials and database name are read from ``Config``. The
    connection uses ``DictCursor`` (rows come back as dictionaries) and has
    autocommit enabled. The connection is not closed here; the caller is
    expected to close it.
    """
    settings = {
        "host": Config.DB_HOST,
        "user": Config.DB_USER,
        "password": Config.DB_PASSWORD,
        "database": Config.DB_NAME,
        "cursorclass": pymysql.cursors.DictCursor,
        "autocommit": True,
    }
    return pymysql.connect(**settings)
class User:
    """Data-access helpers for the ``users`` table.

    Every method obtains its own connection from ``get_db_connection()`` and
    closes it before returning, including when the query raises.
    """

    @staticmethod
    def create(username, email, password, is_admin=False):
        """Insert a new user and return ``cursor.lastrowid`` for the insert.

        The plaintext *password* is hashed with bcrypt (freshly generated
        salt); only the hash is written to the database.
        """
        password_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                sql = (
                    "INSERT INTO users (username, email, password_hash, is_admin)\n"
                    "VALUES (%s, %s, %s, %s)"
                )
                cursor.execute(sql, (username, email, password_hash, is_admin))
                return cursor.lastrowid
        finally:
            conn.close()

    @staticmethod
    def get_by_username(username):
        """Return the active user row with this username, or ``None``.

        Users whose ``active`` flag is not TRUE are never returned.
        """
        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                sql = "SELECT * FROM users WHERE username = %s AND active = TRUE"
                cursor.execute(sql, (username,))
                return cursor.fetchone()
        finally:
            conn.close()

    @staticmethod
    def get_by_id(user_id):
        """Return the active user row with this id, or ``None``.

        Users whose ``active`` flag is not TRUE are never returned.
        """
        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                sql = "SELECT * FROM users WHERE id = %s AND active = TRUE"
                cursor.execute(sql, (user_id,))
                return cursor.fetchone()
        finally:
            conn.close()

    @staticmethod
    def verify_password(username, password):
        """Check *password* against the stored bcrypt hash of an active user.

        Returns ``True`` only when ``get_by_username`` finds the user and the
        password matches the stored hash. Returns ``False`` when the user is
        not found, when no hash is stored, or when bcrypt rejects the stored
        value as malformed, so a corrupt row fails closed instead of raising.
        """
        user = User.get_by_username(username)
        if not user:
            return False

        stored_hash = user['password_hash']
        if not stored_hash:
            # NULL or empty hash: there is nothing to compare against.
            return False

        # The driver may hand the hash back as text; bcrypt needs bytes.
        if isinstance(stored_hash, str):
            stored_hash = stored_hash.encode('utf-8')

        try:
            return bcrypt.checkpw(password.encode('utf-8'), stored_hash)
        except ValueError:
            # bcrypt raises ValueError (e.g. "Invalid salt") when the stored
            # value is not a well-formed bcrypt hash; treat it as a mismatch.
            return False

    @staticmethod
    def update_last_login(user_id):
        """Set ``last_login`` to the database's current time for this user.

        Returns ``None``.
        """
        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                sql = "UPDATE users SET last_login = NOW() WHERE id = %s"
                cursor.execute(sql, (user_id,))
        finally:
            conn.close()

    @staticmethod
    def get_all_users():
        """Return all users, newest first, without their password hashes.

        Inactive users are included (there is no ``active`` filter). Meant
        for admin views; no authorization check is performed here.
        """
        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                sql = (
                    "SELECT id, username, email, is_admin, created_at, last_login, active "
                    "FROM users ORDER BY created_at DESC"
                )
                cursor.execute(sql)
                return cursor.fetchall()
        finally:
            conn.close()

    @staticmethod
    def update_active_status(user_id, active):
        """Enable or disable a user.

        Returns ``True`` when the UPDATE reports at least one affected row.
        """
        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                sql = "UPDATE users SET active = %s WHERE id = %s"
                cursor.execute(sql, (active, user_id))
                return cursor.rowcount > 0
        finally:
            conn.close()

    @staticmethod
    def update_password(user_id, new_password):
        """Replace a user's password with a fresh bcrypt hash of *new_password*.

        Returns ``True`` when the UPDATE reports at least one affected row.
        """
        password_hash = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())
        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                sql = "UPDATE users SET password_hash = %s WHERE id = %s"
                cursor.execute(sql, (password_hash, user_id))
                return cursor.rowcount > 0
        finally:
            conn.close()

    @staticmethod
    def update_email(user_id, new_email):
        """Change a user's email address.

        Returns ``True`` when the UPDATE reports at least one affected row.
        """
        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                sql = "UPDATE users SET email = %s WHERE id = %s"
                cursor.execute(sql, (new_email, user_id))
                return cursor.rowcount > 0
        finally:
            conn.close()
class UserVM:
    """Queries against the ``user_vms`` table, which links users to VM ids.

    Each method opens a connection with ``get_db_connection()`` and closes it
    before returning.
    """

    @staticmethod
    def assign_vm(user_id, vm_id, vm_name=None, notes=None):
        """Assign a VM to a user, or refresh name/notes on a duplicate key.

        Returns ``cursor.lastrowid`` as reported after the statement.
        """
        statement = (
            "INSERT INTO user_vms (user_id, vm_id, vm_name, notes)\n"
            "VALUES (%s, %s, %s, %s)\n"
            "ON DUPLICATE KEY UPDATE vm_name = VALUES(vm_name), notes = VALUES(notes)"
        )
        db = get_db_connection()
        try:
            with db.cursor() as cur:
                cur.execute(statement, (user_id, vm_id, vm_name, notes))
                row_id = cur.lastrowid
            return row_id
        finally:
            db.close()

    @staticmethod
    def get_user_vms(user_id):
        """Return every ``user_vms`` row of the given user, ordered by VM id."""
        statement = "SELECT * FROM user_vms WHERE user_id = %s ORDER BY vm_id"
        db = get_db_connection()
        try:
            with db.cursor() as cur:
                cur.execute(statement, (user_id,))
                rows = cur.fetchall()
            return rows
        finally:
            db.close()

    @staticmethod
    def remove_vm(user_id, vm_id):
        """Delete the user/VM association.

        Returns ``True`` when at least one row was deleted.
        """
        statement = "DELETE FROM user_vms WHERE user_id = %s AND vm_id = %s"
        db = get_db_connection()
        try:
            with db.cursor() as cur:
                cur.execute(statement, (user_id, vm_id))
                deleted = cur.rowcount > 0
            return deleted
        finally:
            db.close()

    @staticmethod
    def user_has_vm(user_id, vm_id):
        """Return ``True`` when the user has an association with this VM."""
        statement = "SELECT COUNT(*) as count FROM user_vms WHERE user_id = %s AND vm_id = %s"
        db = get_db_connection()
        try:
            with db.cursor() as cur:
                cur.execute(statement, (user_id, vm_id))
                row = cur.fetchone()
                has_access = row['count'] > 0
            return has_access
        finally:
            db.close()

    @staticmethod
    def get_all_assignments():
        """Return all user/VM associations together with the owner's username.

        Rows are ordered by username, then VM id. Meant for admin views; no
        authorization check is performed here.
        """
        statement = (
            "SELECT uv.*, u.username\n"
            "FROM user_vms uv\n"
            "JOIN users u ON uv.user_id = u.id\n"
            "ORDER BY u.username, uv.vm_id"
        )
        db = get_db_connection()
        try:
            with db.cursor() as cur:
                cur.execute(statement)
                rows = cur.fetchall()
            return rows
        finally:
            db.close()
class Subdomain:
    """Queries against the ``subdomains`` table.

    Each method opens a connection with ``get_db_connection()`` and closes it
    before returning.
    """

    @staticmethod
    def create(user_id, subdomain, ip_address, vm_id=None, proxied=True, cloudflare_record_id=None):
        """Insert a subdomain record and return ``cursor.lastrowid``."""
        query = (
            "INSERT INTO subdomains (user_id, subdomain, ip_address, vm_id, proxied, cloudflare_record_id)\n"
            "VALUES (%s, %s, %s, %s, %s, %s)"
        )
        values = (user_id, subdomain, ip_address, vm_id, proxied, cloudflare_record_id)
        connection = get_db_connection()
        try:
            with connection.cursor() as cur:
                cur.execute(query, values)
                created_id = cur.lastrowid
            return created_id
        finally:
            connection.close()

    @staticmethod
    def get_user_subdomains(user_id):
        """Return all subdomains owned by the user, newest first."""
        query = "SELECT * FROM subdomains WHERE user_id = %s ORDER BY created_at DESC"
        connection = get_db_connection()
        try:
            with connection.cursor() as cur:
                cur.execute(query, (user_id,))
                records = cur.fetchall()
            return records
        finally:
            connection.close()

    @staticmethod
    def get_by_id(subdomain_id):
        """Return the subdomain row with this id, or ``None``.

        No ownership check is applied here.
        """
        query = "SELECT * FROM subdomains WHERE id = %s"
        connection = get_db_connection()
        try:
            with connection.cursor() as cur:
                cur.execute(query, (subdomain_id,))
                record = cur.fetchone()
            return record
        finally:
            connection.close()

    @staticmethod
    def delete(subdomain_id, user_id):
        """Delete a subdomain, but only if it belongs to *user_id*.

        Returns ``True`` when a row was deleted.
        """
        query = "DELETE FROM subdomains WHERE id = %s AND user_id = %s"
        connection = get_db_connection()
        try:
            with connection.cursor() as cur:
                cur.execute(query, (subdomain_id, user_id))
                removed = cur.rowcount > 0
            return removed
        finally:
            connection.close()

    @staticmethod
    def subdomain_exists(subdomain):
        """Return ``True`` when a record with this subdomain name exists."""
        query = "SELECT COUNT(*) as count FROM subdomains WHERE subdomain = %s"
        connection = get_db_connection()
        try:
            with connection.cursor() as cur:
                cur.execute(query, (subdomain,))
                record = cur.fetchone()
                found = record['count'] > 0
            return found
        finally:
            connection.close()

    @staticmethod
    def update_ip(subdomain_id, user_id, new_ip):
        """Set a new IP address on a subdomain owned by *user_id*.

        Returns ``True`` when the UPDATE reports at least one affected row.
        """
        query = "UPDATE subdomains SET ip_address = %s WHERE id = %s AND user_id = %s"
        connection = get_db_connection()
        try:
            with connection.cursor() as cur:
                cur.execute(query, (new_ip, subdomain_id, user_id))
                changed = cur.rowcount > 0
            return changed
        finally:
            connection.close()
class IPAssignment:
    """Data-access helpers for the ``ip_assignments`` table.

    Every method obtains its own connection from ``get_db_connection()`` and
    closes it before returning, including when the query raises.
    """

    @staticmethod
    def create(vm_id, ip_address, assigned_by, mac_address=None, notes=None):
        """Insert an IP assignment, or overwrite the existing one on a duplicate key.

        On a duplicate key the IP, MAC, notes and ``assigned_by`` columns are
        replaced and ``updated_at`` is set to the current timestamp.
        Returns ``cursor.lastrowid`` as reported after the statement.
        """
        # NOTE(review): presumably vm_id carries the unique key that triggers
        # the update path -- confirm against the table definition.
        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                sql = (
                    "INSERT INTO ip_assignments (vm_id, ip_address, mac_address, notes, assigned_by)\n"
                    "VALUES (%s, %s, %s, %s, %s)\n"
                    "ON DUPLICATE KEY UPDATE\n"
                    "ip_address = VALUES(ip_address),\n"
                    "mac_address = VALUES(mac_address),\n"
                    "notes = VALUES(notes),\n"
                    "assigned_by = VALUES(assigned_by),\n"
                    "updated_at = CURRENT_TIMESTAMP"
                )
                cursor.execute(sql, (vm_id, ip_address, mac_address, notes, assigned_by))
                return cursor.lastrowid
        finally:
            conn.close()

    @staticmethod
    def get_by_vm_id(vm_id):
        """Return the IP assignment row for this VM id, or ``None``."""
        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                sql = "SELECT * FROM ip_assignments WHERE vm_id = %s"
                cursor.execute(sql, (vm_id,))
                return cursor.fetchone()
        finally:
            conn.close()

    @staticmethod
    def get_all():
        """Return all IP assignments, most recently updated first.

        Each row also carries ``assigned_by_username``; because of the LEFT
        JOIN it is NULL when the assigning user no longer exists.
        """
        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                sql = (
                    "SELECT ia.*, u.username as assigned_by_username\n"
                    "FROM ip_assignments ia\n"
                    "LEFT JOIN users u ON ia.assigned_by = u.id\n"
                    "ORDER BY ia.updated_at DESC"
                )
                cursor.execute(sql)
                return cursor.fetchall()
        finally:
            conn.close()

    @staticmethod
    def delete(vm_id):
        """Delete the IP assignment of a VM.

        Returns ``True`` when at least one row was deleted.
        """
        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                sql = "DELETE FROM ip_assignments WHERE vm_id = %s"
                cursor.execute(sql, (vm_id,))
                return cursor.rowcount > 0
        finally:
            conn.close()

    @staticmethod
    def check_ip_conflict(ip_address, exclude_vm_id=None):
        """Return the ``vm_id`` already holding *ip_address*, or ``None``.

        When *exclude_vm_id* is given, an assignment belonging to that VM is
        ignored, so a VM can keep its own address without being reported as
        a conflict. Only the first matching row is returned.
        """
        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                # Compare against None explicitly so that a falsy id such as
                # 0 is still honoured as an exclusion.
                if exclude_vm_id is not None:
                    sql = "SELECT vm_id FROM ip_assignments WHERE ip_address = %s AND vm_id != %s"
                    cursor.execute(sql, (ip_address, exclude_vm_id))
                else:
                    sql = "SELECT vm_id FROM ip_assignments WHERE ip_address = %s"
                    cursor.execute(sql, (ip_address,))
                result = cursor.fetchone()
                return result['vm_id'] if result else None
        finally:
            conn.close()
class ActionLog:
    """Queries against the ``action_logs`` table.

    Each method opens a connection with ``get_db_connection()`` and closes it
    before returning.
    """

    @staticmethod
    def log_action(user_id, vm_id, action_type, status, error_message=None, ip_address=None):
        """Record one user action and return ``cursor.lastrowid``."""
        insert_sql = (
            "INSERT INTO action_logs\n"
            "(user_id, vm_id, action_type, status, error_message, ip_address)\n"
            "VALUES (%s, %s, %s, %s, %s, %s)"
        )
        entry = (user_id, vm_id, action_type, status, error_message, ip_address)
        link = get_db_connection()
        try:
            with link.cursor() as cur:
                cur.execute(insert_sql, entry)
                log_id = cur.lastrowid
            return log_id
        finally:
            link.close()

    @staticmethod
    def get_user_logs(user_id, limit=50):
        """Return up to *limit* log rows of one user, newest first."""
        select_sql = (
            "SELECT * FROM action_logs\n"
            "WHERE user_id = %s\n"
            "ORDER BY created_at DESC\n"
            "LIMIT %s"
        )
        link = get_db_connection()
        try:
            with link.cursor() as cur:
                cur.execute(select_sql, (user_id, limit))
                entries = cur.fetchall()
            return entries
        finally:
            link.close()

    @staticmethod
    def get_all_logs(limit=100):
        """Return up to *limit* log rows across all users, newest first.

        Each row includes the acting user's username. The inner JOIN means
        log rows without a matching ``users`` row are not returned. Meant for
        admin views; no authorization check is performed here.
        """
        select_sql = (
            "SELECT al.*, u.username\n"
            "FROM action_logs al\n"
            "JOIN users u ON al.user_id = u.id\n"
            "ORDER BY al.created_at DESC\n"
            "LIMIT %s"
        )
        link = get_db_connection()
        try:
            with link.cursor() as cur:
                cur.execute(select_sql, (limit,))
                entries = cur.fetchall()
            return entries
        finally:
            link.close()

    @staticmethod
    def get_vm_logs(vm_id, limit=50):
        """Return up to *limit* log rows for one VM, newest first.

        Each row includes the acting user's username. The inner JOIN means
        log rows without a matching ``users`` row are not returned.
        """
        select_sql = (
            "SELECT al.*, u.username\n"
            "FROM action_logs al\n"
            "JOIN users u ON al.user_id = u.id\n"
            "WHERE al.vm_id = %s\n"
            "ORDER BY al.created_at DESC\n"
            "LIMIT %s"
        )
        link = get_db_connection()
        try:
            with link.cursor() as cur:
                cur.execute(select_sql, (vm_id, limit))
                entries = cur.fetchall()
            return entries
        finally:
            link.close()