from fastapi import FastAPI, HTTPException, File, UploadFile, Query from pydantic import BaseModel import sqlite3 import datetime import csv from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse, StreamingResponse import io app = FastAPI() # Соединение с базой данных conn = sqlite3.connect("/db/rustdesk.db", check_same_thread=False) cursor = conn.cursor() # Проверяем и обновляем структуру таблицы installs cursor.execute("PRAGMA table_info(installs)") columns = [row[1] for row in cursor.fetchall()] if 'protocol' not in columns: cursor.execute("ALTER TABLE installs ADD COLUMN protocol TEXT DEFAULT 'rustdesk'") conn.commit() # Создаем таблицы (добавляем поле protocol, если таблицы нет) cursor.execute(""" CREATE TABLE IF NOT EXISTS folders ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, parent_id INTEGER, FOREIGN KEY (parent_id) REFERENCES folders(id) ) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS installs ( id INTEGER PRIMARY KEY AUTOINCREMENT, rust_id TEXT, computer_name TEXT, install_time TEXT, folder_id INTEGER, protocol TEXT DEFAULT 'rustdesk', FOREIGN KEY (folder_id) REFERENCES folders(id) ) """) conn.commit() # Проверяем/создаем папку "Несортированные" и получаем её ID cursor.execute("SELECT id FROM folders WHERE name = 'Несортированные'") unsorted_folder = cursor.fetchone() if not unsorted_folder: cursor.execute("INSERT INTO folders (name) VALUES ('Несортированные')") conn.commit() unsorted_folder_id = cursor.lastrowid else: unsorted_folder_id = unsorted_folder[0] # Модели данных class Folder(BaseModel): name: str parent_id: int | None = None class FolderUpdate(BaseModel): name: str class InstallData(BaseModel): rust_id: str | None = None computer_name: str | None = None install_time: str | None = None # Принимаем строку в формате YYYY-MM-DD HH:MM:SS folder_id: int | None = None protocol: str | None = 'rustdesk' # По умолчанию RustDesk # Монтируем папки templates и icons как статические файлы app.mount("/templates", StaticFiles(directory="templates"), name="templates") app.mount("/icons", StaticFiles(directory="templates/icons"), name="icons") # Главная страница @app.get("/") async def root(): return FileResponse("templates/index.html") # --- Folders API --- @app.get("/api/folders") def get_folders(): cursor.execute("SELECT * FROM folders") rows = cursor.fetchall() return [{"id": row[0], "name": row[1], "parent_id": row[2]} for row in rows] @app.post("/api/folders") def add_folder(folder: Folder): cursor.execute("INSERT INTO folders (name, parent_id) VALUES (?, ?)", (folder.name, folder.parent_id)) conn.commit() return {"status": "success", "id": cursor.lastrowid} @app.put("/api/folders/{folder_id}") def update_folder(folder_id: int, folder: FolderUpdate): cursor.execute("UPDATE folders SET name = ? WHERE id = ?", (folder.name, folder_id)) conn.commit() if cursor.rowcount == 0: raise HTTPException(status_code=404, detail="Папка не найдена") return {"status": "success"} @app.delete("/api/folders/{folder_id}") def delete_folder(folder_id: int): # Проверяем, не является ли папка "Несортированные" cursor.execute("SELECT name FROM folders WHERE id = ?", (folder_id,)) folder_name = cursor.fetchone() if folder_name and folder_name[0] == 'Несортированные': raise HTTPException(status_code=403, detail="Папка 'Несортированные' не может быть удалена") cursor.execute("DELETE FROM folders WHERE id = ?", (folder_id,)) conn.commit() if cursor.rowcount == 0: raise HTTPException(status_code=404, detail="Папка не найдена") return {"status": "success"} # --- Installs API --- @app.get("/api/installs") def get_installs(): cursor.execute(""" SELECT i.id, i.rust_id, i.computer_name, i.install_time, i.folder_id, f.name as folder_name, i.protocol FROM installs i LEFT JOIN folders f ON i.folder_id = f.id """) rows = cursor.fetchall() # Обработка времени в разных форматах def format_time(time_str): if not time_str: return None try: # Пробуем разобрать ISO 8601 (с T и Z) dt = datetime.datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ") return dt.strftime("%Y-%m-%d %H:%M:%S") except ValueError: try: # Пробуем разобрать наш читаемый формат dt = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S") return dt.strftime("%Y-%m-%d %H:%M:%S") except ValueError: return time_str # Возвращаем как есть, если формат не распознан return [{"id": row[0], "rust_id": row[1], "computer_name": row[2], "install_time": format_time(row[3]), "folder_id": row[4], "folder_name": row[5], "protocol": row[6]} for row in rows] @app.post("/api/install") def add_install(data: InstallData): install_time = data.install_time or datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # Форматируем в читаемый формат folder_id = data.folder_id if data.folder_id is not None else unsorted_folder_id protocol = data.protocol or 'rustdesk' # По умолчанию RustDesk для POST-запросов cursor.execute("INSERT INTO installs (rust_id, computer_name, install_time, folder_id, protocol) VALUES (?, ?, ?, ?, ?)", (data.rust_id, data.computer_name, install_time, folder_id, protocol)) conn.commit() return {"status": "success"} @app.put("/api/install/{install_id}") def update_install(install_id: int, data: InstallData): cursor.execute("SELECT rust_id, computer_name, install_time, folder_id, protocol FROM installs WHERE id = ?", (install_id,)) current = cursor.fetchone() if not current: raise HTTPException(status_code=404, detail="Запись не найдена") new_rust_id = data.rust_id if data.rust_id is not None else current[0] new_computer_name = data.computer_name if data.computer_name is not None else current[1] new_install_time = data.install_time if data.install_time is not None else current[2] new_folder_id = data.folder_id if data.folder_id is not None else current[3] new_protocol = data.protocol if data.protocol is not None else current[4] # Форматируем время, если оно предоставлено в запросе if new_install_time: try: # Проверяем, что время в формате YYYY-MM-DD HH:MM:SS datetime.datetime.strptime(new_install_time, "%Y-%m-%d %H:%M:%S") except ValueError: raise HTTPException(status_code=400, detail="Неверный формат времени. Используйте YYYY-MM-DD HH:MM:SS") else: new_install_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") cursor.execute(""" UPDATE installs SET rust_id = ?, computer_name = ?, install_time = ?, folder_id = ?, protocol = ? WHERE id = ? """, (new_rust_id, new_computer_name, new_install_time, new_folder_id, new_protocol, install_id)) conn.commit() return {"status": "success"} @app.delete("/api/install/{install_id}") def delete_install(install_id: int): cursor.execute("DELETE FROM installs WHERE id = ?", (install_id,)) conn.commit() if cursor.rowcount == 0: raise HTTPException(status_code=404, detail="Запись не найдена") return {"status": "success"} # --- CSV Export/Import API --- @app.get("/api/export/csv") async def export_csv(folder_id: int | None = Query(None, description="ID папки для экспорта, если None - экспортировать все папки")): if folder_id: cursor.execute(""" SELECT i.rust_id, i.computer_name, i.install_time, f.name as folder_name, i.protocol FROM installs i LEFT JOIN folders f ON i.folder_id = f.id WHERE i.folder_id = ? """, (folder_id,)) else: cursor.execute(""" SELECT i.rust_id, i.computer_name, i.install_time, f.name as folder_name, i.protocol FROM installs i LEFT JOIN folders f ON i.folder_id = f.id """) rows = cursor.fetchall() output = io.StringIO() writer = csv.writer(output, lineterminator='\n') writer.writerow(['ID подключения', 'Имя компьютера', 'Время установки', 'Папка', 'Протокол']) for row in rows: # Обработка времени в разных форматах install_time = format_time(row[2]) if row[2] else "" writer.writerow([row[0], row[1], install_time, row[3], row[4]]) headers = { 'Content-Disposition': 'attachment; filename="rustdesk_data.csv"', 'Content-Type': 'text/csv' } return StreamingResponse(iter([output.getvalue()]), headers=headers) def format_time(time_str): if not time_str: return None try: # Пробуем разобрать ISO 8601 (с T и Z) dt = datetime.datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ") return dt.strftime("%Y-%m-%d %H:%M:%S") except ValueError: try: # Пробуем разобрать наш читаемый формат dt = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S") return dt.strftime("%Y-%m-%d %H:%M:%S") except ValueError: return time_str # Возвращаем как есть, если формат не распознан @app.post("/api/import/csv") async def import_csv(file: UploadFile = File(...), folder_id: int | None = Query(None, description="ID папки для импорта, если None - использовать 'Несортированные'")): try: contents = await file.read() csv_data = io.StringIO(contents.decode('utf-8')) reader = csv.DictReader(csv_data) target_folder_id = folder_id if folder_id is not None else unsorted_folder_id for row in reader: rust_id = row['ID подключения'] computer_name = row['Имя компьютера'] install_time = row['Время установки'] folder_name = row.get('Папка', None) protocol = row.get('Протокол', 'rustdesk') # Проверяем, существует ли запись с таким rust_id cursor.execute("SELECT id FROM installs WHERE rust_id = ?", (rust_id,)) if cursor.fetchone(): continue # Пропускаем дублирующуюся запись # Проверяем и форматируем время if install_time: try: # Пробуем разобрать время в формате YYYY-MM-DD HH:MM:SS dt = datetime.datetime.strptime(install_time, "%Y-%m-%d %H:%M:%S") install_time = dt.strftime("%Y-%m-%d %H:%M:%S") except ValueError: try: # Пробуем разобрать ISO 8601 (если в CSV другой формат) dt = datetime.datetime.strptime(install_time, "%Y-%m-%dT%H:%M:%S.%fZ") install_time = dt.strftime("%Y-%m-%d %H:%M:%S") except ValueError: raise HTTPException(status_code=400, detail=f"Неверный формат времени для записи с ID {rust_id}. Используйте YYYY-MM-DD HH:MM:SS или ISO 8601") # Получаем или создаем ID папки if folder_name: cursor.execute("SELECT id FROM folders WHERE name = ?", (folder_name,)) folder = cursor.fetchone() folder_id = folder[0] if folder else unsorted_folder_id else: folder_id = target_folder_id cursor.execute(""" INSERT INTO installs (rust_id, computer_name, install_time, folder_id, protocol) VALUES (?, ?, ?, ?, ?) """, (rust_id, computer_name, install_time, folder_id, protocol)) conn.commit() return {"status": "success", "message": "Данные успешно импортированы"} except Exception as e: raise HTTPException(status_code=400, detail=f"Ошибка импорта: {str(e)}") # CORS from fastapi.middleware.cors import CORSMiddleware app.add_middleware( CORSMiddleware, allow_origins=["http://10.0.0.10:8001", "http://localhost:8001"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], )