Files
tobiichiGPT/api/server.py
2026-02-01 01:33:52 +08:00

623 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
API 轉接層 - 偽裝 OpenAI API整合 Rocket.Chat 作為管理員回覆介面
"""
from fastapi import FastAPI, Request, Header
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import asyncpg
import asyncio
import time
import uuid
import os
import json
import hashlib
import httpx
from typing import Optional
from datetime import datetime
import hashlib
app = FastAPI()
# 資料庫連接池
db_pool = None
# 資料庫設定
DB_CONFIG = {
"host": os.getenv("DB_HOST", "postgres"),
"port": int(os.getenv("DB_PORT", 5432)),
"database": os.getenv("DB_NAME", "tobiichiGPT"),
"user": os.getenv("DB_USER", "tobiichi3227"),
"password": os.getenv("DB_PASSWORD", "tobiichi_password")
}
# Rocket.Chat 設定
ROCKETCHAT_URL = os.getenv("ROCKETCHAT_URL", "http://rocketchat:3000")
ROCKETCHAT_USER = os.getenv("ROCKETCHAT_USER", "admin")
ROCKETCHAT_PASSWORD = os.getenv("ROCKETCHAT_PASSWORD", "admin")
# 全域認證狀態
rocketchat_auth = None
class Message(BaseModel):
role: str
content: str
class ChatRequest(BaseModel):
model: str
messages: list[Message]
stream: Optional[bool] = False
async def rocketchat_login():
"""登入 Rocket.Chat 取得認證 Token"""
global rocketchat_auth
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.post(
f"{ROCKETCHAT_URL}/api/v1/login",
json={
"username": ROCKETCHAT_USER,
"password": ROCKETCHAT_PASSWORD
}
)
if resp.status_code == 200:
data = resp.json()
rocketchat_auth = {
"X-Auth-Token": data["data"]["authToken"],
"X-User-Id": data["data"]["userId"]
}
print(f"✅ Rocket.Chat 登入成功")
return rocketchat_auth
else:
print(f"⚠️ Rocket.Chat 登入失敗: {resp.status_code}")
return None
except Exception as e:
print(f"❌ Rocket.Chat 登入錯誤: {e}")
return None
async def get_or_create_channel(user_id: str, user_name: str = None):
"""取得或創建用戶專屬頻道"""
if not rocketchat_auth:
await rocketchat_login()
if not rocketchat_auth:
return None
# 頻道名稱使用用戶名,如果沒有則用 user_id
display_name = user_name if user_name else f"User-{user_id[:8]}"
# 清理頻道名稱移除空格和特殊字符Rocket.Chat 頻道名不支援)
channel_name = display_name.replace(" ", "-").replace("@", "").lower()[:50] # 限制長度
try:
async with httpx.AsyncClient(timeout=10.0) as client:
# 嘗試創建頻道
resp = await client.post(
f"{ROCKETCHAT_URL}/api/v1/channels.create",
headers=rocketchat_auth,
json={
"name": channel_name,
"members": [] # 空成員列表,只有管理員可見
}
)
if resp.status_code == 200:
data = resp.json()
room_id = data["channel"]["_id"]
print(f"✅ 創建頻道: {channel_name} ({display_name})")
# 設置頻道描述
await client.post(
f"{ROCKETCHAT_URL}/api/v1/channels.setDescription",
headers=rocketchat_auth,
json={
"roomId": room_id,
"description": f"用戶: {display_name} (ID: {user_id})"
}
)
return room_id
# 如果頻道已存在,取得頻道資訊
elif resp.status_code == 400:
resp = await client.get(
f"{ROCKETCHAT_URL}/api/v1/channels.info",
headers=rocketchat_auth,
params={"roomName": channel_name}
)
if resp.status_code == 200:
data = resp.json()
room_id = data["channel"]["_id"]
print(f"✅ 使用現有頻道: {channel_name}")
return room_id
else:
print(f"⚠️ 取得頻道資訊失敗: {resp.status_code}")
return None
else:
print(f"⚠️ 創建頻道失敗: {resp.status_code}")
return None
except Exception as e:
print(f"❌ Rocket.Chat 頻道操作錯誤: {e}")
return None
async def get_or_create_thread(room_id: str, chat_id: str):
"""查找或創建對話的執行緒"""
if not rocketchat_auth:
await rocketchat_login()
if not rocketchat_auth:
return None
try:
async with httpx.AsyncClient(timeout=10.0) as client:
# 搜尋頻道中是否已有此 chat_id 的執行緒搜尋最近50條消息
resp = await client.get(
f"{ROCKETCHAT_URL}/api/v1/channels.messages",
headers=rocketchat_auth,
params={
"roomId": room_id,
"count": 50
}
)
if resp.status_code == 200:
data = resp.json()
messages = data.get("messages", [])
# 找到現有的執行緒起始訊息
for msg in messages:
if f"對話 {chat_id[:8]}" in msg.get("msg", ""):
thread_id = msg["_id"]
print(f"✅ 找到現有執行緒: {thread_id}")
return thread_id
# 沒有找到,返回 None需要創建新執行緒
return None
except Exception as e:
print(f"❌ 查找執行緒錯誤: {e}")
return None
async def create_thread_message(room_id: str, chat_id: str, user_message: str, user_name: str = None):
"""在頻道中發送訊息並創建執行緒"""
if not rocketchat_auth:
await rocketchat_login()
if not rocketchat_auth:
return None
display_name = user_name if user_name else "User"
try:
async with httpx.AsyncClient(timeout=10.0) as client:
# 發送訊息(這會成為執行緒的起始訊息)
resp = await client.post(
f"{ROCKETCHAT_URL}/api/v1/chat.postMessage",
headers=rocketchat_auth,
json={
"roomId": room_id,
"text": f"**[對話 {chat_id[:8]}] - {display_name}**\n\n{user_message}"
}
)
if resp.status_code == 200:
data = resp.json()
message_id = data["message"]["_id"]
print(f"✅ 創建執行緒訊息: {message_id}")
return message_id
else:
print(f"⚠️ 發送訊息失敗: {resp.status_code}")
print(f" 錯誤內容: {resp.text}")
return None
except Exception as e:
print(f"❌ Rocket.Chat 發送訊息錯誤: {e}")
return None
async def add_thread_reply(room_id: str, thread_id: str, user_message: str, user_name: str = None):
"""在現有執行緒中追加用戶訊息"""
if not rocketchat_auth:
await rocketchat_login()
if not rocketchat_auth:
return False
display_name = user_name if user_name else "User"
try:
async with httpx.AsyncClient(timeout=10.0) as client:
# 在執行緒中回覆
resp = await client.post(
f"{ROCKETCHAT_URL}/api/v1/chat.postMessage",
headers=rocketchat_auth,
json={
"roomId": room_id,
"text": f"**{display_name}:**\n\n{user_message}",
"tmid": thread_id # 指定執行緒 ID
}
)
if resp.status_code == 200:
print(f"✅ 追加執行緒訊息")
return True
else:
print(f"⚠️ 追加訊息失敗: {resp.status_code}")
return False
except Exception as e:
print(f"❌ 追加訊息錯誤: {e}")
return False
async def wait_for_thread_reply(room_id: str, thread_id: str, existing_message_count: int = 0, timeout: int = 600):
"""輪詢等待執行緒中的管理員回覆
Args:
existing_message_count: 發送訊息前執行緒中已有的訊息數量,只等待新增的回覆
"""
if not rocketchat_auth:
await rocketchat_login()
if not rocketchat_auth:
return None
start_time = time.time()
check_interval = 3 # 每 3 秒檢查一次
print(f"🔄 開始輪詢執行緒回覆 (thread_id: {thread_id}, 現有訊息: {existing_message_count})")
try:
async with httpx.AsyncClient(timeout=10.0) as client:
while time.time() - start_time < timeout:
# 取得執行緒中的所有訊息
resp = await client.get(
f"{ROCKETCHAT_URL}/api/v1/chat.getThreadMessages",
headers=rocketchat_auth,
params={
"tmid": thread_id # Thread Message ID
}
)
if resp.status_code == 200:
data = resp.json()
messages = data.get("messages", [])
# 只看新增的訊息(比現有數量多的部分)
if len(messages) > existing_message_count + 1: # +1 是我們剛發的訊息
print(f" 檢查執行緒,找到 {len(messages)} 條訊息 (新增: {len(messages) - existing_message_count - 1})")
# 按時間排序,取最新的訊息
sorted_messages = sorted(messages, key=lambda m: m.get("ts", ""), reverse=True)
# 尋找管理員的新回覆
for msg in sorted_messages:
# 跳過原始訊息
if msg["_id"] == thread_id:
continue
# 檢查是否為真人回覆(有 u 欄位且不是 bot
if "u" in msg and not msg.get("bot"):
reply_text = msg.get("msg", "")
# 跳過用戶自己的訊息(檢查是否包含對話標記)
if reply_text and not reply_text.startswith("[對話"):
print(f"✅ 收到管理員回覆: {reply_text[:50]}...")
return reply_text
else:
print(f" ⚠️ API 回應異常: {resp.status_code}")
await asyncio.sleep(check_interval)
# 超時
print(f"⚠️ 等待回覆超時 ({timeout}秒)")
return "抱歉,目前客服繁忙,請稍後再試。"
except Exception as e:
print(f"❌ 檢查回覆錯誤: {e}")
return "系統錯誤,請稍後再試。"
async def get_user_name(user_id: str):
"""從資料庫獲取用戶真實姓名(保留作為工具函數)"""
if not user_id:
return None
try:
async with db_pool.acquire() as conn:
row = await conn.fetchrow(
'SELECT name, email FROM "user" WHERE id = $1',
user_id
)
if row:
name = row['name'] or row['email']
if name:
return name
except Exception as e:
print(f"⚠️ 獲取用戶名稱失敗: {e}")
return None
async def init_db():
"""初始化資料庫連接池和表格"""
global db_pool
db_pool = await asyncpg.create_pool(**DB_CONFIG, min_size=2, max_size=10)
# 建立對話隊列表格(用於記錄和追蹤)
async with db_pool.acquire() as conn:
await conn.execute("""
CREATE TABLE IF NOT EXISTS reply_queue (
id SERIAL PRIMARY KEY,
conversation_id VARCHAR(50) UNIQUE NOT NULL,
user_id VARCHAR(255),
chat_id VARCHAR(255),
user_name VARCHAR(255),
user_message TEXT NOT NULL,
admin_reply TEXT,
status VARCHAR(20) DEFAULT 'pending',
created_at TIMESTAMP DEFAULT NOW(),
replied_at TIMESTAMP,
rocketchat_room_id VARCHAR(100),
rocketchat_thread_id VARCHAR(100)
)
""")
# 建立索引
await conn.execute("""
CREATE INDEX IF NOT EXISTS idx_status ON reply_queue(status);
CREATE INDEX IF NOT EXISTS idx_conversation_id ON reply_queue(conversation_id);
CREATE INDEX IF NOT EXISTS idx_chat_id ON reply_queue(chat_id);
CREATE INDEX IF NOT EXISTS idx_user_id ON reply_queue(user_id);
""")
@app.on_event("startup")
async def startup():
await init_db()
print("✅ 資料庫連接成功")
# 嘗試登入 Rocket.Chat
await rocketchat_login()
@app.on_event("shutdown")
async def shutdown():
if db_pool:
await db_pool.close()
print("👋 資料庫連接已關閉")
@app.get("/")
async def root():
"""根路徑"""
return {
"status": "ok",
"service": "TobiichiGPT API",
"chat_backend": "Rocket.Chat"
}
@app.get("/v1/models")
async def list_models():
"""模擬 OpenAI 的 /v1/models 端點"""
return {
"object": "list",
"data": [
{
"id": "tobiichiGPT",
"object": "model",
"created": int(time.time()),
"owned_by": "tobiichi",
"permission": [],
"root": "tobiichiGPT",
"parent": None
}
]
}
@app.post("/v1/chat/completions")
async def chat_completions(
request_data: ChatRequest,
http_request: Request,
authorization: Optional[str] = Header(None)
):
"""
模擬 OpenAI Chat Completions API
將用戶訊息轉發到 Rocket.Chat等待管理員回覆
"""
# 取得最後一則用戶訊息
user_message = None
for msg in reversed(request_data.messages):
if msg.role == "user":
user_message = msg.content
break
if not user_message:
return JSONResponse(
status_code=400,
content={"error": "No user message found"}
)
# 從 Open WebUI headers 提取用戶資訊
headers_dict = dict(http_request.headers)
# 調試:輸出所有 headers
print(f"🔍 收到的 Headers:")
for key, value in headers_dict.items():
if 'user' in key.lower() or 'chat' in key.lower() or 'name' in key.lower():
print(f" {key}: {value}")
user_id = headers_dict.get("x-openwebui-user-id")
chat_id = headers_dict.get("x-openwebui-chat-id")
user_name = headers_dict.get("x-openwebui-user-name")
user_email = headers_dict.get("x-openwebui-user-email")
# 如果沒有從 headers 取得,使用備用方案
if not user_id:
user_id = headers_dict.get("x-user-id") or headers_dict.get("user-id")
if not chat_id:
chat_id = headers_dict.get("x-chat-id") or headers_dict.get("chat-id")
# 生成 message ID
message_id = str(uuid.uuid4())
# 如果還是沒有,使用 fallback
if not user_id:
user_id = hashlib.md5(authorization.encode() if authorization else message_id.encode()).hexdigest()[:16]
if not chat_id:
chat_id = message_id
# 過濾 Open WebUI 的系統任務訊息(標題生成、標籤生成、後續問題生成等)
if user_message.strip().startswith("### Task:"):
print(f"⏭️ 跳過系統任務訊息: {user_message[:50]}...")
# 回傳空的 JSON 回應讓 Open WebUI 處理
return {
"id": f"chatcmpl-{uuid.uuid4()}",
"object": "chat.completion",
"created": int(time.time()),
"model": request_data.model,
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "{}"
},
"finish_reason": "stop"
}
],
"usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
}
print(f"📝 收到訊息")
print(f" 用戶ID: {user_id}")
print(f" 用戶名: {user_name}")
print(f" 對話: {chat_id[:8]}")
print(f" 內容: {user_message[:50]}...")
# 1. 取得或創建用戶頻道
room_id = await get_or_create_channel(user_id, user_name)
if not room_id:
return JSONResponse(
status_code=500,
content={"error": "Failed to create Rocket.Chat channel"}
)
# 2. 查找是否已有此 chat_id 的執行緒
thread_id = await get_or_create_thread(room_id, chat_id)
existing_message_count = 0
if thread_id:
# 已有執行緒,先取得現有訊息數量
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
f"{ROCKETCHAT_URL}/api/v1/chat.getThreadMessages",
headers=rocketchat_auth,
params={"tmid": thread_id}
)
if resp.status_code == 200:
existing_message_count = len(resp.json().get("messages", []))
except Exception as e:
print(f"⚠️ 取得現有訊息數量失敗: {e}")
# 追加訊息
success = await add_thread_reply(room_id, thread_id, user_message, user_name)
if not success:
return JSONResponse(
status_code=500,
content={"error": "Failed to add message to thread"}
)
else:
# 沒有執行緒,創建新的
thread_id = await create_thread_message(room_id, chat_id, user_message, user_name)
if not thread_id:
return JSONResponse(
status_code=500,
content={"error": "Failed to create Rocket.Chat thread"}
)
# 3. 記錄到資料庫
async with db_pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO reply_queue (
conversation_id, user_id, chat_id, user_name, user_message,
status, rocketchat_room_id, rocketchat_thread_id
)
VALUES ($1, $2, $3, $4, $5, 'pending', $6, $7)
""",
message_id, user_id, chat_id, user_name, user_message, room_id, thread_id
)
# 4. 等待管理員在執行緒中回覆(傳入現有訊息數量,只等待新回覆)
admin_reply = await wait_for_thread_reply(room_id, thread_id, existing_message_count)
# 5. 更新資料庫
async with db_pool.acquire() as conn:
await conn.execute(
"""
UPDATE reply_queue
SET admin_reply = $1, status = 'replied', replied_at = NOW()
WHERE conversation_id = $2
""",
admin_reply, message_id
)
print(f"✅ 完成回覆 [chat:{chat_id[:8]}]")
# 6. 回傳 OpenAI 格式的回應
return {
"id": f"chatcmpl-{message_id}",
"object": "chat.completion",
"created": int(time.time()),
"model": request_data.model,
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": admin_reply
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": len(user_message),
"completion_tokens": len(admin_reply),
"total_tokens": len(user_message) + len(admin_reply)
}
}
@app.get("/health")
async def health_check():
"""健康檢查端點"""
db_status = "ok" if db_pool else "disconnected"
rc_status = "ok" if rocketchat_auth else "not_authenticated"
return {
"status": "healthy",
"database": db_status,
"rocketchat": rc_status
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)