155 lines
5.6 KiB
Python
155 lines
5.6 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# User Model
|
|
|
|
import bcrypt
|
|
from typing import Optional, Dict, List
|
|
from .database import db_manager
|
|
from datetime import datetime
|
|
|
|
class User:
|
|
def __init__(self, **kwargs):
|
|
self.id = kwargs.get('id')
|
|
self.username = kwargs.get('username')
|
|
self.email = kwargs.get('email')
|
|
self.password_hash = kwargs.get('password_hash')
|
|
self.first_name = kwargs.get('first_name')
|
|
self.last_name = kwargs.get('last_name')
|
|
self.role = kwargs.get('role', 'user')
|
|
self.is_active = kwargs.get('is_active', True)
|
|
self.created_at = kwargs.get('created_at')
|
|
self.updated_at = kwargs.get('updated_at')
|
|
|
|
@property
|
|
def full_name(self) -> str:
|
|
"""Get user's full name"""
|
|
if self.first_name and self.last_name:
|
|
return f"{self.first_name} {self.last_name}"
|
|
return self.username
|
|
|
|
@property
|
|
def is_admin(self) -> bool:
|
|
"""Check if user is admin"""
|
|
return self.role == 'admin'
|
|
|
|
def set_password(self, password: str) -> None:
|
|
"""Hash and set user password"""
|
|
salt = bcrypt.gensalt()
|
|
self.password_hash = bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
|
|
|
|
def check_password(self, password: str) -> bool:
|
|
"""Check if provided password matches hash"""
|
|
if not self.password_hash:
|
|
return False
|
|
return bcrypt.checkpw(password.encode('utf-8'), self.password_hash.encode('utf-8'))
|
|
|
|
async def save(self) -> int:
|
|
"""Save user to database"""
|
|
if self.id:
|
|
# Update existing user
|
|
query = """
|
|
UPDATE users SET
|
|
username=%s, email=%s, first_name=%s, last_name=%s,
|
|
role=%s, is_active=%s, updated_at=NOW()
|
|
WHERE id=%s
|
|
"""
|
|
params = (self.username, self.email, self.first_name, self.last_name,
|
|
self.role, self.is_active, self.id)
|
|
await db_manager.execute_update(query, params)
|
|
return self.id
|
|
else:
|
|
# Insert new user
|
|
query = """
|
|
INSERT INTO users (username, email, password_hash, first_name, last_name, role, is_active)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s)
|
|
"""
|
|
params = (self.username, self.email, self.password_hash, self.first_name,
|
|
self.last_name, self.role, self.is_active)
|
|
user_id = await db_manager.execute_insert(query, params)
|
|
self.id = user_id
|
|
return user_id
|
|
|
|
@classmethod
|
|
async def find_by_id(cls, user_id: int) -> Optional['User']:
|
|
"""Find user by ID"""
|
|
query = "SELECT * FROM users WHERE id = %s AND is_active = TRUE"
|
|
results = await db_manager.execute_query(query, (user_id,))
|
|
if results:
|
|
return cls(**results[0])
|
|
return None
|
|
|
|
@classmethod
|
|
async def find_by_username(cls, username: str) -> Optional['User']:
|
|
"""Find user by username"""
|
|
query = "SELECT * FROM users WHERE username = %s AND is_active = TRUE"
|
|
results = await db_manager.execute_query(query, (username,))
|
|
if results:
|
|
return cls(**results[0])
|
|
return None
|
|
|
|
@classmethod
|
|
async def find_by_email(cls, email: str) -> Optional['User']:
|
|
"""Find user by email"""
|
|
query = "SELECT * FROM users WHERE email = %s AND is_active = TRUE"
|
|
results = await db_manager.execute_query(query, (email,))
|
|
if results:
|
|
return cls(**results[0])
|
|
return None
|
|
|
|
@classmethod
|
|
async def authenticate(cls, username_or_email: str, password: str) -> Optional['User']:
|
|
"""Authenticate user with username/email and password"""
|
|
# Try to find by username first, then by email
|
|
user = await cls.find_by_username(username_or_email)
|
|
if not user:
|
|
user = await cls.find_by_email(username_or_email)
|
|
|
|
if user and user.check_password(password):
|
|
return user
|
|
return None
|
|
|
|
@classmethod
|
|
async def get_all(cls, limit: int = 50, offset: int = 0) -> List['User']:
|
|
"""Get all users with pagination"""
|
|
query = """
|
|
SELECT * FROM users
|
|
ORDER BY created_at DESC
|
|
LIMIT %s OFFSET %s
|
|
"""
|
|
results = await db_manager.execute_query(query, (limit, offset))
|
|
return [cls(**row) for row in results]
|
|
|
|
@classmethod
|
|
async def count(cls) -> int:
|
|
"""Count total users"""
|
|
query = "SELECT COUNT(*) as count FROM users WHERE is_active = TRUE"
|
|
results = await db_manager.execute_query(query)
|
|
return results[0]['count'] if results else 0
|
|
|
|
async def delete(self) -> bool:
|
|
"""Soft delete user (mark as inactive)"""
|
|
query = "UPDATE users SET is_active = FALSE WHERE id = %s"
|
|
affected = await db_manager.execute_update(query, (self.id,))
|
|
return affected > 0
|
|
|
|
def to_dict(self, include_sensitive: bool = False) -> Dict:
|
|
"""Convert user to dictionary"""
|
|
data = {
|
|
'id': self.id,
|
|
'username': self.username,
|
|
'email': self.email,
|
|
'first_name': self.first_name,
|
|
'last_name': self.last_name,
|
|
'full_name': self.full_name,
|
|
'role': self.role,
|
|
'is_active': self.is_active,
|
|
'is_admin': self.is_admin,
|
|
'created_at': self.created_at.isoformat() if self.created_at else None
|
|
}
|
|
|
|
if include_sensitive:
|
|
data['password_hash'] = self.password_hash
|
|
|
|
return data
|