"""
API adapter layer.

Masquerades as the OpenAI API and turns each chat request into an entry in a
human-reply queue: the user message is stored in PostgreSQL, forwarded to
Papercups, and the HTTP request is held open until an agent reply arrives
through the Papercups webhook.
"""
from fastapi import FastAPI, Request, Header
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import asyncpg
import asyncio
import time
import uuid
import os
import httpx
from typing import Optional
from datetime import datetime
import hashlib
import traceback

app = FastAPI()

# Database connection pool (created in init_db on startup)
db_pool = None

# Database settings (overridable through environment variables)
DB_CONFIG = {
    "host": os.getenv("DB_HOST", "postgres"),
    "port": int(os.getenv("DB_PORT", "5432")),
    "database": os.getenv("DB_NAME", "tobiichiGPT"),
    "user": os.getenv("DB_USER", "tobiichi3227"),
    "password": os.getenv("DB_PASSWORD", "tobiichi_password"),
}

# Papercups settings
PAPERCUPS_URL = os.getenv("PAPERCUPS_URL", "http://papercups:4000")
PAPERCUPS_API_TOKEN = os.getenv("PAPERCUPS_API_TOKEN", "")


class Message(BaseModel):
    """A single chat message in the OpenAI request format."""

    role: str
    content: str


class ChatRequest(BaseModel):
    """Body of a /v1/chat/completions request."""

    model: str
    messages: list[Message]
    stream: Optional[bool] = False


class PapercupsWebhook(BaseModel):
    """Papercups webhook payload format."""

    event: str
    payload: Optional[dict] = None


async def get_user_name(user_id: str) -> Optional[str]:
    """Look up a user's display name in the database.

    Returns the ``name`` column of the matching ``"user"`` row, falling back
    to ``email``. Returns ``None`` when ``user_id`` is empty, no row matches,
    both columns are empty, or the lookup fails (failures are logged, not
    raised).
    """
    if not user_id:
        return None

    try:
        async with db_pool.acquire() as conn:
            row = await conn.fetchrow(
                'SELECT name, email FROM "user" WHERE id = $1',
                user_id
            )
            if row:
                name = row['name'] or row['email']
                if name:
                    return name
    except Exception as e:
        # Best-effort lookup: a missing name must never break the chat flow.
        print(f"⚠️ 獲取用戶名稱失敗: {e}")

    return None


async def get_or_create_papercups_conversation(
    user_id: str,
    chat_id: str,
    user_name: Optional[str] = None,
):
    """Get or create a Papercups conversation, keyed by ``chat_id``.

    ``chat_id`` is used as the customer's external identifier, falling back
    to ``user_id``. Returns the Papercups conversation id, or ``None`` when
    Papercups is not configured, no identifier is available, or the request
    fails (failures are logged, not raised).
    """
    if not PAPERCUPS_API_TOKEN:
        print("⚠️ Papercups 未配置,跳過推送")
        return None

    # Use chat_id as the unique identifier, falling back to user_id.
    identifier = chat_id if chat_id else user_id
    if not identifier:
        # Nothing to key the conversation on. Previously this case crashed
        # on ``chat_id[:8]`` and was only masked by the broad except below.
        return None

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            headers = {"Authorization": f"Bearer {PAPERCUPS_API_TOKEN}"}

            # If no user name was supplied, try to fetch it from the database.
            if not user_name and user_id:
                user_name = await get_user_name(user_id)

            # Build the display name shown to agents.
            if user_name:
                display_name = user_name
            elif user_id:
                display_name = f"User-{user_id[:8]}"
            else:
                # user_id is empty here, so identifier is the chat_id.
                display_name = f"Chat-{identifier[:8]}"

            # Papercups API: create or fetch the conversation.
            conversation_url = f"{PAPERCUPS_URL}/api/conversations"
            conversation_payload = {
                "customer": {
                    "external_id": identifier,
                    "name": display_name
                }
            }

            conversation_response = await client.post(
                conversation_url,
                json=conversation_payload,
                headers=headers
            )

            if conversation_response.status_code in [200, 201]:
                conversation_data = conversation_response.json()
                # NOTE(review): assumes the id is a top-level "id" key of the
                # response body - confirm against the Papercups API in use.
                papercups_conv_id = conversation_data.get("id")
                print(f"✅ Papercups 對話: #{papercups_conv_id} ({display_name})")
                return papercups_conv_id
            else:
                print(f"⚠️ Papercups 對話創建失敗: {conversation_response.status_code}")
                return None

    except Exception as e:
        print(f"❌ Papercups 錯誤: {e}")
        traceback.print_exc()
        return None


