addressbook_web/app.py

406 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from fastapi import FastAPI, HTTPException, File, UploadFile, Form, Query, WebSocket, WebSocketDisconnect
from pydantic import BaseModel
import sqlite3
import datetime
import csv
import markdown
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, StreamingResponse
import io
import os
from fastapi.middleware.cors import CORSMiddleware
from werkzeug.utils import secure_filename
import json
# Создаем два экземпляра FastAPI
web_app = FastAPI(title="Web Interface") # Для веб-интерфейса (порт 8001)
api_app = FastAPI(title="API Endpoints") # Для API (порт 8002)
# Общий код для подключения к базе данных
conn = sqlite3.connect("/db/rustdesk.db", check_same_thread=False)
cursor = conn.cursor()
# Проверяем и обновляем структуру таблицы installs
cursor.execute("PRAGMA table_info(installs)")
columns = [row[1] for row in cursor.fetchall()]
if 'protocol' not in columns:
cursor.execute("ALTER TABLE installs ADD COLUMN protocol TEXT DEFAULT 'rustdesk'")
conn.commit()
if 'note' not in columns:
cursor.execute("ALTER TABLE installs ADD COLUMN note TEXT DEFAULT ''")
conn.commit()
if 'last_seen' not in columns:
cursor.execute("ALTER TABLE installs ADD COLUMN last_seen TEXT DEFAULT NULL")
conn.commit()
# Создаем таблицы
cursor.execute("""
CREATE TABLE IF NOT EXISTS folders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
parent_id INTEGER,
FOREIGN KEY (parent_id) REFERENCES folders(id)
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS installs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
rust_id TEXT,
computer_name TEXT,
install_time TEXT,
folder_id INTEGER,
protocol TEXT DEFAULT 'rustdesk',
note TEXT DEFAULT '',
last_seen TEXT DEFAULT NULL,
FOREIGN KEY (folder_id) REFERENCES folders(id)
)
""")
conn.commit()
# Проверяем/создаем папку "Несортированные" и получаем её ID
cursor.execute("SELECT id FROM folders WHERE name = 'Несортированные'")
unsorted_folder = cursor.fetchone()
if not unsorted_folder:
cursor.execute("INSERT INTO folders (name) VALUES ('Несортированные')")
conn.commit()
unsorted_folder_id = cursor.lastrowid
else:
unsorted_folder_id = unsorted_folder[0]
# Папка для загрузки изображений
UPLOAD_FOLDER = "/app/uploads"
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
# Монтируем папку uploads для доступа к изображениям
api_app.mount("/uploads", StaticFiles(directory=UPLOAD_FOLDER), name="uploads")
web_app.mount("/uploads", StaticFiles(directory=UPLOAD_FOLDER), name="uploads")
# Модели данных
class Folder(BaseModel):
name: str
parent_id: int | None = None
class FolderUpdate(BaseModel):
name: str
class InstallData(BaseModel):
rust_id: str | None = None
computer_name: str | None = None
install_time: str | None = None
folder_id: int | None = None
protocol: str | None = 'rustdesk'
note: str | None = ''
last_seen: str | None = None
# Функция форматирования времени
def format_time(time_str):
if not time_str:
return None
try:
dt = datetime.datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")
return dt.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
try:
dt = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
return dt.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
return time_str
# Монтируем папки templates и icons
web_app.mount("/templates", StaticFiles(directory="templates"), name="templates")
web_app.mount("/icons", StaticFiles(directory="templates/icons"), name="icons")
# Веб-интерфейс
@web_app.get("/")
async def root():
return FileResponse("templates/index.html")
# WebSocket для отправки обновлений статуса
class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []
self.max_connections = 100 # Ограничение числа подключений
async def connect(self, websocket: WebSocket):
if len(self.active_connections) >= self.max_connections:
await websocket.close(code=1008, reason="Too many connections")
return
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: str):
for connection in self.active_connections[:]: # Копируем список для безопасного удаления
await connection.send_text(message)
manager = ConnectionManager()
@api_app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
except WebSocketDisconnect:
manager.disconnect(websocket)
# API-эндпоинты
@api_app.get("/api/folders")
def get_folders():
cursor.execute("SELECT * FROM folders")
rows = cursor.fetchall()
return [{"id": row[0], "name": row[1], "parent_id": row[2]} for row in rows]
@api_app.post("/api/folders")
def add_folder(folder: Folder):
cursor.execute("INSERT INTO folders (name, parent_id) VALUES (?, ?)",
(folder.name, folder.parent_id))
conn.commit()
return {"status": "success", "id": cursor.lastrowid}
@api_app.put("/api/folders/{folder_id}")
def update_folder(folder_id: int, folder: FolderUpdate):
cursor.execute("UPDATE folders SET name = ? WHERE id = ?", (folder.name, folder_id))
conn.commit()
if cursor.rowcount == 0:
raise HTTPException(status_code=404, detail="Папка не найдена")
return {"status": "success"}
@api_app.delete("/api/folders/{folder_id}")
def delete_folder(folder_id: int):
cursor.execute("SELECT name FROM folders WHERE id = ?", (folder_id,))
folder_name = cursor.fetchone()
if folder_name and folder_name[0] == 'Несортированные':
raise HTTPException(status_code=403, detail="Папка 'Несортированные' не может быть удалена")
cursor.execute("DELETE FROM folders WHERE id = ?", (folder_id,))
conn.commit()
if cursor.rowcount == 0:
raise HTTPException(status_code=404, detail="Папка не найдена")
return {"status": "success"}
@api_app.get("/api/installs")
def get_installs():
cursor.execute("""
SELECT i.id, i.rust_id, i.computer_name, i.install_time, i.folder_id, f.name as folder_name, i.protocol, i.note, i.last_seen
FROM installs i
LEFT JOIN folders f ON i.folder_id = f.id
""")
rows = cursor.fetchall()
return [{"id": row[0], "rust_id": row[1], "computer_name": row[2],
"install_time": format_time(row[3]),
"folder_id": row[4], "folder_name": row[5], "protocol": row[6], "note": row[7], "last_seen": row[8]}
for row in rows]
@api_app.post("/api/install")
async def add_install(data: InstallData):
rust_id = data.rust_id
computer_name = data.computer_name or f"PC_{rust_id}"
install_time = data.install_time or datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
protocol = data.protocol or 'rustdesk'
note = data.note or ''
last_seen = data.last_seen or datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Проверяем, существует ли запись с таким rust_id
cursor.execute("SELECT id, folder_id, protocol, note FROM installs WHERE rust_id = ?", (rust_id,))
existing = cursor.fetchone()
if existing:
# Если запись существует, используем текущий folder_id из базы данных
current_folder_id = existing[1] # Текущий folder_id
current_protocol = existing[2] # Текущий protocol
current_note = existing[3] # Текущая заметка
# Обновляем запись, сохраняя существующий folder_id
cursor.execute("""
UPDATE installs
SET computer_name = ?, install_time = ?, protocol = ?, note = ?, last_seen = ?
WHERE rust_id = ?
""", (computer_name, install_time, current_protocol, current_note, last_seen, rust_id))
else:
# Если записи нет, создаем новую, используя folder_id из запроса или "Несортированные"
folder_id = data.folder_id if data.folder_id is not None else unsorted_folder_id
cursor.execute("""
INSERT INTO installs (rust_id, computer_name, install_time, folder_id, protocol, note, last_seen)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (rust_id, computer_name, install_time, folder_id, protocol, note, last_seen))
conn.commit()
# Получаем обновленную запись для отправки через WebSocket
cursor.execute("""
SELECT id, rust_id, computer_name, install_time, folder_id, protocol, note, last_seen
FROM installs WHERE rust_id = ?
""", (rust_id,))
updated_install = cursor.fetchone()
install_data = {
"id": updated_install[0],
"rust_id": updated_install[1],
"computer_name": updated_install[2],
"install_time": format_time(updated_install[3]),
"folder_id": updated_install[4], # Используем folder_id из базы
"protocol": updated_install[5],
"note": updated_install[6],
"last_seen": updated_install[7]
}
# Отправляем обновленную запись через WebSocket
await manager.broadcast(json.dumps({"type": "update", "data": install_data}))
return {"status": "success"}
@api_app.put("/api/install/{install_id}")
def update_install(install_id: int, data: InstallData):
cursor.execute("SELECT rust_id, computer_name, install_time, folder_id, protocol, note FROM installs WHERE id = ?", (install_id,))
current = cursor.fetchone()
if not current:
raise HTTPException(status_code=404, detail="Запись не найдена")
new_rust_id = data.rust_id if data.rust_id is not None else current[0]
new_computer_name = data.computer_name if data.computer_name is not None else current[1]
new_install_time = data.install_time if data.install_time is not None else current[2]
new_folder_id = data.folder_id if data.folder_id is not None else current[3]
new_protocol = data.protocol if data.protocol is not None else current[4]
new_note = data.note if data.note is not None else current[5]
if new_install_time:
try:
datetime.datetime.strptime(new_install_time, "%Y-%m-%d %H:%M:%S")
except ValueError:
raise HTTPException(status_code=400, detail="Неверный формат времени. Используйте YYYY-MM-DD HH:MM:SS")
else:
new_install_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
cursor.execute("""
UPDATE installs
SET rust_id = ?, computer_name = ?, install_time = ?, folder_id = ?, protocol = ?, note = ?
WHERE id = ?
""", (new_rust_id, new_computer_name, new_install_time, new_folder_id, new_protocol, new_note, install_id))
conn.commit()
return {"status": "success"}
@api_app.delete("/api/install/{install_id}")
def delete_install(install_id: int):
cursor.execute("DELETE FROM installs WHERE id = ?", (install_id,))
conn.commit()
if cursor.rowcount == 0:
raise HTTPException(status_code=404, detail="Запись не найдена")
return {"status": "success"}
@api_app.get("/api/export/csv")
async def export_csv(folder_id: int | None = Query(None, description="ID папки для экспорта, если None - экспортировать все папки")):
if folder_id:
cursor.execute("""
SELECT i.rust_id, i.computer_name, i.install_time, f.name as folder_name, i.protocol, i.note, i.last_seen
FROM installs i
LEFT JOIN folders f ON i.folder_id = f.id
WHERE i.folder_id = ?
""", (folder_id,))
else:
cursor.execute("""
SELECT i.rust_id, i.computer_name, i.install_time, f.name as folder_name, i.protocol, i.note, i.last_seen
FROM installs i
LEFT JOIN folders f ON i.folder_id = f.id
""")
rows = cursor.fetchall()
output = io.StringIO()
writer = csv.writer(output, lineterminator='\n')
writer.writerow(['ID подключения', 'Имя компьютера', 'Время установки', 'Папка', 'Протокол', 'Заметка', 'Последнее подключение'])
for row in rows:
install_time = format_time(row[2]) if row[2] else ""
last_seen = format_time(row[6]) if row[6] else ""
writer.writerow([row[0], row[1], install_time, row[3], row[4], row[5], last_seen])
headers = {
'Content-Disposition': 'attachment; filename="rustdesk_data.csv"',
'Content-Type': 'text/csv'
}
return StreamingResponse(iter([output.getvalue()]), headers=headers)
@api_app.post("/api/import/csv")
async def import_csv(file: UploadFile = File(...), folder_id: int | None = Query(None, description="ID папки для импорта, если None - использовать 'Несортированные'")):
try:
contents = await file.read()
csv_data = io.StringIO(contents.decode('utf-8'))
reader = csv.DictReader(csv_data)
target_folder_id = folder_id if folder_id is not None else unsorted_folder_id
for row in reader:
rust_id = row['ID подключения']
computer_name = row['Имя компьютера']
install_time = row['Время установки']
folder_name = row.get('Папка', None)
protocol = row.get('Протокол', 'rustdesk')
note = row.get('Заметка', '')
cursor.execute("SELECT id FROM installs WHERE rust_id = ?", (rust_id,))
if cursor.fetchone():
continue
if install_time:
try:
dt = datetime.datetime.strptime(install_time, "%Y-%m-%d %H:%M:%S")
install_time = dt.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
try:
dt = datetime.datetime.strptime(install_time, "%Y-%m-%dT%H:%M:%S.%fZ")
install_time = dt.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
raise HTTPException(status_code=400, detail=f"Неверный формат времени для записи с ID {rust_id}. Используйте YYYY-MM-DD HH:MM:SS или ISO 8601")
if folder_name:
cursor.execute("SELECT id FROM folders WHERE name = ?", (folder_name,))
folder = cursor.fetchone()
folder_id = folder[0] if folder else unsorted_folder_id
else:
folder_id = target_folder_id
cursor.execute("""
INSERT INTO installs (rust_id, computer_name, install_time, folder_id, protocol, note)
VALUES (?, ?, ?, ?, ?, ?)
""", (rust_id, computer_name, install_time, folder_id, protocol, note))
conn.commit()
return {"status": "success", "message": "Данные успешно импортированы"}
except Exception as e:
raise HTTPException(status_code=400, detail=f"Ошибка импорта: {str(e)}")
@api_app.post("/api/upload-image")
async def upload_image(install_id: int = Form(...), file: UploadFile = File(...)):
if not file.filename:
raise HTTPException(status_code=400, detail="No file provided")
allowed_extensions = {'png', 'jpg', 'jpeg', 'gif'}
if not secure_filename(file.filename).rsplit('.', 1)[1].lower() in allowed_extensions:
raise HTTPException(status_code=400, detail="Invalid file format. Use png, jpg, jpeg, or gif")
filename = secure_filename(f"{install_id}_{file.filename}")
file_path = os.path.join(UPLOAD_FOLDER, filename)
with open(file_path, "wb") as buffer:
buffer.write(await file.read())
url = f"/uploads/{filename}"
return {"url": url}
# CORS для API
api_app.add_middleware(
CORSMiddleware,
allow_origins=["https://rd.it-depot.ru"], # Обновлено для HTTPS
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# CORS для веб-интерфейса
web_app.add_middleware(
CORSMiddleware,
allow_origins=["https://rd.it-depot.ru"], # Обновлено для HTTPS
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)