389 lines
13 KiB
Python
389 lines
13 KiB
Python
"""
|
||
API 轉接層 - 偽裝 OpenAI API,將請求轉為人工回覆隊列
|
||
"""
|
||
|
||
from fastapi import FastAPI, Request, Header
|
||
from fastapi.responses import JSONResponse
|
||
from pydantic import BaseModel
|
||
import asyncpg
|
||
import asyncio
|
||
import time
|
||
import uuid
|
||
import os
|
||
import httpx
|
||
from typing import Optional
|
||
from datetime import datetime
|
||
import hashlib
|
||
|
||
app = FastAPI()
|
||
|
||
# 資料庫連接池
|
||
db_pool = None
|
||
|
||
# 資料庫設定
|
||
DB_CONFIG = {
|
||
"host": os.getenv("DB_HOST", "postgres"),
|
||
"port": int(os.getenv("DB_PORT", 5432)),
|
||
"database": os.getenv("DB_NAME", "tobiichiGPT"),
|
||
"user": os.getenv("DB_USER", "tobiichi3227"),
|
||
"password": os.getenv("DB_PASSWORD", "tobiichi_password")
|
||
}
|
||
|
||
# Papercups 設定
|
||
PAPERCUPS_URL = os.getenv("PAPERCUPS_URL", "http://papercups:4000")
|
||
PAPERCUPS_API_TOKEN = os.getenv("PAPERCUPS_API_TOKEN", "")
|
||
|
||
|
||
class Message(BaseModel):
|
||
role: str
|
||
content: str
|
||
|
||
|
||
class ChatRequest(BaseModel):
|
||
model: str
|
||
messages: list[Message]
|
||
stream: Optional[bool] = False
|
||
|
||
|
||
class PapercupsWebhook(BaseModel):
|
||
"""Papercups Webhook 資料格式"""
|
||
event: str
|
||
payload: Optional[dict] = None
|
||
|
||
|
||
async def get_user_name(user_id: str):
|
||
"""從資料庫獲取用戶真實姓名"""
|
||
if not user_id:
|
||
return None
|
||
|
||
try:
|
||
async with db_pool.acquire() as conn:
|
||
row = await conn.fetchrow(
|
||
'SELECT name, email FROM "user" WHERE id = $1',
|
||
user_id
|
||
)
|
||
if row:
|
||
name = row['name'] or row['email']
|
||
if name:
|
||
return name
|
||
except Exception as e:
|
||
print(f"⚠️ 獲取用戶名稱失敗: {e}")
|
||
|
||
return None
|
||
|
||
|
||
async def get_or_create_papercups_conversation(user_id: str, chat_id: str, user_name: str = None):
|
||
"""獲取或創建 Papercups 對話(使用 chat_id 作為唯一標識)"""
|
||
if not PAPERCUPS_API_TOKEN:
|
||
print("⚠️ Papercups 未配置,跳過推送")
|
||
return None
|
||
|
||
try:
|
||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||
headers = {"Authorization": f"Bearer {PAPERCUPS_API_TOKEN}"}
|
||
|
||
# 使用 chat_id 作為唯一標識符
|
||
identifier = chat_id if chat_id else user_id
|
||
|
||
# 如果沒有提供用戶名,嘗試從數據庫獲取
|
||
if not user_name and user_id:
|
||
user_name = await get_user_name(user_id)
|
||
|
||
# 生成顯示名稱
|
||
if user_name:
|
||
display_name = user_name
|
||
elif user_id:
|
||
display_name = f"User-{user_id[:8]}"
|
||
else:
|
||
display_name = f"Chat-{chat_id[:8]}"
|
||
|
||
# Papercups API: 創建或獲取對話
|
||
conversation_url = f"{PAPERCUPS_URL}/api/conversations"
|
||
conversation_payload = {
|
||
"customer": {
|
||
"external_id": identifier,
|
||
"name": display_name
|
||
}
|
||
}
|
||
|
||
conversation_response = await client.post(conversation_url, json=conversation_payload, headers=headers)
|
||
if conversation_response.status_code in [200, 201]:
|
||
conversation_data = conversation_response.json()
|
||
papercups_conv_id = conversation_data.get("id")
|
||
print(f"✅ Papercups 對話: #{papercups_conv_id} ({display_name})")
|
||
return papercups_conv_id
|
||
else:
|
||
print(f"⚠️ Papercups 對話創建失敗: {conversation_response.status_code}")
|
||
return None
|
||
|
||
except Exception as e:
|
||
print(f"❌ Papercups 錯誤: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return None
|
||
|
||
|
||
async def send_message_to_papercups(papercups_conv_id: str, message: str, sent_by: str = "customer"):
|
||
"""發送訊息到 Papercups 對話"""
|
||
if not PAPERCUPS_API_TOKEN or not papercups_conv_id:
|
||
return False
|
||
|
||
try:
|
||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||
headers = {"Authorization": f"Bearer {PAPERCUPS_API_TOKEN}"}
|
||
msg_url = f"{PAPERCUPS_URL}/api/messages"
|
||
msg_payload = {
|
||
"conversation_id": papercups_conv_id,
|
||
"body": message,
|
||
"sent_by": sent_by
|
||
}
|
||
|
||
msg_response = await client.post(msg_url, json=msg_payload, headers=headers)
|
||
if msg_response.status_code in [200, 201]:
|
||
print(f"✅ 訊息已發送到 Papercups 對話 #{papercups_conv_id}")
|
||
return True
|
||
else:
|
||
print(f"⚠️ 訊息發送失敗: {msg_response.status_code}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"❌ 發送訊息錯誤: {e}")
|
||
return False
|
||
|
||
|
||
async def init_db():
|
||
"""初始化資料庫連接池和表格"""
|
||
global db_pool
|
||
db_pool = await asyncpg.create_pool(**DB_CONFIG, min_size=2, max_size=10)
|
||
|
||
# 建立對話隊列表格
|
||
async with db_pool.acquire() as conn:
|
||
await conn.execute("""
|
||
CREATE TABLE IF NOT EXISTS reply_queue (
|
||
id SERIAL PRIMARY KEY,
|
||
conversation_id VARCHAR(50) UNIQUE NOT NULL,
|
||
user_id VARCHAR(255),
|
||
chat_id VARCHAR(255),
|
||
user_message TEXT NOT NULL,
|
||
admin_reply TEXT,
|
||
status VARCHAR(20) DEFAULT 'pending',
|
||
created_at TIMESTAMP DEFAULT NOW(),
|
||
replied_at TIMESTAMP,
|
||
papercups_conversation_id VARCHAR(100)
|
||
)
|
||
""")
|
||
|
||
# 添加新欄位(如果不存在)
|
||
try:
|
||
await conn.execute("ALTER TABLE reply_queue ADD COLUMN IF NOT EXISTS user_id VARCHAR(255);")
|
||
await conn.execute("ALTER TABLE reply_queue ADD COLUMN IF NOT EXISTS chat_id VARCHAR(255);")
|
||
except:
|
||
pass
|
||
|
||
# 建立索引
|
||
await conn.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_status ON reply_queue(status);
|
||
CREATE INDEX IF NOT EXISTS idx_conversation_id ON reply_queue(conversation_id);
|
||
CREATE INDEX IF NOT EXISTS idx_chat_id ON reply_queue(chat_id);
|
||
""")
|
||
|
||
|
||
@app.on_event("startup")
|
||
async def startup():
|
||
await init_db()
|
||
print("✅ 資料庫連接成功")
|
||
|
||
|
||
@app.on_event("shutdown")
|
||
async def shutdown():
|
||
if db_pool:
|
||
await db_pool.close()
|
||
print("👋 資料庫連接已關閉")
|
||
|
||
|
||
@app.get("/")
|
||
async def root():
|
||
"""根路徑"""
|
||
return {"status": "ok", "service": "TobiichiGPT API"}
|
||
|
||
|
||
@app.get("/v1/models")
|
||
async def list_models():
|
||
"""模擬 OpenAI 的 /v1/models 端點"""
|
||
return {
|
||
"object": "list",
|
||
"data": [
|
||
{
|
||
"id": "tobiichiGPT",
|
||
"object": "model",
|
||
"created": int(time.time()),
|
||
"owned_by": "tobiichi",
|
||
"permission": [],
|
||
"root": "tobiichiGPT",
|
||
"parent": None
|
||
}
|
||
]
|
||
}
|
||
|
||
|
||
@app.post("/v1/chat/completions")
|
||
async def chat_completions(
|
||
request_data: ChatRequest,
|
||
http_request: Request,
|
||
authorization: Optional[str] = Header(None)
|
||
):
|
||
"""
|
||
模擬 OpenAI Chat Completions API
|
||
將用戶訊息寫入資料庫,等待管理員回覆(無超時限制)
|
||
"""
|
||
# 取得最後一則用戶訊息
|
||
user_message = None
|
||
for msg in reversed(request_data.messages):
|
||
if msg.role == "user":
|
||
user_message = msg.content
|
||
break
|
||
|
||
if not user_message:
|
||
return JSONResponse(
|
||
status_code=400,
|
||
content={"error": "No user message found"}
|
||
)
|
||
|
||
# 嘗試從請求中提取用戶信息
|
||
user_id = None
|
||
chat_id = None
|
||
|
||
# 從 headers 提取信息
|
||
headers_dict = dict(http_request.headers)
|
||
user_id = headers_dict.get("x-user-id") or headers_dict.get("user-id")
|
||
chat_id = headers_dict.get("x-chat-id") or headers_dict.get("chat-id")
|
||
|
||
# 如果沒有,生成一個基於授權 token 的穩定 ID
|
||
if not user_id and authorization:
|
||
user_id = hashlib.md5(authorization.encode()).hexdigest()[:16]
|
||
|
||
# 生成消息 ID(用於追蹤單條消息)
|
||
message_id = str(uuid.uuid4())
|
||
|
||
# 如果沒有 chat_id,使用 user_id
|
||
if not chat_id:
|
||
chat_id = user_id if user_id else message_id
|
||
|
||
# 獲取用戶真實姓名
|
||
user_name = await get_user_name(user_id) if user_id else None
|
||
|
||
print(f"📝 收到訊息 [user:{user_name or user_id}, chat:{chat_id}]: {user_message[:50]}...")
|
||
|
||
# 獲取或創建 Papercups 對話
|
||
papercups_conv_id = await get_or_create_papercups_conversation(user_id, chat_id, user_name)
|
||
|
||
# 寫入資料庫
|
||
async with db_pool.acquire() as conn:
|
||
await conn.execute(
|
||
"""
|
||
INSERT INTO reply_queue (conversation_id, user_id, chat_id, user_message, status, papercups_conversation_id)
|
||
VALUES ($1, $2, $3, $4, 'pending', $5)
|
||
""",
|
||
message_id, user_id, chat_id, user_message, papercups_conv_id
|
||
)
|
||
|
||
# 推送到 Papercups
|
||
if papercups_conv_id:
|
||
await send_message_to_papercups(papercups_conv_id, user_message, "customer")
|
||
|
||
# 無限等待管理員回覆
|
||
check_interval = 2 # 每 2 秒檢查一次
|
||
|
||
while True:
|
||
await asyncio.sleep(check_interval)
|
||
|
||
async with db_pool.acquire() as conn:
|
||
row = await conn.fetchrow(
|
||
"SELECT admin_reply, status FROM reply_queue WHERE conversation_id = $1",
|
||
message_id
|
||
)
|
||
|
||
if row and row['status'] == 'replied' and row['admin_reply']:
|
||
admin_reply = row['admin_reply']
|
||
print(f"✅ 管理員已回覆 [chat:{chat_id}]")
|
||
|
||
# 回傳 OpenAI 格式的回應
|
||
return {
|
||
"id": f"chatcmpl-{message_id}",
|
||
"object": "chat.completion",
|
||
"created": int(time.time()),
|
||
"model": request_data.model,
|
||
"choices": [
|
||
{
|
||
"index": 0,
|
||
"message": {
|
||
"role": "assistant",
|
||
"content": admin_reply
|
||
},
|
||
"finish_reason": "stop"
|
||
}
|
||
],
|
||
"usage": {
|
||
"prompt_tokens": len(user_message),
|
||
"completion_tokens": len(admin_reply),
|
||
"total_tokens": len(user_message) + len(admin_reply)
|
||
}
|
||
}
|
||
|
||
|
||
@app.post("/papercups/webhook")
|
||
async def papercups_webhook(request: Request):
|
||
"""接收 Papercups Webhook 回調"""
|
||
try:
|
||
data = await request.json()
|
||
event = data.get("event")
|
||
|
||
# 只處理訊息建立事件
|
||
if event != "message:created":
|
||
return {"status": "ignored", "event": event}
|
||
|
||
payload = data.get("payload", {})
|
||
message = payload.get("message", {})
|
||
|
||
# 檢查是否為管理員回覆(user type = "user")
|
||
user = message.get("user")
|
||
if not user:
|
||
return {"status": "ignored", "reason": "not_agent_reply"}
|
||
|
||
# 取得訊息內容和對話 ID
|
||
content = message.get("body")
|
||
conversation_id = message.get("conversation_id")
|
||
|
||
if not content or not conversation_id:
|
||
return {"status": "error", "message": "Missing content or conversation_id"}
|
||
|
||
# 更新資料庫中所有該對話的待處理消息(最新的一條)
|
||
async with db_pool.acquire() as conn:
|
||
result = await conn.execute(
|
||
"""
|
||
UPDATE reply_queue
|
||
SET admin_reply = $1, status = 'replied', replied_at = NOW()
|
||
WHERE papercups_conversation_id = $2
|
||
AND status = 'pending'
|
||
AND id = (
|
||
SELECT id FROM reply_queue
|
||
WHERE papercups_conversation_id = $2 AND status = 'pending'
|
||
ORDER BY created_at DESC
|
||
LIMIT 1
|
||
)
|
||
""",
|
||
content, conversation_id
|
||
)
|
||
|
||
print(f"✅ Papercups 回覆已處理: 對話 #{conversation_id}")
|
||
return {"status": "success", "conversation_id": conversation_id}
|
||
|
||
except Exception as e:
|
||
print(f"❌ Webhook 處理錯誤: {e}")
|
||
return {"status": "error", "message": str(e)}
|
||
|
||
|
||
if __name__ == "__main__":
|
||
import uvicorn
|
||
uvicorn.run(app, host="0.0.0.0", port=8000)
|