async def send_message_to_papercups(papercups_conv_id: str, message: str, sent_by: str = "customer"):
    """Send a message to a Papercups conversation.

    Returns ``True`` when Papercups answers 200/201, ``False`` when Papercups
    is not configured, ``papercups_conv_id`` is empty, or the request fails
    (failures are logged, not raised).
    """
    if not PAPERCUPS_API_TOKEN or not papercups_conv_id:
        return False

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            headers = {"Authorization": f"Bearer {PAPERCUPS_API_TOKEN}"}

            msg_url = f"{PAPERCUPS_URL}/api/messages"
            msg_payload = {
                "conversation_id": papercups_conv_id,
                "body": message,
                "sent_by": sent_by
            }

            msg_response = await client.post(msg_url, json=msg_payload, headers=headers)

            if msg_response.status_code in [200, 201]:
                print(f"✅ 訊息已發送到 Papercups 對話 #{papercups_conv_id}")
                return True
            else:
                print(f"⚠️ 訊息發送失敗: {msg_response.status_code}")
                return False

    except Exception as e:
        print(f"❌ 發送訊息錯誤: {e}")
        return False


async def init_db():
    """Create the connection pool and make sure the queue table exists."""
    global db_pool
    db_pool = await asyncpg.create_pool(**DB_CONFIG, min_size=2, max_size=10)

    async with db_pool.acquire() as conn:
        # Create the reply queue table.
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS reply_queue (
                id SERIAL PRIMARY KEY,
                conversation_id VARCHAR(50) UNIQUE NOT NULL,
                user_id VARCHAR(255),
                chat_id VARCHAR(255),
                user_message TEXT NOT NULL,
                admin_reply TEXT,
                status VARCHAR(20) DEFAULT 'pending',
                created_at TIMESTAMP DEFAULT NOW(),
                replied_at TIMESTAMP,
                papercups_conversation_id VARCHAR(100)
            )
        """)

        # Add columns that a table created by an older version may lack.
        # papercups_conversation_id is included because both the INSERT in
        # chat_completions and the UPDATE in papercups_webhook depend on it.
        migrations = (
            "ALTER TABLE reply_queue ADD COLUMN IF NOT EXISTS user_id VARCHAR(255);",
            "ALTER TABLE reply_queue ADD COLUMN IF NOT EXISTS chat_id VARCHAR(255);",
            "ALTER TABLE reply_queue ADD COLUMN IF NOT EXISTS papercups_conversation_id VARCHAR(100);",
        )
        for statement in migrations:
            try:
                await conn.execute(statement)
            except asyncpg.PostgresError as e:
                # Still best-effort, but no longer silent: log the failure
                # instead of swallowing every exception with a bare except.
                print(f"⚠️ 新增欄位失敗: {e}")

        # Create indexes.
        await conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_status ON reply_queue(status);
            CREATE INDEX IF NOT EXISTS idx_conversation_id ON reply_queue(conversation_id);
            CREATE INDEX IF NOT EXISTS idx_chat_id ON reply_queue(chat_id);
        """)


@app.on_event("startup")
async def startup():
    """Initialise the database when the application starts."""
    await init_db()
    print("✅ 資料庫連接成功")


@app.on_event("shutdown")
async def shutdown():
    """Close the connection pool when the application stops."""
    if db_pool:
        await db_pool.close()
    print("👋 資料庫連接已關閉")


@app.get("/")
async def root():
    """Root path."""
    return {"status": "ok", "service": "TobiichiGPT API"}


@app.get("/v1/models")
async def list_models():
    """Mimic the OpenAI /v1/models endpoint."""
    return {
        "object": "list",
        "data": [
            {
                "id": "tobiichiGPT",
                "object": "model",
                "created": int(time.time()),
                "owned_by": "tobiichi",
                "permission": [],
                "root": "tobiichiGPT",
                "parent": None
            }
        ]
    }


