Add dynamic features: database models, authentication, and admin dashboard

This commit is contained in:
2025-09-21 17:33:33 +02:00
parent 058f6205d7
commit f96b7d47e0
10 changed files with 881 additions and 12 deletions

View File

@@ -1,4 +1,33 @@
APP_HOST=127.0.0.1
# Flask/Quart Configuration
DEBUG=False
SECRET_KEY=your-super-secret-key-here-change-this
# Server Configuration
APP_HOST=0.0.0.0
APP_PORT=5000
DEBUG=True
SECRET_KEY=yoursecretkey
# MySQL Database Configuration
DB_HOST=localhost
DB_PORT=3306
DB_USER=hersel_user
DB_PASSWORD=your_secure_password_here
DB_NAME=hersel_portfolio
# Upload Configuration
UPLOAD_FOLDER=static/uploads
MAX_CONTENT_LENGTH=16777216
# Email Configuration (optional)
MAIL_SERVER=smtp.gmail.com
MAIL_PORT=587
MAIL_USE_TLS=True
MAIL_USERNAME=your-email@gmail.com
MAIL_PASSWORD=your-email-password
# Pagination
POSTS_PER_PAGE=10
PROJECTS_PER_PAGE=12
# Cache
CACHE_TYPE=simple
CACHE_DEFAULT_TIMEOUT=300

View File

@@ -1,17 +1,58 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright Hersel Giannella
# Enhanced Configuration with Database Settings
from pydantic_settings import BaseSettings
import os
from dotenv import load_dotenv
class Config(BaseSettings):
APP_HOST: str = "127.0.0.1"
APP_PORT: int = 5000
DEBUG: bool = True
SECRET_KEY: str = "default_secret_key"
# Load environment variables from .env file
load_dotenv()
class Config:
env_file = ".env"
# Flask/Quart Settings
DEBUG = os.getenv('DEBUG', 'False').lower() == 'true'
SECRET_KEY = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production')
# Server Settings
APP_HOST = os.getenv('APP_HOST', '0.0.0.0')
APP_PORT = int(os.getenv('APP_PORT', '5000'))
# Database Settings
DB_HOST = os.getenv('DB_HOST', 'localhost')
DB_PORT = int(os.getenv('DB_PORT', '3306'))
DB_USER = os.getenv('DB_USER', 'hersel_user')
DB_PASSWORD = os.getenv('DB_PASSWORD', 'your_password_here')
DB_NAME = os.getenv('DB_NAME', 'hersel_portfolio')
# Session Settings
SESSION_PERMANENT = False
SESSION_TYPE = 'filesystem'
PERMANENT_SESSION_LIFETIME = 60 * 60 * 24 * 7 # 7 days
# Upload Settings
UPLOAD_FOLDER = os.getenv('UPLOAD_FOLDER', 'static/uploads')
MAX_CONTENT_LENGTH = int(os.getenv('MAX_CONTENT_LENGTH', '16777216')) # 16MB
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp', 'pdf', 'txt', 'doc', 'docx'}
# Email Settings (for future use)
MAIL_SERVER = os.getenv('MAIL_SERVER', 'localhost')
MAIL_PORT = int(os.getenv('MAIL_PORT', '587'))
MAIL_USE_TLS = os.getenv('MAIL_USE_TLS', 'True').lower() == 'true'
MAIL_USERNAME = os.getenv('MAIL_USERNAME', '')
MAIL_PASSWORD = os.getenv('MAIL_PASSWORD', '')
# Security Settings
WTF_CSRF_ENABLED = True
WTF_CSRF_TIME_LIMIT = None
# Pagination
POSTS_PER_PAGE = int(os.getenv('POSTS_PER_PAGE', '10'))
PROJECTS_PER_PAGE = int(os.getenv('PROJECTS_PER_PAGE', '12'))
# Cache Settings
CACHE_TYPE = os.getenv('CACHE_TYPE', 'simple')
CACHE_DEFAULT_TIMEOUT = int(os.getenv('CACHE_DEFAULT_TIMEOUT', '300'))
# Create config instance
config = Config()

