Files
tobiichiGPT/api/server.py
ChenKaiLiuG 76a15ecabb Rebuild
2025-12-20 23:25:42 +08:00

208 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
API 轉接層 - 偽裝 OpenAI API將請求轉為人工回覆隊列
"""
from fastapi import FastAPI
from pydantic import BaseModel
import asyncpg
import asyncio
import time
import uuid
import os
from typing import Optional
from datetime import datetime
app = FastAPI()
# 資料庫連接池
db_pool = None
# 資料庫設定
DB_CONFIG = {
"host": os.getenv("DB_HOST", "postgres"),
"port": int(os.getenv("DB_PORT", 5432)),
"database": os.getenv("DB_NAME", "tobiichi"),
"user": os.getenv("DB_USER", "tobiichi"),
"password": os.getenv("DB_PASSWORD", "tobiichi_password")
}
class Message(BaseModel):
role: str
content: str
class ChatRequest(BaseModel):
model: str
messages: list[Message]
stream: Optional[bool] = False
async def init_db():
"""初始化資料庫連接池和表格"""
global db_pool
db_pool = await asyncpg.create_pool(**DB_CONFIG, min_size=2, max_size=10)
# 建立對話隊列表格
async with db_pool.acquire() as conn:
await conn.execute("""
CREATE TABLE IF NOT EXISTS reply_queue (
id SERIAL PRIMARY KEY,
conversation_id VARCHAR(50) UNIQUE NOT NULL,
user_message TEXT NOT NULL,
admin_reply TEXT,
status VARCHAR(20) DEFAULT 'pending',
created_at TIMESTAMP DEFAULT NOW(),
replied_at TIMESTAMP
)
""")
# 建立索引
await conn.execute("""
CREATE INDEX IF NOT EXISTS idx_status ON reply_queue(status);
CREATE INDEX IF NOT EXISTS idx_conversation_id ON reply_queue(conversation_id);
""")
@app.on_event("startup")
async def startup():
await init_db()
print("✅ 資料庫連接成功")
@app.on_event("shutdown")
async def shutdown():
if db_pool:
await db_pool.close()
@app.get("/")
async def root():
return {
"status": "TobiichiGPT API Running",
"database": "Connected" if db_pool else "Disconnected"
}
@app.get("/v1/models")
async def list_models():
"""模擬 OpenAI 模型列表"""
return {
"object": "list",
"data": [{
"id": "human-admin",
"object": "model",
"created": int(time.time()),
"owned_by": "tobiichi"
}]
}
@app.post("/v1/chat/completions")
async def chat_completions(request: ChatRequest):
"""接收用戶訊息,等待管理員回覆"""
# 生成對話 ID
conv_id = str(uuid.uuid4())[:12]
# 提取用戶訊息
user_message = ""
for msg in reversed(request.messages):
if msg.role == "user":
user_message = msg.content
break
if not user_message:
return {
"id": f"chatcmpl-{conv_id}",
"object": "chat.completion",
"created": int(time.time()),
"model": request.model,
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": "無法識別您的訊息,請重新輸入。"
},
"finish_reason": "stop"
}]
}
# 寫入資料庫
async with db_pool.acquire() as conn:
await conn.execute("""
INSERT INTO reply_queue (conversation_id, user_message, status)
VALUES ($1, $2, 'pending')
""", conv_id, user_message)
print(f"[新對話 {conv_id}] {user_message[:50]}...")
# 等待管理員回覆(最多 15 分鐘)
timeout = 900
start_time = time.time()
reply_content = None
while time.time() - start_time < timeout:
async with db_pool.acquire() as conn:
row = await conn.fetchrow("""
SELECT admin_reply, status
FROM reply_queue
WHERE conversation_id = $1
""", conv_id)
if row and row['status'] == 'replied' and row['admin_reply']:
reply_content = row['admin_reply']
print(f"[已回覆 {conv_id}] {reply_content[:50]}...")
break
await asyncio.sleep(3) # 每 3 秒檢查一次
# 如果超時
if not reply_content:
reply_content = "抱歉,管理員暫時無法回覆,請稍後再試。"
async with db_pool.acquire() as conn:
await conn.execute("""
UPDATE reply_queue
SET status = 'timeout'
WHERE conversation_id = $1
""", conv_id)
return {
"id": f"chatcmpl-{conv_id}",
"object": "chat.completion",
"created": int(time.time()),
"model": request.model,
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": reply_content
},
"finish_reason": "stop"
}],
"usage": {
"prompt_tokens": len(user_message),
"completion_tokens": len(reply_content) if reply_content else 0,
"total_tokens": len(user_message) + (len(reply_content) if reply_content else 0)
}
}
@app.get("/health")
async def health():
"""健康檢查"""
db_ok = db_pool is not None
return {"status": "healthy" if db_ok else "unhealthy", "database": db_ok}
if __name__ == "__main__":
import uvicorn
print("=" * 60)
print("🚀 TobiichiGPT API 啟動中...")
print("=" * 60)
print(f"📊 資料庫: {DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}")
print(f"🌐 API 端點: http://0.0.0.0:8000/v1")
print("=" * 60)
uvicorn.run(app, host="0.0.0.0", port=8000)