addressbook_web/app.py

329 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from fastapi import FastAPI, HTTPException, File, UploadFile, Query
from pydantic import BaseModel
import sqlite3
import datetime
import csv
import markdown
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, StreamingResponse
import io
import os
from fastapi.middleware.cors import CORSMiddleware
from werkzeug.utils import secure_filename
# Создаем два экземпляра FastAPI
web_app = FastAPI(title="Web Interface") # Для веб-интерфейса (порт 8001)
api_app = FastAPI(title="API Endpoints") # Для API (порт 8002)
# Общий код для подключения к базе данных
conn = sqlite3.connect("/db/rustdesk.db", check_same_thread=False)
cursor = conn.cursor()
# Проверяем и обновляем структуру таблицы installs
cursor.execute("PRAGMA table_info(installs)")
columns = [row[1] for row in cursor.fetchall()]
if 'protocol' not in columns:
cursor.execute("ALTER TABLE installs ADD COLUMN protocol TEXT DEFAULT 'rustdesk'")
conn.commit()
if 'note' not in columns:
cursor.execute("ALTER TABLE installs ADD COLUMN note TEXT DEFAULT ''")
conn.commit()
# Создаем таблицы
cursor.execute("""
CREATE TABLE IF NOT EXISTS folders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
parent_id INTEGER,
FOREIGN KEY (parent_id) REFERENCES folders(id)
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS installs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
rust_id TEXT,
computer_name TEXT,
install_time TEXT,
folder_id INTEGER,
protocol TEXT DEFAULT 'rustdesk',
note TEXT DEFAULT '',
FOREIGN KEY (folder_id) REFERENCES folders(id)
)
""")
conn.commit()
# Проверяем/создаем папку "Несортированные" и получаем её ID
cursor.execute("SELECT id FROM folders WHERE name = 'Несортированные'")
unsorted_folder = cursor.fetchone()
if not unsorted_folder:
cursor.execute("INSERT INTO folders (name) VALUES ('Несортированные')")
conn.commit()
unsorted_folder_id = cursor.lastrowid
else:
unsorted_folder_id = unsorted_folder[0]
# Папка для загрузки изображений
UPLOAD_FOLDER = "/app/uploads"
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
# Монтируем папку uploads для доступа к изображениям
api_app.mount("/uploads", StaticFiles(directory=UPLOAD_FOLDER), name="uploads")
web_app.mount("/uploads", StaticFiles(directory=UPLOAD_FOLDER), name="uploads")
# Модели данных
class Folder(BaseModel):
name: str
parent_id: int | None = None
class FolderUpdate(BaseModel):
name: str
class InstallData(BaseModel):
rust_id: str | None = None
computer_name: str | None = None
install_time: str | None = None
folder_id: int | None = None
protocol: str | None = 'rustdesk'
note: str | None = ''
# Функция форматирования времени
def format_time(time_str):
if not time_str:
return None
try:
dt = datetime.datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")
return dt.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
try:
dt = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
return dt.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
return time_str
# Монтируем папки templates и icons
web_app.mount("/templates", StaticFiles(directory="templates"), name="templates")
web_app.mount("/icons", StaticFiles(directory="templates/icons"), name="icons")
# Веб-интерфейс
@web_app.get("/")
async def root():
return FileResponse("templates/index.html")
# API-эндпоинты
@api_app.get("/api/folders")
def get_folders():
cursor.execute("SELECT * FROM folders")
rows = cursor.fetchall()
return [{"id": row[0], "name": row[1], "parent_id": row[2]} for row in rows]
@api_app.post("/api/folders")
def add_folder(folder: Folder):
cursor.execute("INSERT INTO folders (name, parent_id) VALUES (?, ?)",
(folder.name, folder.parent_id))
conn.commit()
return {"status": "success", "id": cursor.lastrowid}
@api_app.put("/api/folders/{folder_id}")
def update_folder(folder_id: int, folder: FolderUpdate):
cursor.execute("UPDATE folders SET name = ? WHERE id = ?", (folder.name, folder_id))
conn.commit()
if cursor.rowcount == 0:
raise HTTPException(status_code=404, detail="Папка не найдена")
return {"status": "success"}
@api_app.delete("/api/folders/{folder_id}")
def delete_folder(folder_id: int):
cursor.execute("SELECT name FROM folders WHERE id = ?", (folder_id,))
folder_name = cursor.fetchone()
if folder_name and folder_name[0] == 'Несортированные':
raise HTTPException(status_code=403, detail="Папка 'Несортированные' не может быть удалена")
cursor.execute("DELETE FROM folders WHERE id = ?", (folder_id,))
conn.commit()
if cursor.rowcount == 0:
raise HTTPException(status_code=404, detail="Папка не найдена")
return {"status": "success"}
@api_app.get("/api/installs")
def get_installs():
cursor.execute("""
SELECT i.id, i.rust_id, i.computer_name, i.install_time, i.folder_id, f.name as folder_name, i.protocol, i.note
FROM installs i
LEFT JOIN folders f ON i.folder_id = f.id
""")
rows = cursor.fetchall()
return [{"id": row[0], "rust_id": row[1], "computer_name": row[2],
"install_time": format_time(row[3]),
"folder_id": row[4], "folder_name": row[5], "protocol": row[6], "note": row[7]}
for row in rows]
@api_app.post("/api/install")
def add_install(data: InstallData):
install_time = data.install_time or datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
folder_id = data.folder_id if data.folder_id is not None else unsorted_folder_id
protocol = data.protocol or 'rustdesk'
note = data.note or ''
cursor.execute("INSERT INTO installs (rust_id, computer_name, install_time, folder_id, protocol, note) VALUES (?, ?, ?, ?, ?, ?)",
(data.rust_id, data.computer_name, install_time, folder_id, protocol, note))
conn.commit()
return {"status": "success"}
@api_app.put("/api/install/{install_id}")
def update_install(install_id: int, data: InstallData):
cursor.execute("SELECT rust_id, computer_name, install_time, folder_id, protocol, note FROM installs WHERE id = ?", (install_id,))
current = cursor.fetchone()
if not current:
raise HTTPException(status_code=404, detail="Запись не найдена")
new_rust_id = data.rust_id if data.rust_id is not None else current[0]
new_computer_name = data.computer_name if data.computer_name is not None else current[1]
new_install_time = data.install_time if data.install_time is not None else current[2]
new_folder_id = data.folder_id if data.folder_id is not None else current[3]
# Сохраняем текущее значение protocol, если новое не передано
new_protocol = data.protocol if data.protocol is not None else current[4]
new_note = data.note if data.note is not None else current[5]
if new_install_time:
try:
datetime.datetime.strptime(new_install_time, "%Y-%m-%d %H:%M:%S")
except ValueError:
raise HTTPException(status_code=400, detail="Неверный формат времени. Используйте YYYY-MM-DD HH:MM:SS")
else:
new_install_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
cursor.execute("""
UPDATE installs
SET rust_id = ?, computer_name = ?, install_time = ?, folder_id = ?, protocol = ?, note = ?
WHERE id = ?
""", (new_rust_id, new_computer_name, new_install_time, new_folder_id, new_protocol, new_note, install_id))
conn.commit()
return {"status": "success"}
@api_app.delete("/api/install/{install_id}")
def delete_install(install_id: int):
cursor.execute("DELETE FROM installs WHERE id = ?", (install_id,))
conn.commit()
if cursor.rowcount == 0:
raise HTTPException(status_code=404, detail="Запись не найдена")
return {"status": "success"}
@api_app.get("/api/export/csv")
async def export_csv(folder_id: int | None = Query(None, description="ID папки для экспорта, если None - экспортировать все папки")):
if folder_id:
cursor.execute("""
SELECT i.rust_id, i.computer_name, i.install_time, f.name as folder_name, i.protocol, i.note
FROM installs i
LEFT JOIN folders f ON i.folder_id = f.id
WHERE i.folder_id = ?
""", (folder_id,))
else:
cursor.execute("""
SELECT i.rust_id, i.computer_name, i.install_time, f.name as folder_name, i.protocol, i.note
FROM installs i
LEFT JOIN folders f ON i.folder_id = f.id
""")
rows = cursor.fetchall()
output = io.StringIO()
writer = csv.writer(output, lineterminator='\n')
writer.writerow(['ID подключения', 'Имя компьютера', 'Время установки', 'Папка', 'Протокол', 'Заметка'])
for row in rows:
install_time = format_time(row[2]) if row[2] else ""
writer.writerow([row[0], row[1], install_time, row[3], row[4], row[5]])
headers = {
'Content-Disposition': 'attachment; filename="rustdesk_data.csv"',
'Content-Type': 'text/csv'
}
return StreamingResponse(iter([output.getvalue()]), headers=headers)
@api_app.post("/api/import/csv")
async def import_csv(file: UploadFile = File(...), folder_id: int | None = Query(None, description="ID папки для импорта, если None - использовать 'Несортированные'")):
try:
contents = await file.read()
csv_data = io.StringIO(contents.decode('utf-8'))
reader = csv.DictReader(csv_data)
target_folder_id = folder_id if folder_id is not None else unsorted_folder_id
for row in reader:
rust_id = row['ID подключения']
computer_name = row['Имя компьютера']
install_time = row['Время установки']
folder_name = row.get('Папка', None)
protocol = row.get('Протокол', 'rustdesk')
note = row.get('Заметка', '')
cursor.execute("SELECT id FROM installs WHERE rust_id = ?", (rust_id,))
if cursor.fetchone():
continue
if install_time:
try:
dt = datetime.datetime.strptime(install_time, "%Y-%m-%d %H:%M:%S")
install_time = dt.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
try:
dt = datetime.datetime.strptime(install_time, "%Y-%m-%dT%H:%M:%S.%fZ")
install_time = dt.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
raise HTTPException(status_code=400, detail=f"Неверный формат времени для записи с ID {rust_id}. Используйте YYYY-MM-DD HH:MM:SS или ISO 8601")
if folder_name:
cursor.execute("SELECT id FROM folders WHERE name = ?", (folder_name,))
folder = cursor.fetchone()
folder_id = folder[0] if folder else unsorted_folder_id
else:
folder_id = target_folder_id
cursor.execute("""
INSERT INTO installs (rust_id, computer_name, install_time, folder_id, protocol, note)
VALUES (?, ?, ?, ?, ?, ?)
""", (rust_id, computer_name, install_time, folder_id, protocol, note))
conn.commit()
return {"status": "success", "message": "Данные успешно импортированы"}
except Exception as e:
raise HTTPException(status_code=400, detail=f"Ошибка импорта: {str(e)}")
# Новый эндпоинт для загрузки изображений
@api_app.post("/api/upload-image")
async def upload_image(install_id: int = Query(..., description="ID записи для ассоциации изображения"), file: UploadFile = File(...)):
if not file.filename:
raise HTTPException(status_code=400, detail="No file provided")
# Проверка расширения файла
allowed_extensions = {'png', 'jpg', 'jpeg', 'gif'}
if not secure_filename(file.filename).rsplit('.', 1)[1].lower() in allowed_extensions:
raise HTTPException(status_code=400, detail="Invalid file format. Use png, jpg, jpeg, or gif")
# Генерация безопасного имени файла
filename = secure_filename(f"{install_id}_{file.filename}")
file_path = os.path.join(UPLOAD_FOLDER, filename)
# Сохранение файла
with open(file_path, "wb") as buffer:
buffer.write(await file.read())
# Возвращаем URL изображения
url = f"/uploads/{filename}"
return {"url": url}
# CORS для API
api_app.add_middleware(
CORSMiddleware,
allow_origins=["http://10.0.0.10:8001", "http://localhost:8001"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# CORS для веб-интерфейса
web_app.add_middleware(
CORSMiddleware,
allow_origins=["http://10.0.0.10:8001", "http://localhost:8001"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)