8
models/__init__.py Normal file
View File

@@ -0,0 +1,8 @@
# Database Models Package
from .user import User
from .post import Post
from .project import Project
from .category import Category
from .settings import Settings
__all__ = ['User', 'Post', 'Project', 'Category', 'Settings']

194
models/database.py Normal file
View File

@@ -0,0 +1,194 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Database Configuration and Connection Manager
import asyncio
import aiomysql
from typing import Optional
from config import config
class DatabaseManager:
def __init__(self):
self.pool: Optional[aiomysql.Pool] = None
async def init_pool(self):
"""Initialize connection pool"""
try:
self.pool = await aiomysql.create_pool(
host=config.DB_HOST,
port=config.DB_PORT,
user=config.DB_USER,
password=config.DB_PASSWORD,
db=config.DB_NAME,
minsize=1,
maxsize=10,
autocommit=True,
charset='utf8mb4'
)
print("✅ Database connection pool initialized")
except Exception as e:
print(f"❌ Error initializing database pool: {e}")
raise
async def close_pool(self):
"""Close connection pool"""
if self.pool:
self.pool.close()
await self.pool.wait_closed()
print("🔒 Database connection pool closed")
async def get_connection(self):
"""Get connection from pool"""
if not self.pool:
await self.init_pool()
return self.pool.acquire()
async def execute_query(self, query: str, params: tuple = None):
"""Execute a query and return results"""
async with self.pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
await cursor.execute(query, params)
return await cursor.fetchall()
async def execute_insert(self, query: str, params: tuple = None):
"""Execute insert query and return last insert id"""
async with self.pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(query, params)
return cursor.lastrowid
async def execute_update(self, query: str, params: tuple = None):
"""Execute update/delete query and return affected rows"""
async with self.pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(query, params)
return cursor.rowcount
# Global database manager instance
db_manager = DatabaseManager()
async def init_database():
"""Initialize database and create tables"""
await db_manager.init_pool()
await create_tables()
async def create_tables():
"""Create database tables if they don't exist"""
tables = [
# Users table
"""
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
first_name VARCHAR(50),
last_name VARCHAR(50),
role ENUM('admin', 'user') DEFAULT 'user',
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci
""",
# Categories table
"""
CREATE TABLE IF NOT EXISTS categories (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
slug VARCHAR(100) UNIQUE NOT NULL,
description TEXT,
color VARCHAR(7) DEFAULT '#007bff',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci
""",
# Projects table
"""
CREATE TABLE IF NOT EXISTS projects (
id INT AUTO_INCREMENT PRIMARY KEY,
title VARCHAR(200) NOT NULL,
slug VARCHAR(200) UNIQUE NOT NULL,
description TEXT,
content LONGTEXT,
image_url VARCHAR(500),
github_url VARCHAR(500),
demo_url VARCHAR(500),
technologies JSON,
category_id INT,
is_featured BOOLEAN DEFAULT FALSE,
is_published BOOLEAN DEFAULT TRUE,
created_by INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (category_id) REFERENCES categories(id) ON DELETE SET NULL,
FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci
""",
# Posts table (for blog functionality)
"""
CREATE TABLE IF NOT EXISTS posts (
id INT AUTO_INCREMENT PRIMARY KEY,
title VARCHAR(200) NOT NULL,
slug VARCHAR(200) UNIQUE NOT NULL,
excerpt TEXT,
content LONGTEXT,
featured_image VARCHAR(500),
category_id INT,
author_id INT,
is_published BOOLEAN DEFAULT FALSE,
published_at TIMESTAMP NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (category_id) REFERENCES categories(id) ON DELETE SET NULL,
FOREIGN KEY (author_id) REFERENCES users(id) ON DELETE SET NULL
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci
""",
# Settings table
"""
CREATE TABLE IF NOT EXISTS settings (
id INT AUTO_INCREMENT PRIMARY KEY,
setting_key VARCHAR(100) UNIQUE NOT NULL,
setting_value LONGTEXT,
description TEXT,
type ENUM('text', 'textarea', 'boolean', 'json') DEFAULT 'text',
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci
"""
]
for table_sql in tables:
try:
await db_manager.execute_update(table_sql)
print(f"✅ Table created/verified")
except Exception as e:
print(f"❌ Error creating table: {e}")
# Insert default settings
await insert_default_settings()
async def insert_default_settings():
"""Insert default settings if they don't exist"""
default_settings = [
('site_name', 'Hersel.it', 'Nome del sito', 'text'),
('site_description', 'Portfolio personale di Hersel Giannella', 'Descrizione del sito', 'textarea'),
('admin_email', 'admin@hersel.it', 'Email amministratore', 'text'),
('site_logo', '/static/img/logo.png', 'URL del logo', 'text'),
('social_github', 'https://github.com/BluLupo', 'GitHub URL', 'text'),
('social_linkedin', '', 'LinkedIn URL', 'text'),
('social_twitter', '', 'Twitter URL', 'text'),
('site_maintenance', 'false', 'Modalità manutenzione', 'boolean'),
('analytics_code', '', 'Codice Analytics', 'textarea')
]
for key, value, desc, type_val in default_settings:
try:
await db_manager.execute_insert(
"INSERT IGNORE INTO settings (setting_key, setting_value, description, type) VALUES (%s, %s, %s, %s)",
(key, value, desc, type_val)
)
except Exception as e:
print(f"Warning: Could not insert setting {key}: {e}")

