Add missing model files: post.py and category.py

This commit is contained in:
2025-09-21 21:18:54 +02:00
parent 8b7ab9d66e
commit 7ec30b8d5e
5 changed files with 606 additions and 0 deletions

97
models/category.py Normal file
View File

@@ -0,0 +1,97 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Category Model
from typing import Optional, Dict, List
from .database import db_manager
from datetime import datetime
class Category:
def __init__(self, **kwargs):
self.id = kwargs.get('id')
self.name = kwargs.get('name')
self.slug = kwargs.get('slug')
self.description = kwargs.get('description')
self.color = kwargs.get('color', '#007bff')
self.created_at = kwargs.get('created_at')
async def save(self) -> int:
"""Save category to database"""
if self.id:
# Update existing category
query = """
UPDATE categories SET
name=%s, slug=%s, description=%s, color=%s
WHERE id=%s
"""
params = (self.name, self.slug, self.description, self.color, self.id)
await db_manager.execute_update(query, params)
return self.id
else:
# Insert new category
query = """
INSERT INTO categories (name, slug, description, color)
VALUES (%s, %s, %s, %s)
"""
params = (self.name, self.slug, self.description, self.color)
category_id = await db_manager.execute_insert(query, params)
self.id = category_id
return category_id
@classmethod
async def find_by_id(cls, category_id: int) -> Optional['Category']:
"""Find category by ID"""
query = "SELECT * FROM categories WHERE id = %s"
results = await db_manager.execute_query(query, (category_id,))
if results:
return cls(**results[0])
return None
@classmethod
async def find_by_slug(cls, slug: str) -> Optional['Category']:
"""Find category by slug"""
query = "SELECT * FROM categories WHERE slug = %s"
results = await db_manager.execute_query(query, (slug,))
if results:
return cls(**results[0])
return None
@classmethod
async def get_all(cls, limit: int = 50) -> List['Category']:
"""Get all categories"""
query = "SELECT * FROM categories ORDER BY name ASC LIMIT %s"
results = await db_manager.execute_query(query, (limit,))
return [cls(**row) for row in results]
@classmethod
async def count(cls) -> int:
"""Count total categories"""
query = "SELECT COUNT(*) as count FROM categories"
results = await db_manager.execute_query(query)
return results[0]['count'] if results else 0
async def delete(self) -> bool:
"""Delete category"""
query = "DELETE FROM categories WHERE id = %s"
affected = await db_manager.execute_update(query, (self.id,))
return affected > 0
def to_dict(self) -> Dict:
"""Convert category to dictionary"""
return {
'id': self.id,
'name': self.name,
'slug': self.slug,
'description': self.description,
'color': self.color,
'created_at': self.created_at.isoformat() if self.created_at else None
}
@staticmethod
def generate_slug(name: str) -> str:
"""Generate URL-friendly slug from name"""
import re
slug = re.sub(r'[^\w\s-]', '', name).strip().lower()
slug = re.sub(r'[\s_-]+', '-', slug)
return slug

116
models/post.py Normal file
View File

