""" API 轉接層 - 偽裝 OpenAI API,整合 Rocket.Chat 作為管理員回覆介面 """ from fastapi import FastAPI, Request, Header from fastapi.responses import JSONResponse from pydantic import BaseModel import asyncpg import asyncio import time import uuid import os import json import hashlib import httpx from typing import Optional from datetime import datetime import hashlib app = FastAPI() # 資料庫連接池 db_pool = None # 資料庫設定 DB_CONFIG = { "host": os.getenv("DB_HOST", "postgres"), "port": int(os.getenv("DB_PORT", 5432)), "database": os.getenv("DB_NAME", "tobiichiGPT"), "user": os.getenv("DB_USER", "tobiichi3227"), "password": os.getenv("DB_PASSWORD", "tobiichi_password") } # Rocket.Chat 設定 ROCKETCHAT_URL = os.getenv("ROCKETCHAT_URL", "http://rocketchat:3000") ROCKETCHAT_USER = os.getenv("ROCKETCHAT_USER", "admin") ROCKETCHAT_PASSWORD = os.getenv("ROCKETCHAT_PASSWORD", "admin") # 全域認證狀態 rocketchat_auth = None class Message(BaseModel): role: str content: str class ChatRequest(BaseModel): model: str messages: list[Message] stream: Optional[bool] = False async def rocketchat_login(): """登入 Rocket.Chat 取得認證 Token""" global rocketchat_auth try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.post( f"{ROCKETCHAT_URL}/api/v1/login", json={ "username": ROCKETCHAT_USER, "password": ROCKETCHAT_PASSWORD } ) if resp.status_code == 200: data = resp.json() rocketchat_auth = { "X-Auth-Token": data["data"]["authToken"], "X-User-Id": data["data"]["userId"] } print(f"✅ Rocket.Chat 登入成功") return rocketchat_auth else: print(f"⚠️ Rocket.Chat 登入失敗: {resp.status_code}") return None except Exception as e: print(f"❌ Rocket.Chat 登入錯誤: {e}") return None async def get_or_create_channel(user_id: str, user_name: str = None): """取得或創建用戶專屬頻道""" if not rocketchat_auth: await rocketchat_login() if not rocketchat_auth: return None # 頻道名稱使用用戶名,如果沒有則用 user_id display_name = user_name if user_name else f"User-{user_id[:8]}" # 清理頻道名稱(移除空格和特殊字符,Rocket.Chat 頻道名不支援) channel_name = display_name.replace(" ", "-").replace("@", "").lower()[:50] # 限制長度 try: async with httpx.AsyncClient(timeout=10.0) as client: # 嘗試創建頻道 resp = await client.post( f"{ROCKETCHAT_URL}/api/v1/channels.create", headers=rocketchat_auth, json={ "name": channel_name, "members": [] # 空成員列表,只有管理員可見 } ) if resp.status_code == 200: data = resp.json() room_id = data["channel"]["_id"] print(f"✅ 創建頻道: {channel_name} ({display_name})") # 設置頻道描述 await client.post( f"{ROCKETCHAT_URL}/api/v1/channels.setDescription", headers=rocketchat_auth, json={ "roomId": room_id, "description": f"用戶: {display_name} (ID: {user_id})" } ) return room_id # 如果頻道已存在,取得頻道資訊 elif resp.status_code == 400: resp = await client.get( f"{ROCKETCHAT_URL}/api/v1/channels.info", headers=rocketchat_auth, params={"roomName": channel_name} ) if resp.status_code == 200: data = resp.json() room_id = data["channel"]["_id"] print(f"✅ 使用現有頻道: {channel_name}") return room_id else: print(f"⚠️ 取得頻道資訊失敗: {resp.status_code}") return None else: print(f"⚠️ 創建頻道失敗: {resp.status_code}") return None except Exception as e: print(f"❌ Rocket.Chat 頻道操作錯誤: {e}") return None async def get_or_create_thread(room_id: str, chat_id: str): """查找或創建對話的執行緒""" if not rocketchat_auth: await rocketchat_login() if not rocketchat_auth: return None try: async with httpx.AsyncClient(timeout=10.0) as client: # 搜尋頻道中是否已有此 chat_id 的執行緒(搜尋最近50條消息) resp = await client.get( f"{ROCKETCHAT_URL}/api/v1/channels.messages", headers=rocketchat_auth, params={ "roomId": room_id, "count": 50 } ) if resp.status_code == 200: data = resp.json() messages = data.get("messages", []) # 找到現有的執行緒起始訊息 for msg in messages: if f"對話 {chat_id[:8]}" in msg.get("msg", ""): thread_id = msg["_id"] print(f"✅ 找到現有執行緒: {thread_id}") return thread_id # 沒有找到,返回 None(需要創建新執行緒) return None except Exception as e: print(f"❌ 查找執行緒錯誤: {e}") return None async def create_thread_message(room_id: str, chat_id: str, user_message: str, user_name: str = None): """在頻道中發送訊息並創建執行緒""" if not rocketchat_auth: await rocketchat_login() if not rocketchat_auth: return None display_name = user_name if user_name else "User" try: async with httpx.AsyncClient(timeout=10.0) as client: # 發送訊息(這會成為執行緒的起始訊息) resp = await client.post( f"{ROCKETCHAT_URL}/api/v1/chat.postMessage", headers=rocketchat_auth, json={ "roomId": room_id, "text": f"**[對話 {chat_id[:8]}] - {display_name}**\n\n{user_message}" } ) if resp.status_code == 200: data = resp.json() message_id = data["message"]["_id"] print(f"✅ 創建執行緒訊息: {message_id}") return message_id else: print(f"⚠️ 發送訊息失敗: {resp.status_code}") print(f" 錯誤內容: {resp.text}") return None except Exception as e: print(f"❌ Rocket.Chat 發送訊息錯誤: {e}") return None async def add_thread_reply(room_id: str, thread_id: str, user_message: str, user_name: str = None): """在現有執行緒中追加用戶訊息""" if not rocketchat_auth: await rocketchat_login() if not rocketchat_auth: return False display_name = user_name if user_name else "User" try: async with httpx.AsyncClient(timeout=10.0) as client: # 在執行緒中回覆 resp = await client.post( f"{ROCKETCHAT_URL}/api/v1/chat.postMessage", headers=rocketchat_auth, json={ "roomId": room_id, "text": f"**{display_name}:**\n\n{user_message}", "tmid": thread_id # 指定執行緒 ID } ) if resp.status_code == 200: print(f"✅ 追加執行緒訊息") return True else: print(f"⚠️ 追加訊息失敗: {resp.status_code}") return False except Exception as e: print(f"❌ 追加訊息錯誤: {e}") return False async def wait_for_thread_reply(room_id: str, thread_id: str, existing_message_count: int = 0, timeout: int = 600): """輪詢等待執行緒中的管理員回覆 Args: existing_message_count: 發送訊息前執行緒中已有的訊息數量,只等待新增的回覆 """ if not rocketchat_auth: await rocketchat_login() if not rocketchat_auth: return None start_time = time.time() check_interval = 3 # 每 3 秒檢查一次 # 取得 bot 帳號的用戶名,用於過濾 bot_username = ROCKETCHAT_USER print(f"🔄 開始輪詢執行緒回覆 (thread_id: {thread_id}, 現有訊息: {existing_message_count})") try: async with httpx.AsyncClient(timeout=10.0) as client: while time.time() - start_time < timeout: # 取得執行緒中的所有訊息 resp = await client.get( f"{ROCKETCHAT_URL}/api/v1/chat.getThreadMessages", headers=rocketchat_auth, params={ "tmid": thread_id # Thread Message ID } ) if resp.status_code == 200: data = resp.json() messages = data.get("messages", []) # 只看新增的訊息(比現有數量多的部分) if len(messages) > existing_message_count + 1: # +1 是我們剛發的訊息 print(f" 檢查執行緒,找到 {len(messages)} 條訊息 (新增: {len(messages) - existing_message_count - 1})") # 按時間排序,取最新的訊息 sorted_messages = sorted(messages, key=lambda m: m.get("ts", ""), reverse=True) # 尋找管理員的新回覆(不是 bot 發送的) for msg in sorted_messages: # 跳過原始訊息 if msg["_id"] == thread_id: continue # 檢查發送者 sender = msg.get("u", {}) sender_username = sender.get("username", "") # 跳過 bot 自己發送的訊息(系統轉發的用戶訊息) if sender_username == bot_username: continue # 檢查是否為真人回覆(有 u 欄位且不是 bot 標記) if sender and not msg.get("bot"): reply_text = msg.get("msg", "") if reply_text: print(f"✅ 收到管理員回覆 (from: {sender_username}): {reply_text[:50]}...") return reply_text else: print(f" ⚠️ API 回應異常: {resp.status_code}") await asyncio.sleep(check_interval) # 超時 print(f"⚠️ 等待回覆超時 ({timeout}秒)") return "抱歉,目前客服繁忙,請稍後再試。" except Exception as e: print(f"❌ 檢查回覆錯誤: {e}") return "系統錯誤,請稍後再試。" async def get_user_name(user_id: str): """從資料庫獲取用戶真實姓名(保留作為工具函數)""" if not user_id: return None try: async with db_pool.acquire() as conn: row = await conn.fetchrow( 'SELECT name, email FROM "user" WHERE id = $1', user_id ) if row: name = row['name'] or row['email'] if name: return name except Exception as e: print(f"⚠️ 獲取用戶名稱失敗: {e}") return None async def init_db(): """初始化資料庫連接池和表格""" global db_pool db_pool = await asyncpg.create_pool(**DB_CONFIG, min_size=2, max_size=10) # 建立對話隊列表格(用於記錄和追蹤) async with db_pool.acquire() as conn: await conn.execute(""" CREATE TABLE IF NOT EXISTS reply_queue ( id SERIAL PRIMARY KEY, conversation_id VARCHAR(50) UNIQUE NOT NULL, user_id VARCHAR(255), chat_id VARCHAR(255), user_name VARCHAR(255), user_message TEXT NOT NULL, admin_reply TEXT, status VARCHAR(20) DEFAULT 'pending', created_at TIMESTAMP DEFAULT NOW(), replied_at TIMESTAMP, rocketchat_room_id VARCHAR(100), rocketchat_thread_id VARCHAR(100) ) """) # 建立索引 await conn.execute(""" CREATE INDEX IF NOT EXISTS idx_status ON reply_queue(status); CREATE INDEX IF NOT EXISTS idx_conversation_id ON reply_queue(conversation_id); CREATE INDEX IF NOT EXISTS idx_chat_id ON reply_queue(chat_id); CREATE INDEX IF NOT EXISTS idx_user_id ON reply_queue(user_id); """) @app.on_event("startup") async def startup(): await init_db() print("✅ 資料庫連接成功") # 嘗試登入 Rocket.Chat await rocketchat_login() @app.on_event("shutdown") async def shutdown(): if db_pool: await db_pool.close() print("👋 資料庫連接已關閉") @app.get("/") async def root(): """根路徑""" return { "status": "ok", "service": "TobiichiGPT API", "chat_backend": "Rocket.Chat" } @app.get("/v1/models") async def list_models(): """模擬 OpenAI 的 /v1/models 端點""" return { "object": "list", "data": [ { "id": "tobiichiGPT", "object": "model", "created": int(time.time()), "owned_by": "tobiichi", "permission": [], "root": "tobiichiGPT", "parent": None } ] } @app.post("/v1/chat/completions") async def chat_completions( request_data: ChatRequest, http_request: Request, authorization: Optional[str] = Header(None) ): """ 模擬 OpenAI Chat Completions API 將用戶訊息轉發到 Rocket.Chat,等待管理員回覆 """ # 取得最後一則用戶訊息 user_message = None for msg in reversed(request_data.messages): if msg.role == "user": user_message = msg.content break if not user_message: return JSONResponse( status_code=400, content={"error": "No user message found"} ) # 從 Open WebUI headers 提取用戶資訊 headers_dict = dict(http_request.headers) # 調試:輸出所有 headers print(f"🔍 收到的 Headers:") for key, value in headers_dict.items(): if 'user' in key.lower() or 'chat' in key.lower() or 'name' in key.lower(): print(f" {key}: {value}") user_id = headers_dict.get("x-openwebui-user-id") chat_id = headers_dict.get("x-openwebui-chat-id") user_name = headers_dict.get("x-openwebui-user-name") user_email = headers_dict.get("x-openwebui-user-email") # 如果沒有從 headers 取得,使用備用方案 if not user_id: user_id = headers_dict.get("x-user-id") or headers_dict.get("user-id") if not chat_id: chat_id = headers_dict.get("x-chat-id") or headers_dict.get("chat-id") # 生成 message ID message_id = str(uuid.uuid4()) # 如果還是沒有,使用 fallback if not user_id: user_id = hashlib.md5(authorization.encode() if authorization else message_id.encode()).hexdigest()[:16] if not chat_id: chat_id = message_id # 過濾 Open WebUI 的系統任務訊息(標題生成、標籤生成、後續問題生成等) if user_message.strip().startswith("### Task:"): print(f"⏭️ 跳過系統任務訊息: {user_message[:50]}...") # 回傳空的 JSON 回應讓 Open WebUI 處理 return { "id": f"chatcmpl-{uuid.uuid4()}", "object": "chat.completion", "created": int(time.time()), "model": request_data.model, "choices": [ { "index": 0, "message": { "role": "assistant", "content": "{}" }, "finish_reason": "stop" } ], "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} } print(f"📝 收到訊息") print(f" 用戶ID: {user_id}") print(f" 用戶名: {user_name}") print(f" 對話: {chat_id[:8]}") print(f" 內容: {user_message[:50]}...") # 1. 取得或創建用戶頻道 room_id = await get_or_create_channel(user_id, user_name) if not room_id: return JSONResponse( status_code=500, content={"error": "Failed to create Rocket.Chat channel"} ) # 2. 查找是否已有此 chat_id 的執行緒 thread_id = await get_or_create_thread(room_id, chat_id) existing_message_count = 0 if thread_id: # 已有執行緒,先取得現有訊息數量 try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.get( f"{ROCKETCHAT_URL}/api/v1/chat.getThreadMessages", headers=rocketchat_auth, params={"tmid": thread_id} ) if resp.status_code == 200: existing_message_count = len(resp.json().get("messages", [])) except Exception as e: print(f"⚠️ 取得現有訊息數量失敗: {e}") # 追加訊息 success = await add_thread_reply(room_id, thread_id, user_message, user_name) if not success: return JSONResponse( status_code=500, content={"error": "Failed to add message to thread"} ) else: # 沒有執行緒,創建新的 thread_id = await create_thread_message(room_id, chat_id, user_message, user_name) if not thread_id: return JSONResponse( status_code=500, content={"error": "Failed to create Rocket.Chat thread"} ) # 3. 記錄到資料庫 async with db_pool.acquire() as conn: await conn.execute( """ INSERT INTO reply_queue ( conversation_id, user_id, chat_id, user_name, user_message, status, rocketchat_room_id, rocketchat_thread_id ) VALUES ($1, $2, $3, $4, $5, 'pending', $6, $7) """, message_id, user_id, chat_id, user_name, user_message, room_id, thread_id ) # 4. 等待管理員在執行緒中回覆(傳入現有訊息數量,只等待新回覆) admin_reply = await wait_for_thread_reply(room_id, thread_id, existing_message_count) # 5. 更新資料庫 async with db_pool.acquire() as conn: await conn.execute( """ UPDATE reply_queue SET admin_reply = $1, status = 'replied', replied_at = NOW() WHERE conversation_id = $2 """, admin_reply, message_id ) print(f"✅ 完成回覆 [chat:{chat_id[:8]}]") # 6. 回傳 OpenAI 格式的回應 return { "id": f"chatcmpl-{message_id}", "object": "chat.completion", "created": int(time.time()), "model": request_data.model, "choices": [ { "index": 0, "message": { "role": "assistant", "content": admin_reply }, "finish_reason": "stop" } ], "usage": { "prompt_tokens": len(user_message), "completion_tokens": len(admin_reply), "total_tokens": len(user_message) + len(admin_reply) } } @app.get("/health") async def health_check(): """健康檢查端點""" db_status = "ok" if db_pool else "disconnected" rc_status = "ok" if rocketchat_auth else "not_authenticated" return { "status": "healthy", "database": db_status, "rocketchat": rc_status } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)