183
models/project.py Normal file
View File

@@ -0,0 +1,183 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Project Model
import json
from typing import Optional, Dict, List
from .database import db_manager
from datetime import datetime
class Project:
def __init__(self, **kwargs):
self.id = kwargs.get('id')
self.title = kwargs.get('title')
self.slug = kwargs.get('slug')
self.description = kwargs.get('description')
self.content = kwargs.get('content')
self.image_url = kwargs.get('image_url')
self.github_url = kwargs.get('github_url')
self.demo_url = kwargs.get('demo_url')
self.technologies = kwargs.get('technologies', [])
self.category_id = kwargs.get('category_id')
self.is_featured = kwargs.get('is_featured', False)
self.is_published = kwargs.get('is_published', True)
self.created_by = kwargs.get('created_by')
self.created_at = kwargs.get('created_at')
self.updated_at = kwargs.get('updated_at')
# Handle JSON technologies field
if isinstance(self.technologies, str):
try:
self.technologies = json.loads(self.technologies)
except:
self.technologies = []
@property
def technologies_json(self) -> str:
"""Get technologies as JSON string"""
return json.dumps(self.technologies) if self.technologies else '[]'
async def save(self) -> int:
"""Save project to database"""
if self.id:
# Update existing project
query = """
UPDATE projects SET
title=%s, slug=%s, description=%s, content=%s, image_url=%s,
github_url=%s, demo_url=%s, technologies=%s, category_id=%s,
is_featured=%s, is_published=%s, updated_at=NOW()
WHERE id=%s
"""
params = (self.title, self.slug, self.description, self.content, self.image_url,
self.github_url, self.demo_url, self.technologies_json, self.category_id,
self.is_featured, self.is_published, self.id)
await db_manager.execute_update(query, params)
return self.id
else:
# Insert new project
query = """
INSERT INTO projects (title, slug, description, content, image_url, github_url, demo_url,
technologies, category_id, is_featured, is_published, created_by)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
params = (self.title, self.slug, self.description, self.content, self.image_url,
self.github_url, self.demo_url, self.technologies_json, self.category_id,
self.is_featured, self.is_published, self.created_by)
project_id = await db_manager.execute_insert(query, params)
self.id = project_id
return project_id
@classmethod
async def find_by_id(cls, project_id: int) -> Optional['Project']:
"""Find project by ID"""
query = "SELECT * FROM projects WHERE id = %s"
results = await db_manager.execute_query(query, (project_id,))
if results:
return cls(**results[0])
return None
@classmethod
async def find_by_slug(cls, slug: str) -> Optional['Project']:
"""Find project by slug"""
query = "SELECT * FROM projects WHERE slug = %s AND is_published = TRUE"
results = await db_manager.execute_query(query, (slug,))
if results:
return cls(**results[0])
return None
@classmethod
async def get_all(cls, published_only: bool = True, limit: int = 50, offset: int = 0) -> List['Project']:
"""Get all projects with pagination"""
query = "SELECT * FROM projects"
params = []
if published_only:
query += " WHERE is_published = TRUE"
query += " ORDER BY created_at DESC LIMIT %s OFFSET %s"
params.extend([limit, offset])
results = await db_manager.execute_query(query, tuple(params))
return [cls(**row) for row in results]
@classmethod
async def get_featured(cls, limit: int = 6) -> List['Project']:
"""Get featured projects"""
query = """
SELECT * FROM projects
WHERE is_published = TRUE AND is_featured = TRUE
ORDER BY created_at DESC
LIMIT %s
"""
results = await db_manager.execute_query(query, (limit,))
return [cls(**row) for row in results]
@classmethod
async def get_by_category(cls, category_id: int, limit: int = 20) -> List['Project']:
"""Get projects by category"""
query = """
SELECT * FROM projects
WHERE category_id = %s AND is_published = TRUE
ORDER BY created_at DESC
LIMIT %s
"""
results = await db_manager.execute_query(query, (category_id, limit))
return [cls(**row) for row in results]
@classmethod
async def search(cls, search_term: str, limit: int = 20) -> List['Project']:
"""Search projects by title and description"""
query = """
SELECT * FROM projects
WHERE (title LIKE %s OR description LIKE %s) AND is_published = TRUE
ORDER BY created_at DESC
LIMIT %s
"""
search_pattern = f"%{search_term}%"
results = await db_manager.execute_query(query, (search_pattern, search_pattern, limit))
return [cls(**row) for row in results]
@classmethod
async def count(cls, published_only: bool = True) -> int:
"""Count total projects"""
query = "SELECT COUNT(*) as count FROM projects"
if published_only:
query += " WHERE is_published = TRUE"
results = await db_manager.execute_query(query)
return results[0]['count'] if results else 0
async def delete(self) -> bool:
"""Delete project"""
query = "DELETE FROM projects WHERE id = %s"
affected = await db_manager.execute_update(query, (self.id,))
return affected > 0
def to_dict(self) -> Dict:
"""Convert project to dictionary"""
return {
'id': self.id,
'title': self.title,
'slug': self.slug,
'description': self.description,
'content': self.content,
'image_url': self.image_url,
'github_url': self.github_url,
'demo_url': self.demo_url,
'technologies': self.technologies,
'category_id': self.category_id,
'is_featured': self.is_featured,
'is_published': self.is_published,
'created_by': self.created_by,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None
}
@staticmethod
def generate_slug(title: str) -> str:
"""Generate URL-friendly slug from title"""
import re
slug = re.sub(r'[^\w\s-]', '', title).strip().lower()
slug = re.sub(r'[\s_-]+', '-', slug)
return slug

154
models/user.py Normal file
View File

@@ -0,0 +1,154 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# User Model
import bcrypt
from typing import Optional, Dict, List
from .database import db_manager
from datetime import datetime
class User:
def __init__(self, **kwargs):
self.id = kwargs.get('id')
self.username = kwargs.get('username')
self.email = kwargs.get('email')
self.password_hash = kwargs.get('password_hash')
self.first_name = kwargs.get('first_name')
self.last_name = kwargs.get('last_name')
self.role = kwargs.get('role', 'user')
self.is_active = kwargs.get('is_active', True)
self.created_at = kwargs.get('created_at')
self.updated_at = kwargs.get('updated_at')
@property
def full_name(self) -> str:
"""Get user's full name"""
if self.first_name and self.last_name:
return f"{self.first_name} {self.last_name}"
return self.username
@property
def is_admin(self) -> bool:
"""Check if user is admin"""
return self.role == 'admin'
def set_password(self, password: str) -> None:
"""Hash and set user password"""
salt = bcrypt.gensalt()
self.password_hash = bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
def check_password(self, password: str) -> bool:
"""Check if provided password matches hash"""
if not self.password_hash:
return False
return bcrypt.checkpw(password.encode('utf-8'), self.password_hash.encode('utf-8'))
async def save(self) -> int:
"""Save user to database"""
if self.id:
# Update existing user
query = """
UPDATE users SET
username=%s, email=%s, first_name=%s, last_name=%s,
role=%s, is_active=%s, updated_at=NOW()
WHERE id=%s
"""
params = (self.username, self.email, self.first_name, self.last_name,
self.role, self.is_active, self.id)
await db_manager.execute_update(query, params)
return self.id
else:
# Insert new user
query = """
INSERT INTO users (username, email, password_hash, first_name, last_name, role, is_active)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
params = (self.username, self.email, self.password_hash, self.first_name,
self.last_name, self.role, self.is_active)
user_id = await db_manager.execute_insert(query, params)
self.id = user_id
return user_id
@classmethod
async def find_by_id(cls, user_id: int) -> Optional['User']:
"""Find user by ID"""
query = "SELECT * FROM users WHERE id = %s AND is_active = TRUE"
results = await db_manager.execute_query(query, (user_id,))
if results:
return cls(**results[0])
return None
@classmethod
async def find_by_username(cls, username: str) -> Optional['User']:
"""Find user by username"""
query = "SELECT * FROM users WHERE username = %s AND is_active = TRUE"
results = await db_manager.execute_query(query, (username,))
if results:
return cls(**results[0])
return None
@classmethod
async def find_by_email(cls, email: str) -> Optional['User']:
"""Find user by email"""
query = "SELECT * FROM users WHERE email = %s AND is_active = TRUE"
results = await db_manager.execute_query(query, (email,))
if results:
return cls(**results[0])
return None
@classmethod
async def authenticate(cls, username_or_email: str, password: str) -> Optional['User']:
"""Authenticate user with username/email and password"""
# Try to find by username first, then by email
user = await cls.find_by_username(username_or_email)
if not user:
user = await cls.find_by_email(username_or_email)
if user and user.check_password(password):
return user
return None
@classmethod
async def get_all(cls, limit: int = 50, offset: int = 0) -> List['User']:
"""Get all users with pagination"""
query = """
SELECT * FROM users
ORDER BY created_at DESC
LIMIT %s OFFSET %s
"""
results = await db_manager.execute_query(query, (limit, offset))
return [cls(**row) for row in results]
@classmethod
async def count(cls) -> int:
"""Count total users"""
query = "SELECT COUNT(*) as count FROM users WHERE is_active = TRUE"
results = await db_manager.execute_query(query)
return results[0]['count'] if results else 0
async def delete(self) -> bool:
"""Soft delete user (mark as inactive)"""
query = "UPDATE users SET is_active = FALSE WHERE id = %s"
affected = await db_manager.execute_update(query, (self.id,))
return affected > 0
def to_dict(self, include_sensitive: bool = False) -> Dict:
"""Convert user to dictionary"""
data = {
'id': self.id,
'username': self.username,
'email': self.email,
'first_name': self.first_name,
'last_name': self.last_name,
'full_name': self.full_name,
'role': self.role,
'is_active': self.is_active,
'is_admin': self.is_admin,
'created_at': self.created_at.isoformat() if self.created_at else None
}
if include_sensitive:
data['password_hash'] = self.password_hash
return data

10
utils/__init__.py Normal file
View File

@@ -0,0 +1,10 @@
# Utilities Package
from .auth import login_required, admin_required, get_current_user
from .helpers import flash_message, generate_slug, sanitize_html
from .validators import validate_email, validate_password
__all__ = [
'login_required', 'admin_required', 'get_current_user',
'flash_message', 'generate_slug', 'sanitize_html',
'validate_email', 'validate_password'
]

63
utils/auth.py Normal file
View File

@@ -0,0 +1,63 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Authentication Utilities
from functools import wraps
from quart import session, request, redirect, url_for, jsonify
from typing import Optional
from models.user import User
def login_required(f):
"""Decorator to require login for a route"""
@wraps(f)
async def decorated_function(*args, **kwargs):
if 'user_id' not in session:
if request.is_json:
return jsonify({'error': 'Login required'}), 401
return redirect(url_for('auth.login'))
return await f(*args, **kwargs)
return decorated_function
def admin_required(f):
"""Decorator to require admin role for a route"""
@wraps(f)
async def decorated_function(*args, **kwargs):
if 'user_id' not in session:
if request.is_json:
return jsonify({'error': 'Login required'}), 401
return redirect(url_for('auth.login'))
user = await get_current_user()
if not user or not user.is_admin:
if request.is_json:
return jsonify({'error': 'Admin access required'}), 403
return redirect(url_for('home.index'))
return await f(*args, **kwargs)
return decorated_function
async def get_current_user() -> Optional[User]:
"""Get the currently logged-in user"""
if 'user_id' not in session:
return None
try:
user_id = session['user_id']
return await User.find_by_id(user_id)
except:
# Clear invalid session
session.pop('user_id', None)
return None
def login_user(user: User):
"""Log in a user (set session)"""
session['user_id'] = user.id
session['username'] = user.username
session['is_admin'] = user.is_admin
def logout_user():
"""Log out the current user (clear session)"""
session.pop('user_id', None)
session.pop('username', None)
session.pop('is_admin', None)

74
utils/helpers.py Normal file
View File

@@ -0,0 +1,74 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Helper Utilities
import re
import html
from quart import session
from typing import List
def flash_message(message: str, category: str = 'info'):
"""Add a flash message to session"""
if 'flash_messages' not in session:
session['flash_messages'] = []
session['flash_messages'].append({'message': message, 'category': category})
def get_flash_messages() -> List[dict]:
"""Get and clear flash messages from session"""
messages = session.pop('flash_messages', [])
return messages
def generate_slug(text: str) -> str:
"""Generate URL-friendly slug from text"""
# Remove HTML tags
text = re.sub(r'<[^>]+>', '', text)
# Convert to lowercase and replace spaces/special chars with hyphens
slug = re.sub(r'[^\w\s-]', '', text).strip().lower()
slug = re.sub(r'[\s_-]+', '-', slug)
# Remove leading/trailing hyphens
slug = slug.strip('-')
return slug
def sanitize_html(text: str) -> str:
"""Basic HTML sanitization"""
if not text:
return ''
# Escape HTML entities
return html.escape(text)
def truncate_text(text: str, max_length: int = 150, suffix: str = '...') -> str:
"""Truncate text to specified length"""
if not text or len(text) <= max_length:
return text
return text[:max_length].rsplit(' ', 1)[0] + suffix
def format_date(date_obj, format_str: str = '%d/%m/%Y') -> str:
"""Format datetime object to string"""
if not date_obj:
return ''
return date_obj.strftime(format_str)
def paginate_query_params(page: int = 1, per_page: int = 10) -> tuple:
"""Generate LIMIT and OFFSET for pagination"""
if page < 1:
page = 1
offset = (page - 1) * per_page
return per_page, offset
def calculate_pagination(total_items: int, page: int, per_page: int) -> dict:
"""Calculate pagination information"""
total_pages = (total_items + per_page - 1) // per_page
has_prev = page > 1
has_next = page < total_pages
return {
'page': page,
'per_page': per_page,
'total_pages': total_pages,
'total_items': total_items,
'has_prev': has_prev,
'has_next': has_next,
'prev_page': page - 1 if has_prev else None,
'next_page': page + 1 if has_next else None
}

113
utils/validators.py Normal file
View File

@@ -0,0 +1,113 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Validation Utilities
import re
from typing import Tuple, Optional
def validate_email(email: str) -> Tuple[bool, Optional[str]]:
"""Validate email format"""
if not email:
return False, "Email è richiesta"
# Basic email regex pattern
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(pattern, email):
return False, "Formato email non valido"
if len(email) > 100:
return False, "Email troppo lunga (max 100 caratteri)"
return True, None
def validate_password(password: str) -> Tuple[bool, Optional[str]]:
"""Validate password strength"""
if not password:
return False, "Password è richiesta"
if len(password) < 8:
return False, "Password deve essere almeno 8 caratteri"
if len(password) > 128:
return False, "Password troppo lunga (max 128 caratteri)"
# Check for at least one uppercase letter
if not re.search(r'[A-Z]', password):
return False, "Password deve contenere almeno una lettera maiuscola"
# Check for at least one lowercase letter
if not re.search(r'[a-z]', password):
return False, "Password deve contenere almeno una lettera minuscola"
# Check for at least one digit
if not re.search(r'\d', password):
return False, "Password deve contenere almeno un numero"
return True, None
def validate_username(username: str) -> Tuple[bool, Optional[str]]:
"""Validate username format"""
if not username:
return False, "Username è richiesto"
if len(username) < 3:
return False, "Username deve essere almeno 3 caratteri"
if len(username) > 50:
return False, "Username troppo lungo (max 50 caratteri)"
# Check for valid characters (alphanumeric and underscore)
if not re.match(r'^[a-zA-Z0-9_]+$', username):
return False, "Username può contenere solo lettere, numeri e underscore"
return True, None
def validate_project_data(data: dict) -> Tuple[bool, dict]:
"""Validate project data"""
errors = {}
# Title validation
if not data.get('title', '').strip():
errors['title'] = 'Titolo è richiesto'
elif len(data['title']) > 200:
errors['title'] = 'Titolo troppo lungo (max 200 caratteri)'
# Description validation
if data.get('description') and len(data['description']) > 1000:
errors['description'] = 'Descrizione troppo lunga (max 1000 caratteri)'
# URL validations
url_pattern = r'^https?://[^\s]+$'
if data.get('github_url') and not re.match(url_pattern, data['github_url']):
errors['github_url'] = 'URL GitHub non valido'
if data.get('demo_url') and not re.match(url_pattern, data['demo_url']):
errors['demo_url'] = 'URL Demo non valido'
if data.get('image_url') and not re.match(url_pattern, data['image_url']):
errors['image_url'] = 'URL Immagine non valido'
return len(errors) == 0, errors
def validate_post_data(data: dict) -> Tuple[bool, dict]:
"""Validate blog post data"""
errors = {}
# Title validation
if not data.get('title', '').strip():
errors['title'] = 'Titolo è richiesto'
elif len(data['title']) > 200:
errors['title'] = 'Titolo troppo lungo (max 200 caratteri)'
# Content validation
if not data.get('content', '').strip():
errors['content'] = 'Contenuto è richiesto'
# Excerpt validation
if data.get('excerpt') and len(data['excerpt']) > 500:
errors['excerpt'] = 'Riassunto troppo lungo (max 500 caratteri)'
return len(errors) == 0, errors