@@ -0,0 +1,116 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Post Model (for future blog functionality)
from typing import Optional, Dict, List
from .database import db_manager
from datetime import datetime
class Post:
def __init__(self, **kwargs):
self.id = kwargs.get('id')
self.title = kwargs.get('title')
self.slug = kwargs.get('slug')
self.excerpt = kwargs.get('excerpt')
self.content = kwargs.get('content')
self.featured_image = kwargs.get('featured_image')
self.category_id = kwargs.get('category_id')
self.author_id = kwargs.get('author_id')
self.is_published = kwargs.get('is_published', False)
self.published_at = kwargs.get('published_at')
self.created_at = kwargs.get('created_at')
self.updated_at = kwargs.get('updated_at')
async def save(self) -> int:
"""Save post to database"""
if self.id:
# Update existing post
query = """
UPDATE posts SET
title=%s, slug=%s, excerpt=%s, content=%s, featured_image=%s,
category_id=%s, is_published=%s, published_at=%s, updated_at=NOW()
WHERE id=%s
"""
params = (self.title, self.slug, self.excerpt, self.content, self.featured_image,
self.category_id, self.is_published, self.published_at, self.id)
await db_manager.execute_update(query, params)
return self.id
else:
# Insert new post
query = """
INSERT INTO posts (title, slug, excerpt, content, featured_image, category_id,
author_id, is_published, published_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
params = (self.title, self.slug, self.excerpt, self.content, self.featured_image,
self.category_id, self.author_id, self.is_published, self.published_at)
post_id = await db_manager.execute_insert(query, params)
self.id = post_id
return post_id
@classmethod
async def find_by_id(cls, post_id: int) -> Optional['Post']:
"""Find post by ID"""
query = "SELECT * FROM posts WHERE id = %s"
results = await db_manager.execute_query(query, (post_id,))
if results:
return cls(**results[0])
return None
@classmethod
async def find_by_slug(cls, slug: str) -> Optional['Post']:
"""Find post by slug"""
query = "SELECT * FROM posts WHERE slug = %s AND is_published = TRUE"
results = await db_manager.execute_query(query, (slug,))
if results:
return cls(**results[0])
return None
@classmethod
async def get_all(cls, published_only: bool = True, limit: int = 50, offset: int = 0) -> List['Post']:
"""Get all posts with pagination"""
query = "SELECT * FROM posts"
params = []
if published_only:
query += " WHERE is_published = TRUE"
query += " ORDER BY created_at DESC LIMIT %s OFFSET %s"
params.extend([limit, offset])
results = await db_manager.execute_query(query, tuple(params))
return [cls(**row) for row in results]
@classmethod
async def count(cls, published_only: bool = True) -> int:
"""Count total posts"""
query = "SELECT COUNT(*) as count FROM posts"
if published_only:
query += " WHERE is_published = TRUE"
results = await db_manager.execute_query(query)
return results[0]['count'] if results else 0
async def delete(self) -> bool:
"""Delete post"""
query = "DELETE FROM posts WHERE id = %s"
affected = await db_manager.execute_update(query, (self.id,))
return affected > 0
def to_dict(self) -> Dict:
"""Convert post to dictionary"""
return {
'id': self.id,
'title': self.title,
'slug': self.slug,
'excerpt': self.excerpt,
'content': self.content,
'featured_image': self.featured_image,
'category_id': self.category_id,
'author_id': self.author_id,
'is_published': self.is_published,
'published_at': self.published_at.isoformat() if self.published_at else None,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None
}

127
models/settings.py Normal file
View File

@@ -0,0 +1,127 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Settings Model
import json
from typing import Optional, Dict, List, Any
from .database import db_manager
class Settings:
def __init__(self, **kwargs):
self.id = kwargs.get('id')
self.setting_key = kwargs.get('setting_key')
self.setting_value = kwargs.get('setting_value')
self.description = kwargs.get('description')
self.type = kwargs.get('type', 'text')
self.updated_at = kwargs.get('updated_at')
@property
def parsed_value(self) -> Any:
"""Parse setting value based on type"""
if not self.setting_value:
return None
if self.type == 'boolean':
return self.setting_value.lower() in ('true', '1', 'yes', 'on')
elif self.type == 'json':
try:
return json.loads(self.setting_value)
except:
return {}
else:
return self.setting_value
async def save(self) -> int:
"""Save setting to database"""
if self.id:
# Update existing setting
query = """
UPDATE settings SET
setting_key=%s, setting_value=%s, description=%s, type=%s, updated_at=NOW()
WHERE id=%s
"""
params = (self.setting_key, self.setting_value, self.description, self.type, self.id)
await db_manager.execute_update(query, params)
return self.id
else:
# Insert new setting
query = """
INSERT INTO settings (setting_key, setting_value, description, type)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
setting_value=VALUES(setting_value),
description=VALUES(description),
type=VALUES(type),
updated_at=NOW()
"""
params = (self.setting_key, self.setting_value, self.description, self.type)
setting_id = await db_manager.execute_insert(query, params)
self.id = setting_id or self.id
return self.id
@classmethod
async def get(cls, key: str, default: Any = None) -> Any:
"""Get setting value by key"""
query = "SELECT * FROM settings WHERE setting_key = %s"
results = await db_manager.execute_query(query, (key,))
if results:
setting = cls(**results[0])
return setting.parsed_value
return default
@classmethod
async def set(cls, key: str, value: Any, description: str = '', setting_type: str = 'text') -> bool:
"""Set setting value"""
# Convert value to string based on type
if setting_type == 'boolean':
str_value = 'true' if value else 'false'
elif setting_type == 'json':
str_value = json.dumps(value) if isinstance(value, (dict, list)) else str(value)
else:
str_value = str(value)
setting = cls(
setting_key=key,
setting_value=str_value,
description=description,
type=setting_type
)
try:
await setting.save()
return True
except:
return False
@classmethod
async def get_all(cls) -> List['Settings']:
"""Get all settings"""
query = "SELECT * FROM settings ORDER BY setting_key ASC"
results = await db_manager.execute_query(query)
return [cls(**row) for row in results]
@classmethod
async def get_by_prefix(cls, prefix: str) -> List['Settings']:
"""Get settings by key prefix"""
query = "SELECT * FROM settings WHERE setting_key LIKE %s ORDER BY setting_key ASC"
results = await db_manager.execute_query(query, (f"{prefix}%",))
return [cls(**row) for row in results]
async def delete(self) -> bool:
"""Delete setting"""
query = "DELETE FROM settings WHERE id = %s"
affected = await db_manager.execute_update(query, (self.id,))
return affected > 0
def to_dict(self) -> Dict:
"""Convert setting to dictionary"""
return {
'id': self.id,
'setting_key': self.setting_key,
'setting_value': self.setting_value,
'parsed_value': self.parsed_value,
'description': self.description,
'type': self.type,
'updated_at': self.updated_at.isoformat() if self.updated_at else None
}