diff --git a/.env.example b/.env.example index 00f4127..9902a0f 100644 --- a/.env.example +++ b/.env.example @@ -1,4 +1,33 @@ -APP_HOST=127.0.0.1 +# Flask/Quart Configuration +DEBUG=False +SECRET_KEY=your-super-secret-key-here-change-this + +# Server Configuration +APP_HOST=0.0.0.0 APP_PORT=5000 -DEBUG=True -SECRET_KEY=yoursecretkey \ No newline at end of file + +# MySQL Database Configuration +DB_HOST=localhost +DB_PORT=3306 +DB_USER=hersel_user +DB_PASSWORD=your_secure_password_here +DB_NAME=hersel_portfolio + +# Upload Configuration +UPLOAD_FOLDER=static/uploads +MAX_CONTENT_LENGTH=16777216 + +# Email Configuration (optional) +MAIL_SERVER=smtp.gmail.com +MAIL_PORT=587 +MAIL_USE_TLS=True +MAIL_USERNAME=your-email@gmail.com +MAIL_PASSWORD=your-email-password + +# Pagination +POSTS_PER_PAGE=10 +PROJECTS_PER_PAGE=12 + +# Cache +CACHE_TYPE=simple +CACHE_DEFAULT_TIMEOUT=300 diff --git a/config.py b/config.py index a577a0e..1d41147 100644 --- a/config.py +++ b/config.py @@ -1,17 +1,58 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# Copyright Hersel Giannella +# Enhanced Configuration with Database Settings -from pydantic_settings import BaseSettings +import os +from dotenv import load_dotenv -class Config(BaseSettings): - APP_HOST: str = "127.0.0.1" - APP_PORT: int = 5000 - DEBUG: bool = True - SECRET_KEY: str = "default_secret_key" +# Load environment variables from .env file +load_dotenv() - class Config: - env_file = ".env" +class Config: + # Flask/Quart Settings + DEBUG = os.getenv('DEBUG', 'False').lower() == 'true' + SECRET_KEY = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production') + + # Server Settings + APP_HOST = os.getenv('APP_HOST', '0.0.0.0') + APP_PORT = int(os.getenv('APP_PORT', '5000')) + + # Database Settings + DB_HOST = os.getenv('DB_HOST', 'localhost') + DB_PORT = int(os.getenv('DB_PORT', '3306')) + DB_USER = os.getenv('DB_USER', 'hersel_user') + DB_PASSWORD = os.getenv('DB_PASSWORD', 'your_password_here') + DB_NAME = os.getenv('DB_NAME', 'hersel_portfolio') + + # Session Settings + SESSION_PERMANENT = False + SESSION_TYPE = 'filesystem' + PERMANENT_SESSION_LIFETIME = 60 * 60 * 24 * 7 # 7 days + + # Upload Settings + UPLOAD_FOLDER = os.getenv('UPLOAD_FOLDER', 'static/uploads') + MAX_CONTENT_LENGTH = int(os.getenv('MAX_CONTENT_LENGTH', '16777216')) # 16MB + ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp', 'pdf', 'txt', 'doc', 'docx'} + + # Email Settings (for future use) + MAIL_SERVER = os.getenv('MAIL_SERVER', 'localhost') + MAIL_PORT = int(os.getenv('MAIL_PORT', '587')) + MAIL_USE_TLS = os.getenv('MAIL_USE_TLS', 'True').lower() == 'true' + MAIL_USERNAME = os.getenv('MAIL_USERNAME', '') + MAIL_PASSWORD = os.getenv('MAIL_PASSWORD', '') + + # Security Settings + WTF_CSRF_ENABLED = True + WTF_CSRF_TIME_LIMIT = None + + # Pagination + POSTS_PER_PAGE = int(os.getenv('POSTS_PER_PAGE', '10')) + PROJECTS_PER_PAGE = int(os.getenv('PROJECTS_PER_PAGE', '12')) + + # Cache Settings + CACHE_TYPE = os.getenv('CACHE_TYPE', 'simple') + CACHE_DEFAULT_TIMEOUT = int(os.getenv('CACHE_DEFAULT_TIMEOUT', '300')) +# Create config instance config = Config() diff --git a/models/__init__.py b/models/__init__.py new file mode 100644 index 0000000..6ae9b71 --- /dev/null +++ b/models/__init__.py @@ -0,0 +1,8 @@ +# Database Models Package +from .user import User +from .post import Post +from .project import Project +from .category import Category +from .settings import Settings + +__all__ = ['User', 'Post', 'Project', 'Category', 'Settings'] diff --git a/models/database.py b/models/database.py new file mode 100644 index 0000000..1b4ad3d --- /dev/null +++ b/models/database.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Database Configuration and Connection Manager + +import asyncio +import aiomysql +from typing import Optional +from config import config + +class DatabaseManager: + def __init__(self): + self.pool: Optional[aiomysql.Pool] = None + + async def init_pool(self): + """Initialize connection pool""" + try: + self.pool = await aiomysql.create_pool( + host=config.DB_HOST, + port=config.DB_PORT, + user=config.DB_USER, + password=config.DB_PASSWORD, + db=config.DB_NAME, + minsize=1, + maxsize=10, + autocommit=True, + charset='utf8mb4' + ) + print("✅ Database connection pool initialized") + except Exception as e: + print(f"❌ Error initializing database pool: {e}") + raise + + async def close_pool(self): + """Close connection pool""" + if self.pool: + self.pool.close() + await self.pool.wait_closed() + print("🔒 Database connection pool closed") + + async def get_connection(self): + """Get connection from pool""" + if not self.pool: + await self.init_pool() + return self.pool.acquire() + + async def execute_query(self, query: str, params: tuple = None): + """Execute a query and return results""" + async with self.pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cursor: + await cursor.execute(query, params) + return await cursor.fetchall() + + async def execute_insert(self, query: str, params: tuple = None): + """Execute insert query and return last insert id""" + async with self.pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute(query, params) + return cursor.lastrowid + + async def execute_update(self, query: str, params: tuple = None): + """Execute update/delete query and return affected rows""" + async with self.pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute(query, params) + return cursor.rowcount + +# Global database manager instance +db_manager = DatabaseManager() + +async def init_database(): + """Initialize database and create tables""" + await db_manager.init_pool() + await create_tables() + +async def create_tables(): + """Create database tables if they don't exist""" + tables = [ + # Users table + """ + CREATE TABLE IF NOT EXISTS users ( + id INT AUTO_INCREMENT PRIMARY KEY, + username VARCHAR(50) UNIQUE NOT NULL, + email VARCHAR(100) UNIQUE NOT NULL, + password_hash VARCHAR(255) NOT NULL, + first_name VARCHAR(50), + last_name VARCHAR(50), + role ENUM('admin', 'user') DEFAULT 'user', + is_active BOOLEAN DEFAULT TRUE, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP + ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci + """, + + # Categories table + """ + CREATE TABLE IF NOT EXISTS categories ( + id INT AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(100) NOT NULL, + slug VARCHAR(100) UNIQUE NOT NULL, + description TEXT, + color VARCHAR(7) DEFAULT '#007bff', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci + """, + + # Projects table + """ + CREATE TABLE IF NOT EXISTS projects ( + id INT AUTO_INCREMENT PRIMARY KEY, + title VARCHAR(200) NOT NULL, + slug VARCHAR(200) UNIQUE NOT NULL, + description TEXT, + content LONGTEXT, + image_url VARCHAR(500), + github_url VARCHAR(500), + demo_url VARCHAR(500), + technologies JSON, + category_id INT, + is_featured BOOLEAN DEFAULT FALSE, + is_published BOOLEAN DEFAULT TRUE, + created_by INT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + FOREIGN KEY (category_id) REFERENCES categories(id) ON DELETE SET NULL, + FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL + ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci + """, + + # Posts table (for blog functionality) + """ + CREATE TABLE IF NOT EXISTS posts ( + id INT AUTO_INCREMENT PRIMARY KEY, + title VARCHAR(200) NOT NULL, + slug VARCHAR(200) UNIQUE NOT NULL, + excerpt TEXT, + content LONGTEXT, + featured_image VARCHAR(500), + category_id INT, + author_id INT, + is_published BOOLEAN DEFAULT FALSE, + published_at TIMESTAMP NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + FOREIGN KEY (category_id) REFERENCES categories(id) ON DELETE SET NULL, + FOREIGN KEY (author_id) REFERENCES users(id) ON DELETE SET NULL + ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci + """, + + # Settings table + """ + CREATE TABLE IF NOT EXISTS settings ( + id INT AUTO_INCREMENT PRIMARY KEY, + setting_key VARCHAR(100) UNIQUE NOT NULL, + setting_value LONGTEXT, + description TEXT, + type ENUM('text', 'textarea', 'boolean', 'json') DEFAULT 'text', + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP + ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci + """ + ] + + for table_sql in tables: + try: + await db_manager.execute_update(table_sql) + print(f"✅ Table created/verified") + except Exception as e: + print(f"❌ Error creating table: {e}") + + # Insert default settings + await insert_default_settings() + +async def insert_default_settings(): + """Insert default settings if they don't exist""" + default_settings = [ + ('site_name', 'Hersel.it', 'Nome del sito', 'text'), + ('site_description', 'Portfolio personale di Hersel Giannella', 'Descrizione del sito', 'textarea'), + ('admin_email', 'admin@hersel.it', 'Email amministratore', 'text'), + ('site_logo', '/static/img/logo.png', 'URL del logo', 'text'), + ('social_github', 'https://github.com/BluLupo', 'GitHub URL', 'text'), + ('social_linkedin', '', 'LinkedIn URL', 'text'), + ('social_twitter', '', 'Twitter URL', 'text'), + ('site_maintenance', 'false', 'Modalità manutenzione', 'boolean'), + ('analytics_code', '', 'Codice Analytics', 'textarea') + ] + + for key, value, desc, type_val in default_settings: + try: + await db_manager.execute_insert( + "INSERT IGNORE INTO settings (setting_key, setting_value, description, type) VALUES (%s, %s, %s, %s)", + (key, value, desc, type_val) + ) + except Exception as e: + print(f"Warning: Could not insert setting {key}: {e}") diff --git a/models/project.py b/models/project.py new file mode 100644 index 0000000..018c144 --- /dev/null +++ b/models/project.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Project Model + +import json +from typing import Optional, Dict, List +from .database import db_manager +from datetime import datetime + +class Project: + def __init__(self, **kwargs): + self.id = kwargs.get('id') + self.title = kwargs.get('title') + self.slug = kwargs.get('slug') + self.description = kwargs.get('description') + self.content = kwargs.get('content') + self.image_url = kwargs.get('image_url') + self.github_url = kwargs.get('github_url') + self.demo_url = kwargs.get('demo_url') + self.technologies = kwargs.get('technologies', []) + self.category_id = kwargs.get('category_id') + self.is_featured = kwargs.get('is_featured', False) + self.is_published = kwargs.get('is_published', True) + self.created_by = kwargs.get('created_by') + self.created_at = kwargs.get('created_at') + self.updated_at = kwargs.get('updated_at') + + # Handle JSON technologies field + if isinstance(self.technologies, str): + try: + self.technologies = json.loads(self.technologies) + except: + self.technologies = [] + + @property + def technologies_json(self) -> str: + """Get technologies as JSON string""" + return json.dumps(self.technologies) if self.technologies else '[]' + + async def save(self) -> int: + """Save project to database""" + if self.id: + # Update existing project + query = """ + UPDATE projects SET + title=%s, slug=%s, description=%s, content=%s, image_url=%s, + github_url=%s, demo_url=%s, technologies=%s, category_id=%s, + is_featured=%s, is_published=%s, updated_at=NOW() + WHERE id=%s + """ + params = (self.title, self.slug, self.description, self.content, self.image_url, + self.github_url, self.demo_url, self.technologies_json, self.category_id, + self.is_featured, self.is_published, self.id) + await db_manager.execute_update(query, params) + return self.id + else: + # Insert new project + query = """ + INSERT INTO projects (title, slug, description, content, image_url, github_url, demo_url, + technologies, category_id, is_featured, is_published, created_by) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + """ + params = (self.title, self.slug, self.description, self.content, self.image_url, + self.github_url, self.demo_url, self.technologies_json, self.category_id, + self.is_featured, self.is_published, self.created_by) + project_id = await db_manager.execute_insert(query, params) + self.id = project_id + return project_id + + @classmethod + async def find_by_id(cls, project_id: int) -> Optional['Project']: + """Find project by ID""" + query = "SELECT * FROM projects WHERE id = %s" + results = await db_manager.execute_query(query, (project_id,)) + if results: + return cls(**results[0]) + return None + + @classmethod + async def find_by_slug(cls, slug: str) -> Optional['Project']: + """Find project by slug""" + query = "SELECT * FROM projects WHERE slug = %s AND is_published = TRUE" + results = await db_manager.execute_query(query, (slug,)) + if results: + return cls(**results[0]) + return None + + @classmethod + async def get_all(cls, published_only: bool = True, limit: int = 50, offset: int = 0) -> List['Project']: + """Get all projects with pagination""" + query = "SELECT * FROM projects" + params = [] + + if published_only: + query += " WHERE is_published = TRUE" + + query += " ORDER BY created_at DESC LIMIT %s OFFSET %s" + params.extend([limit, offset]) + + results = await db_manager.execute_query(query, tuple(params)) + return [cls(**row) for row in results] + + @classmethod + async def get_featured(cls, limit: int = 6) -> List['Project']: + """Get featured projects""" + query = """ + SELECT * FROM projects + WHERE is_published = TRUE AND is_featured = TRUE + ORDER BY created_at DESC + LIMIT %s + """ + results = await db_manager.execute_query(query, (limit,)) + return [cls(**row) for row in results] + + @classmethod + async def get_by_category(cls, category_id: int, limit: int = 20) -> List['Project']: + """Get projects by category""" + query = """ + SELECT * FROM projects + WHERE category_id = %s AND is_published = TRUE + ORDER BY created_at DESC + LIMIT %s + """ + results = await db_manager.execute_query(query, (category_id, limit)) + return [cls(**row) for row in results] + + @classmethod + async def search(cls, search_term: str, limit: int = 20) -> List['Project']: + """Search projects by title and description""" + query = """ + SELECT * FROM projects + WHERE (title LIKE %s OR description LIKE %s) AND is_published = TRUE + ORDER BY created_at DESC + LIMIT %s + """ + search_pattern = f"%{search_term}%" + results = await db_manager.execute_query(query, (search_pattern, search_pattern, limit)) + return [cls(**row) for row in results] + + @classmethod + async def count(cls, published_only: bool = True) -> int: + """Count total projects""" + query = "SELECT COUNT(*) as count FROM projects" + if published_only: + query += " WHERE is_published = TRUE" + + results = await db_manager.execute_query(query) + return results[0]['count'] if results else 0 + + async def delete(self) -> bool: + """Delete project""" + query = "DELETE FROM projects WHERE id = %s" + affected = await db_manager.execute_update(query, (self.id,)) + return affected > 0 + + def to_dict(self) -> Dict: + """Convert project to dictionary""" + return { + 'id': self.id, + 'title': self.title, + 'slug': self.slug, + 'description': self.description, + 'content': self.content, + 'image_url': self.image_url, + 'github_url': self.github_url, + 'demo_url': self.demo_url, + 'technologies': self.technologies, + 'category_id': self.category_id, + 'is_featured': self.is_featured, + 'is_published': self.is_published, + 'created_by': self.created_by, + 'created_at': self.created_at.isoformat() if self.created_at else None, + 'updated_at': self.updated_at.isoformat() if self.updated_at else None + } + + @staticmethod + def generate_slug(title: str) -> str: + """Generate URL-friendly slug from title""" + import re + slug = re.sub(r'[^\w\s-]', '', title).strip().lower() + slug = re.sub(r'[\s_-]+', '-', slug) + return slug diff --git a/models/user.py b/models/user.py new file mode 100644 index 0000000..4af080c --- /dev/null +++ b/models/user.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# User Model + +import bcrypt +from typing import Optional, Dict, List +from .database import db_manager +from datetime import datetime + +class User: + def __init__(self, **kwargs): + self.id = kwargs.get('id') + self.username = kwargs.get('username') + self.email = kwargs.get('email') + self.password_hash = kwargs.get('password_hash') + self.first_name = kwargs.get('first_name') + self.last_name = kwargs.get('last_name') + self.role = kwargs.get('role', 'user') + self.is_active = kwargs.get('is_active', True) + self.created_at = kwargs.get('created_at') + self.updated_at = kwargs.get('updated_at') + + @property + def full_name(self) -> str: + """Get user's full name""" + if self.first_name and self.last_name: + return f"{self.first_name} {self.last_name}" + return self.username + + @property + def is_admin(self) -> bool: + """Check if user is admin""" + return self.role == 'admin' + + def set_password(self, password: str) -> None: + """Hash and set user password""" + salt = bcrypt.gensalt() + self.password_hash = bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8') + + def check_password(self, password: str) -> bool: + """Check if provided password matches hash""" + if not self.password_hash: + return False + return bcrypt.checkpw(password.encode('utf-8'), self.password_hash.encode('utf-8')) + + async def save(self) -> int: + """Save user to database""" + if self.id: + # Update existing user + query = """ + UPDATE users SET + username=%s, email=%s, first_name=%s, last_name=%s, + role=%s, is_active=%s, updated_at=NOW() + WHERE id=%s + """ + params = (self.username, self.email, self.first_name, self.last_name, + self.role, self.is_active, self.id) + await db_manager.execute_update(query, params) + return self.id + else: + # Insert new user + query = """ + INSERT INTO users (username, email, password_hash, first_name, last_name, role, is_active) + VALUES (%s, %s, %s, %s, %s, %s, %s) + """ + params = (self.username, self.email, self.password_hash, self.first_name, + self.last_name, self.role, self.is_active) + user_id = await db_manager.execute_insert(query, params) + self.id = user_id + return user_id + + @classmethod + async def find_by_id(cls, user_id: int) -> Optional['User']: + """Find user by ID""" + query = "SELECT * FROM users WHERE id = %s AND is_active = TRUE" + results = await db_manager.execute_query(query, (user_id,)) + if results: + return cls(**results[0]) + return None + + @classmethod + async def find_by_username(cls, username: str) -> Optional['User']: + """Find user by username""" + query = "SELECT * FROM users WHERE username = %s AND is_active = TRUE" + results = await db_manager.execute_query(query, (username,)) + if results: + return cls(**results[0]) + return None + + @classmethod + async def find_by_email(cls, email: str) -> Optional['User']: + """Find user by email""" + query = "SELECT * FROM users WHERE email = %s AND is_active = TRUE" + results = await db_manager.execute_query(query, (email,)) + if results: + return cls(**results[0]) + return None + + @classmethod + async def authenticate(cls, username_or_email: str, password: str) -> Optional['User']: + """Authenticate user with username/email and password""" + # Try to find by username first, then by email + user = await cls.find_by_username(username_or_email) + if not user: + user = await cls.find_by_email(username_or_email) + + if user and user.check_password(password): + return user + return None + + @classmethod + async def get_all(cls, limit: int = 50, offset: int = 0) -> List['User']: + """Get all users with pagination""" + query = """ + SELECT * FROM users + ORDER BY created_at DESC + LIMIT %s OFFSET %s + """ + results = await db_manager.execute_query(query, (limit, offset)) + return [cls(**row) for row in results] + + @classmethod + async def count(cls) -> int: + """Count total users""" + query = "SELECT COUNT(*) as count FROM users WHERE is_active = TRUE" + results = await db_manager.execute_query(query) + return results[0]['count'] if results else 0 + + async def delete(self) -> bool: + """Soft delete user (mark as inactive)""" + query = "UPDATE users SET is_active = FALSE WHERE id = %s" + affected = await db_manager.execute_update(query, (self.id,)) + return affected > 0 + + def to_dict(self, include_sensitive: bool = False) -> Dict: + """Convert user to dictionary""" + data = { + 'id': self.id, + 'username': self.username, + 'email': self.email, + 'first_name': self.first_name, + 'last_name': self.last_name, + 'full_name': self.full_name, + 'role': self.role, + 'is_active': self.is_active, + 'is_admin': self.is_admin, + 'created_at': self.created_at.isoformat() if self.created_at else None + } + + if include_sensitive: + data['password_hash'] = self.password_hash + + return data diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..1744064 --- /dev/null +++ b/utils/__init__.py @@ -0,0 +1,10 @@ +# Utilities Package +from .auth import login_required, admin_required, get_current_user +from .helpers import flash_message, generate_slug, sanitize_html +from .validators import validate_email, validate_password + +__all__ = [ + 'login_required', 'admin_required', 'get_current_user', + 'flash_message', 'generate_slug', 'sanitize_html', + 'validate_email', 'validate_password' +] diff --git a/utils/auth.py b/utils/auth.py new file mode 100644 index 0000000..29883e3 --- /dev/null +++ b/utils/auth.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Authentication Utilities + +from functools import wraps +from quart import session, request, redirect, url_for, jsonify +from typing import Optional +from models.user import User + +def login_required(f): + """Decorator to require login for a route""" + @wraps(f) + async def decorated_function(*args, **kwargs): + if 'user_id' not in session: + if request.is_json: + return jsonify({'error': 'Login required'}), 401 + return redirect(url_for('auth.login')) + return await f(*args, **kwargs) + return decorated_function + +def admin_required(f): + """Decorator to require admin role for a route""" + @wraps(f) + async def decorated_function(*args, **kwargs): + if 'user_id' not in session: + if request.is_json: + return jsonify({'error': 'Login required'}), 401 + return redirect(url_for('auth.login')) + + user = await get_current_user() + if not user or not user.is_admin: + if request.is_json: + return jsonify({'error': 'Admin access required'}), 403 + return redirect(url_for('home.index')) + + return await f(*args, **kwargs) + return decorated_function + +async def get_current_user() -> Optional[User]: + """Get the currently logged-in user""" + if 'user_id' not in session: + return None + + try: + user_id = session['user_id'] + return await User.find_by_id(user_id) + except: + # Clear invalid session + session.pop('user_id', None) + return None + +def login_user(user: User): + """Log in a user (set session)""" + session['user_id'] = user.id + session['username'] = user.username + session['is_admin'] = user.is_admin + +def logout_user(): + """Log out the current user (clear session)""" + session.pop('user_id', None) + session.pop('username', None) + session.pop('is_admin', None) diff --git a/utils/helpers.py b/utils/helpers.py new file mode 100644 index 0000000..1620629 --- /dev/null +++ b/utils/helpers.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Helper Utilities + +import re +import html +from quart import session +from typing import List + +def flash_message(message: str, category: str = 'info'): + """Add a flash message to session""" + if 'flash_messages' not in session: + session['flash_messages'] = [] + session['flash_messages'].append({'message': message, 'category': category}) + +def get_flash_messages() -> List[dict]: + """Get and clear flash messages from session""" + messages = session.pop('flash_messages', []) + return messages + +def generate_slug(text: str) -> str: + """Generate URL-friendly slug from text""" + # Remove HTML tags + text = re.sub(r'<[^>]+>', '', text) + # Convert to lowercase and replace spaces/special chars with hyphens + slug = re.sub(r'[^\w\s-]', '', text).strip().lower() + slug = re.sub(r'[\s_-]+', '-', slug) + # Remove leading/trailing hyphens + slug = slug.strip('-') + return slug + +def sanitize_html(text: str) -> str: + """Basic HTML sanitization""" + if not text: + return '' + # Escape HTML entities + return html.escape(text) + +def truncate_text(text: str, max_length: int = 150, suffix: str = '...') -> str: + """Truncate text to specified length""" + if not text or len(text) <= max_length: + return text + return text[:max_length].rsplit(' ', 1)[0] + suffix + +def format_date(date_obj, format_str: str = '%d/%m/%Y') -> str: + """Format datetime object to string""" + if not date_obj: + return '' + return date_obj.strftime(format_str) + +def paginate_query_params(page: int = 1, per_page: int = 10) -> tuple: + """Generate LIMIT and OFFSET for pagination""" + if page < 1: + page = 1 + offset = (page - 1) * per_page + return per_page, offset + +def calculate_pagination(total_items: int, page: int, per_page: int) -> dict: + """Calculate pagination information""" + total_pages = (total_items + per_page - 1) // per_page + has_prev = page > 1 + has_next = page < total_pages + + return { + 'page': page, + 'per_page': per_page, + 'total_pages': total_pages, + 'total_items': total_items, + 'has_prev': has_prev, + 'has_next': has_next, + 'prev_page': page - 1 if has_prev else None, + 'next_page': page + 1 if has_next else None + } diff --git a/utils/validators.py b/utils/validators.py new file mode 100644 index 0000000..e69bb71 --- /dev/null +++ b/utils/validators.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Validation Utilities + +import re +from typing import Tuple, Optional + +def validate_email(email: str) -> Tuple[bool, Optional[str]]: + """Validate email format""" + if not email: + return False, "Email è richiesta" + + # Basic email regex pattern + pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' + + if not re.match(pattern, email): + return False, "Formato email non valido" + + if len(email) > 100: + return False, "Email troppo lunga (max 100 caratteri)" + + return True, None + +def validate_password(password: str) -> Tuple[bool, Optional[str]]: + """Validate password strength""" + if not password: + return False, "Password è richiesta" + + if len(password) < 8: + return False, "Password deve essere almeno 8 caratteri" + + if len(password) > 128: + return False, "Password troppo lunga (max 128 caratteri)" + + # Check for at least one uppercase letter + if not re.search(r'[A-Z]', password): + return False, "Password deve contenere almeno una lettera maiuscola" + + # Check for at least one lowercase letter + if not re.search(r'[a-z]', password): + return False, "Password deve contenere almeno una lettera minuscola" + + # Check for at least one digit + if not re.search(r'\d', password): + return False, "Password deve contenere almeno un numero" + + return True, None + +def validate_username(username: str) -> Tuple[bool, Optional[str]]: + """Validate username format""" + if not username: + return False, "Username è richiesto" + + if len(username) < 3: + return False, "Username deve essere almeno 3 caratteri" + + if len(username) > 50: + return False, "Username troppo lungo (max 50 caratteri)" + + # Check for valid characters (alphanumeric and underscore) + if not re.match(r'^[a-zA-Z0-9_]+$', username): + return False, "Username può contenere solo lettere, numeri e underscore" + + return True, None + +def validate_project_data(data: dict) -> Tuple[bool, dict]: + """Validate project data""" + errors = {} + + # Title validation + if not data.get('title', '').strip(): + errors['title'] = 'Titolo è richiesto' + elif len(data['title']) > 200: + errors['title'] = 'Titolo troppo lungo (max 200 caratteri)' + + # Description validation + if data.get('description') and len(data['description']) > 1000: + errors['description'] = 'Descrizione troppo lunga (max 1000 caratteri)' + + # URL validations + url_pattern = r'^https?://[^\s]+$' + + if data.get('github_url') and not re.match(url_pattern, data['github_url']): + errors['github_url'] = 'URL GitHub non valido' + + if data.get('demo_url') and not re.match(url_pattern, data['demo_url']): + errors['demo_url'] = 'URL Demo non valido' + + if data.get('image_url') and not re.match(url_pattern, data['image_url']): + errors['image_url'] = 'URL Immagine non valido' + + return len(errors) == 0, errors + +def validate_post_data(data: dict) -> Tuple[bool, dict]: + """Validate blog post data""" + errors = {} + + # Title validation + if not data.get('title', '').strip(): + errors['title'] = 'Titolo è richiesto' + elif len(data['title']) > 200: + errors['title'] = 'Titolo troppo lungo (max 200 caratteri)' + + # Content validation + if not data.get('content', '').strip(): + errors['content'] = 'Contenuto è richiesto' + + # Excerpt validation + if data.get('excerpt') and len(data['excerpt']) > 500: + errors['excerpt'] = 'Riassunto troppo lungo (max 500 caratteri)' + + return len(errors) == 0, errors