@app.post("/v1/chat/completions")
async def chat_completions(
    request_data: ChatRequest,
    http_request: Request,
    authorization: Optional[str] = Header(None)
):
    """
    Mimic the OpenAI Chat Completions API.

    Writes the last user message to the database, forwards it to Papercups,
    then polls until an administrator reply is stored. There is no timeout;
    polling only stops early when the HTTP client disconnects.

    The ``stream`` flag of the request is accepted but not used: the reply is
    always returned as a single non-streaming completion.
    """
    # Take the last user message.
    user_message = None
    for msg in reversed(request_data.messages):
        if msg.role == "user":
            user_message = msg.content
            break

    if not user_message:
        return JSONResponse(
            status_code=400,
            content={"error": "No user message found"}
        )

    # Try to extract user information from the request headers.
    request_headers = http_request.headers
    user_id = request_headers.get("x-user-id") or request_headers.get("user-id")
    chat_id = request_headers.get("x-chat-id") or request_headers.get("chat-id")

    # Otherwise derive a stable ID from the authorization token.
    # md5 is only a stable fingerprint here, not a security measure.
    if not user_id and authorization:
        user_id = hashlib.md5(authorization.encode()).hexdigest()[:16]

    # Message ID (used to track this single message).
    message_id = str(uuid.uuid4())

    # Without a chat_id, fall back to user_id, then to the message ID.
    if not chat_id:
        chat_id = user_id if user_id else message_id

    # Fetch the user's real name.
    user_name = await get_user_name(user_id) if user_id else None

    print(f"📝 收到訊息 [user:{user_name or user_id}, chat:{chat_id}]: {user_message[:50]}...")

    # Get or create the Papercups conversation.
    papercups_conv_id = await get_or_create_papercups_conversation(user_id, chat_id, user_name)

    # asyncpg only accepts str for VARCHAR parameters, so normalise the
    # Papercups id at the database boundary.
    stored_conv_id = str(papercups_conv_id) if papercups_conv_id is not None else None

    # Write to the database.
    async with db_pool.acquire() as conn:
        await conn.execute(
            """
            INSERT INTO reply_queue
                (conversation_id, user_id, chat_id, user_message, status, papercups_conversation_id)
            VALUES ($1, $2, $3, $4, 'pending', $5)
            """,
            message_id, user_id, chat_id, user_message, stored_conv_id
        )

    # Push to Papercups.
    if papercups_conv_id:
        await send_message_to_papercups(papercups_conv_id, user_message, "customer")

    # Wait for the administrator reply.
    check_interval = 2  # poll every 2 seconds

    while True:
        await asyncio.sleep(check_interval)

        # Stop polling once the client has gone away; otherwise this loop
        # (and its periodic query) would keep running forever.
        if await http_request.is_disconnected():
            print(f"⚠️ 用戶端已斷線,停止等待回覆 [chat:{chat_id}]")
            return JSONResponse(
                status_code=499,
                content={"error": "Client disconnected"}
            )

        async with db_pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT admin_reply, status FROM reply_queue WHERE conversation_id = $1",
                message_id
            )

        if row and row['status'] == 'replied' and row['admin_reply']:
            admin_reply = row['admin_reply']
            print(f"✅ 管理員已回覆 [chat:{chat_id}]")

            # Return a response in the OpenAI format. The "usage" numbers
            # are character counts, not real token counts.
            return {
                "id": f"chatcmpl-{message_id}",
                "object": "chat.completion",
                "created": int(time.time()),
                "model": request_data.model,
                "choices": [
                    {
                        "index": 0,
                        "message": {
                            "role": "assistant",
                            "content": admin_reply
                        },
                        "finish_reason": "stop"
                    }
                ],
                "usage": {
                    "prompt_tokens": len(user_message),
                    "completion_tokens": len(admin_reply),
                    "total_tokens": len(user_message) + len(admin_reply)
                }
            }


@app.post("/papercups/webhook")
async def papercups_webhook(request: Request):
    """Receive Papercups webhook callbacks.

    Only ``message:created`` events whose message carries a ``user`` object
    are treated as agent replies; the reply is stored on the most recent
    pending queue entry of that Papercups conversation. Errors are reported
    in the JSON body rather than raised.
    """
    try:
        data = await request.json()

        event = data.get("event")

        # Only handle message-created events.
        if event != "message:created":
            return {"status": "ignored", "event": event}

        # ``or {}`` also covers an explicit JSON null, which the default
        # argument of dict.get does not.
        payload = data.get("payload") or {}
        message = payload.get("message") or {}

        # Check whether this is an agent reply (the message has a "user").
        user = message.get("user")
        if not user:
            return {"status": "ignored", "reason": "not_agent_reply"}

        # Message body and conversation ID.
        content = message.get("body")
        conversation_id = message.get("conversation_id")

        if not content or not conversation_id:
            return {"status": "error", "message": "Missing content or conversation_id"}

        # Update the most recent pending message of that conversation.
        async with db_pool.acquire() as conn:
            result = await conn.execute(
                """
                UPDATE reply_queue
                SET admin_reply = $1, status = 'replied', replied_at = NOW()
                WHERE papercups_conversation_id = $2
                  AND status = 'pending'
                  AND id = (
                      SELECT id FROM reply_queue
                      WHERE papercups_conversation_id = $2 AND status = 'pending'
                      ORDER BY created_at DESC
                      LIMIT 1
                  )
                """,
                content, str(conversation_id)
            )

        # asyncpg returns the command status string, e.g. "UPDATE 1".
        if result == "UPDATE 0":
            print(f"⚠️ 找不到待處理的訊息: 對話 #{conversation_id}")

        print(f"✅ Papercups 回覆已處理: 對話 #{conversation_id}")
        return {"status": "success", "conversation_id": conversation_id}

    except Exception as e:
        print(f"❌ Webhook 處理錯誤: {e}")
        return {"status": "error", "message": str(e)}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)