# server.py
"""
OpenAI-to-Dify API Proxy Server (enhanced logging edition)
===========================================================
A FastAPI application that translates OpenAI-compatible chat completion
requests into Dify API calls. The proxy supports both streaming and
non-streaming responses, so standard OpenAI client libraries can talk to the
Dify platform unchanged.

Main features:
1. Convert OpenAI /v1/chat/completions requests into the Dify /chat-messages format
2. Support streaming and blocking response modes
3. Provide an OpenAI-compatible model list endpoint
4. Handle CORS and error responses
5. Comprehensive structured logging and monitoring

Author: auto-generated
Version: 0.2.0
"""
# --- Import dependencies ---
import os
import json
import time
import logging
import logging.handlers
from typing import Any, Dict, Optional
import asyncio
import traceback
from datetime import datetime
import uuid
import httpx
from fastapi import FastAPI, Request, Response, Header
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from dotenv import load_dotenv
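
# Illustrative client usage (a sketch, not executed here): it assumes the proxy
# is running on localhost:8000 as configured in main() below, and that the
# openai Python SDK >= 1.x is installed.
#
#   from openai import OpenAI
#
#   client = OpenAI(base_url="http://localhost:8000/v1", api_key="unused")
#   resp = client.chat.completions.create(
#       model="dify",
#       messages=[{"role": "user", "content": "Hello"}],
#       extra_body={"extra": {"conversation_id": "", "inputs": {}}},  # proxy-specific, optional
#   )
#   print(resp.choices[0].message.content)
#
# The api_key value is not validated by this proxy; requests to Dify always use
# the DIFY_API_KEY configured below. The "extra" payload keys are this proxy's
# own convention (see map_openai_to_dify), not part of the OpenAI API.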

# --- Logging configuration ---
def setup_logging():
    """Configure the global logging system."""
    # Make sure the log directory exists first
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)

    # Create the log formatter
    log_format = logging.Formatter(
        '%(asctime)s | %(levelname)-8s | %(name)s:%(lineno)d | %(funcName)s() | %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # Configure the root logger
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    # Clear existing handlers to avoid duplicate log entries
    logger.handlers.clear()

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(log_format)

    try:
        # File handler, rotated daily
        file_handler = logging.handlers.TimedRotatingFileHandler(
            filename=os.path.join(log_dir, 'proxy_server.log'),
            when='midnight',
            interval=1,
            backupCount=30,
            encoding='utf-8'
        )
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(log_format)

        # Error file handler
        error_handler = logging.handlers.TimedRotatingFileHandler(
            filename=os.path.join(log_dir, 'error.log'),
            when='midnight',
            interval=1,
            backupCount=30,
            encoding='utf-8'
        )
        error_handler.setLevel(logging.ERROR)
        error_handler.setFormatter(log_format)

        # Attach the handlers
        logger.addHandler(console_handler)
        logger.addHandler(file_handler)
        logger.addHandler(error_handler)
        print(f"Logging initialized, log directory: {os.path.abspath(log_dir)}")
    except Exception as e:
        print(f"File log handlers failed to initialize, using console output only: {e}")
        # If the file handlers fail, fall back to the console handler only
        logger.addHandler(console_handler)

    # Quiet down third-party library loggers
    logging.getLogger('httpx').setLevel(logging.WARNING)
    logging.getLogger('uvicorn.access').setLevel(logging.INFO)
    return logger


# Initialize the logging system
logger = setup_logging()
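
# With the formatter configured above, a record renders roughly like this
# (illustrative values only):
#   2025-09-23 18:33:03 | INFO     | root:101 | <module>() | Loading environment configuration...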

# --- Environment configuration ---
logger.info("Loading environment configuration...")
load_dotenv()

# Dify API configuration
# NOTE: the hardcoded fallback key below means the "missing DIFY_API_KEY" check
# further down only fires if the environment variable is explicitly set to an
# empty string; prefer configuring DIFY_API_KEY via the environment.
DIFY_API_KEY = os.getenv("DIFY_API_KEY", "app-qy8vyABbRAeICum1czdJvXIF").strip()
DIFY_BASE_URL = os.getenv("DIFY_BASE_URL", "https://api.dify.ai/v1").rstrip("/")
logger.info(f"Dify API base URL: {DIFY_BASE_URL}")
logger.info(f"Dify API key length: {len(DIFY_API_KEY)}")

# --- FastAPI application setup ---
logger.info("Initializing FastAPI application...")
app = FastAPI(
    title="OpenAI→Dify Proxy",
    version="0.2.0"
)

# --- CORS middleware configuration ---
logger.info("Configuring CORS middleware...")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- Environment variable validation ---
if not DIFY_API_KEY:
    logger.error("Missing required environment variable: DIFY_API_KEY")
    raise RuntimeError("Missing DIFY_API_KEY environment variable.")
logger.info("Environment configuration loaded")

# --- Request middleware ---
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log details of every HTTP request."""
    request_id = str(uuid.uuid4())[:8]
    start_time = time.time()
    # Expose the request ID to route handlers via request.state so their log
    # lines can be correlated with the middleware's.
    request.state.request_id = request_id

    # Log the start of the request
    logger.info(f"[{request_id}] Incoming request: {request.method} {request.url.path}")
    logger.debug(f"[{request_id}] Request headers: {dict(request.headers)}")

    # Log client information
    client_ip = request.client.host if request.client else "unknown"
    user_agent = request.headers.get("user-agent", "unknown")
    logger.info(f"[{request_id}] Client: {client_ip}, UA: {user_agent}")

    response = await call_next(request)

    # Log the response
    process_time = time.time() - start_time
    logger.info(f"[{request_id}] Response status: {response.status_code}, elapsed: {process_time:.3f}s")

    # Attach the request ID to the response headers
    response.headers["X-Request-ID"] = request_id
    return response

# --- Utility functions ---
def build_openai_completion_id() -> str:
    """Generate an OpenAI-compatible chat completion ID."""
    completion_id = f"chatcmpl-{int(time.time() * 1000)}"
    logger.debug(f"Generated completion ID: {completion_id}")
    return completion_id


def openai_chunk(data: Dict[str, Any]) -> str:
    """Format a single OpenAI streaming response chunk as an SSE event."""
    chunk = f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
    logger.debug(f"Built streaming chunk, size: {len(chunk)} bytes")
    return chunk

# --- Data transformation functions ---
def map_openai_to_dify(body: Dict[str, Any]) -> Dict[str, Any]:
    """Convert an OpenAI chat completion request into the Dify request format."""
    logger.debug(f"Converting request format, original body: {json.dumps(body, ensure_ascii=False)}")
    messages = body.get("messages", []) or []
    stream = bool(body.get("stream", False))
    logger.info(f"Message count: {len(messages)}, streaming: {stream}")

    # Find the content of the most recent user message
    last_user_content = ""
    for i, msg in enumerate(reversed(messages)):
        if msg.get("role") == "user":
            last_user_content = msg.get("content", "")
            logger.debug(f"Found user message (index {len(messages) - 1 - i}): {last_user_content[:100]}...")
            break
    if not last_user_content:
        logger.warning("No user message content found")

    # Extract extension parameters
    extra = body.get("extra", {}) or {}
    conversation_id = extra.get("conversation_id", "")
    inputs = extra.get("inputs", {}) or {}
    if conversation_id:
        logger.info(f"Using conversation ID: {conversation_id}")
    if inputs:
        logger.info(f"Applying input variables: {list(inputs.keys())}")

    # Build the Dify API request body
    dify_payload = {
        "inputs": inputs,
        "query": last_user_content,
        "response_mode": "streaming" if stream else "blocking",
        "conversation_id": conversation_id,
        "user": body.get("user", "default-user"),
    }
    logger.debug(f"Converted Dify request: {json.dumps(dify_payload, ensure_ascii=False)}")
    return dify_payload
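
# Illustrative mapping example (not executed): given an OpenAI-style request
# body such as
#
#   {
#       "model": "dify",
#       "stream": true,
#       "user": "alice",
#       "messages": [{"role": "user", "content": "Hello"}],
#       "extra": {"conversation_id": "abc-123", "inputs": {"lang": "en"}}
#   }
#
# map_openai_to_dify() above produces the following Dify payload:
#
#   {
#       "inputs": {"lang": "en"},
#       "query": "Hello",
#       "response_mode": "streaming",
#       "conversation_id": "abc-123",
#       "user": "alice"
#   }
#
# The "extra" field and its keys are this proxy's convention, not part of the
# OpenAI API; the values shown ("abc-123", "lang", "alice") are made up for
# illustration.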


def map_dify_blocking_to_openai(dify_resp: Dict[str, Any], model: str = "dify-app") -> Dict[str, Any]:
    """Convert a Dify blocking response into the OpenAI non-streaming response format."""
    logger.debug(f"Converting Dify blocking response: {json.dumps(dify_resp, ensure_ascii=False)}")

    # Extract the answer content
    answer = dify_resp.get("answer", "") or dify_resp.get("data", {}).get("answer", "")
    usage_obj = dify_resp.get("metadata", {}).get("usage", {}) or {}
    logger.info(f"Response content length: {len(answer)} characters")
    logger.info(f"Token usage: {usage_obj}")

    openai_response = {
        "id": build_openai_completion_id(),
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": [
            {
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": answer
                },
                "finish_reason": "stop",
            }
        ],
        "usage": {
            "prompt_tokens": usage_obj.get("prompt_tokens", 0),
            "completion_tokens": usage_obj.get("completion_tokens", 0),
            "total_tokens": usage_obj.get("total_tokens", 0),
        },
    }
    logger.debug(f"Converted OpenAI response: {json.dumps(openai_response, ensure_ascii=False)}")
    return openai_response


# --- Streaming response handler ---
def create_openai_stream_generator(dify_url: str, headers: Dict[str, str], dify_payload: Dict[str, Any],
                                   model: str = "dify-app"):
    """Yield the Dify event stream as OpenAI-format SSE chunks."""
    completion_id = build_openai_completion_id()
    created_ts = int(time.time())
    logger.info(f"Starting streaming response, completion ID: {completion_id}")
    logger.debug(f"Request URL: {dify_url}")
    logger.debug(f"Request headers: {headers}")

    chunk_count = 0
    total_content_length = 0
    start_time = time.time()

    try:
        # Send the initial role chunk
        logger.debug("Sending initial role chunk")
        yield openai_chunk({
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": created_ts,
            "model": model,
            "choices": [
                {"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}
            ],
        })
        chunk_count += 1

        # Open the streaming connection
        logger.info("Opening streaming connection to Dify...")
        with httpx.Client(timeout=httpx.Timeout(120.0)) as client:
            with client.stream("POST", dify_url, headers=headers, json=dify_payload) as response:
                logger.info(f"Connected, response status code: {response.status_code}")

                if response.status_code >= 400:
                    logger.error(f"Dify API returned an error status: {response.status_code}")
                    # A streamed response body must be read before .text can be accessed
                    response.read()
                    logger.error(f"Error body: {response.text}")
                    yield openai_chunk({
                        "id": completion_id,
                        "object": "chat.completion.chunk",
                        "created": created_ts,
                        "model": model,
                        "choices": [
                            {"index": 0, "delta": {"content": ""}, "finish_reason": "error"}
                        ],
                    })
                    return

                # Process the streamed data
                for line in response.iter_lines():
                    if not line:
                        continue
                    logger.debug(f"Received raw line: {line[:200]}...")  # log only the first 200 characters
                    if line.startswith("data:"):
                        try:
                            payload = json.loads(line[len("data:"):].strip())
                            event = payload.get("event")
                            logger.debug(f"Parsed event type: {event}")

                            if event == "message":
                                ans = payload.get("answer", "")
                                if ans:
                                    chunk_count += 1
                                    total_content_length += len(ans)
                                    logger.debug(f"Received message chunk #{chunk_count}, length: {len(ans)}")
                                    yield openai_chunk({
                                        "id": completion_id,
                                        "object": "chat.completion.chunk",
                                        "created": created_ts,
                                        "model": model,
                                        "choices": [
                                            {"index": 0, "delta": {"content": ans}, "finish_reason": None}
                                        ],
                                    })
                            elif event == "message_end":
                                logger.info(
                                    f"Stream ended, sent {chunk_count} chunks, total content length: {total_content_length}")
                                break
                            elif event == "error":
                                error_msg = payload.get("message", "unknown error")
                                logger.error(f"Dify returned an error event: {error_msg}")
                                yield openai_chunk({
                                    "id": completion_id,
                                    "object": "chat.completion.chunk",
                                    "created": created_ts,
                                    "model": model,
                                    "choices": [
                                        {"index": 0, "delta": {"content": ""}, "finish_reason": "error"}
                                    ],
                                })
                                break
                            elif event == "workflow_started":
                                logger.info("Workflow started")
                            elif event == "node_started":
                                node_id = payload.get("data", {}).get("id", "unknown")
                                logger.info(f"Node started: {node_id}")
                            elif event == "node_finished":
                                node_id = payload.get("data", {}).get("id", "unknown")
                                logger.info(f"Node finished: {node_id}")
                        except json.JSONDecodeError as e:
                            logger.error(f"JSON parse failure: {e}, raw line: {line}")
                            continue
                        except Exception as e:
                            logger.error(f"Exception while processing stream data: {e}, line: {line}")
                            continue

        # Log streaming statistics
        elapsed_time = time.time() - start_time
        logger.info(
            f"Streaming response complete - total time: {elapsed_time:.3f}s, chunks: {chunk_count}, content length: {total_content_length}")
    except httpx.TimeoutException:
        logger.error(f"Request timed out (120s): {dify_url}")
        yield openai_chunk({
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": created_ts,
            "model": model,
            "choices": [
                {"index": 0, "delta": {"content": ""}, "finish_reason": "error"}
            ],
        })
    except httpx.RequestError as e:
        logger.error(f"Network request error: {e}")
        yield openai_chunk({
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": created_ts,
            "model": model,
            "choices": [
                {"index": 0, "delta": {"content": ""}, "finish_reason": "error"}
            ],
        })
    except Exception as e:
        logger.error(f"Exception in streaming response generator: {e}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        yield openai_chunk({
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": created_ts,
            "model": model,
            "choices": [
                {"index": 0, "delta": {"content": ""}, "finish_reason": "error"}
            ],
        })
    finally:
        # Send the final [DONE] marker
        logger.debug("Sending stream termination marker")
        yield "data: [DONE]\n\n"
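
# Illustrative wire format (not executed): each value yielded above is a single
# SSE event serialized by openai_chunk(), e.g.
#
#   data: {"id": "chatcmpl-1695465183000", "object": "chat.completion.chunk", ...,
#          "choices": [{"index": 0, "delta": {"content": "Hel"}, "finish_reason": null}]}
#
# followed by a blank line, and the stream always ends with:
#
#   data: [DONE]
#
# The ID and content shown are made-up values; the shape matches what the
# generator emits.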


# --- API route handlers ---
@app.post("/v1/chat/completions")
async def chat_completions(request: Request, authorization: Optional[str] = Header(None)):
    """OpenAI-compatible chat completions proxy endpoint."""
    # The incoming Authorization header is accepted but not validated;
    # requests to Dify always use DIFY_API_KEY.
    request_id = getattr(request.state, "request_id", "unknown")
    logger.info(f"[{request_id}] Handling chat completion request")

    try:
        # Parse the request body
        body = await request.json()
        model = body.get("model", "dify-app")
        stream = bool(body.get("stream", False))
        logger.info(f"[{request_id}] Model: {model}, streaming: {stream}")
        logger.debug(f"[{request_id}] Full request body: {json.dumps(body, ensure_ascii=False)}")

        # Validate the request format
        messages = body.get("messages", [])
        if not messages:
            logger.warning(f"[{request_id}] Request is missing the messages field")
            return JSONResponse(
                status_code=400,
                content={
                    "error": {
                        "message": "Missing required parameter: messages",
                        "type": "invalid_request_error"
                    }
                }
            )

        # Convert to the Dify format
        dify_payload = map_openai_to_dify(body)

        # Build the request headers
        headers = {
            "Authorization": f"Bearer {DIFY_API_KEY}",
            "Content-Type": "application/json",
        }

        if stream:
            logger.info(f"[{request_id}] Handling streaming request")
            dify_url = f"{DIFY_BASE_URL}/chat-messages"

            # Pre-flight connectivity check
            logger.debug(f"[{request_id}] Running pre-flight check...")
            async with httpx.AsyncClient(timeout=httpx.Timeout(10.0)) as client:
                try:
                    # Use a HEAD request to check API availability
                    test_resp = await client.head(f"{DIFY_BASE_URL}/",
                                                  headers={"Authorization": f"Bearer {DIFY_API_KEY}"})
                    logger.debug(f"[{request_id}] Pre-flight check status: {test_resp.status_code}")
                except Exception as e:
                    logger.error(f"[{request_id}] Pre-flight check failed: {e}")
                    return JSONResponse(
                        status_code=503,
                        content={
                            "error": {
                                "message": f"Dify API unavailable: {str(e)}",
                                "type": "service_unavailable"
                            }
                        }
                    )

            # Return the streaming response
            logger.info(f"[{request_id}] Starting streaming response")
            return StreamingResponse(
                create_openai_stream_generator(dify_url, headers, dify_payload, model),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                    "X-Accel-Buffering": "no"  # disable nginx buffering
                }
            )
        else:
            logger.info(f"[{request_id}] Handling blocking request")
            dify_url = f"{DIFY_BASE_URL}/chat-messages"
            start_time = time.time()

            async with httpx.AsyncClient(timeout=httpx.Timeout(120.0)) as client:
                try:
                    logger.debug(f"[{request_id}] Sending request to: {dify_url}")
                    resp = await client.post(dify_url, headers=headers, json=dify_payload)
                    request_time = time.time() - start_time
                    logger.info(f"[{request_id}] Dify response: {resp.status_code}, elapsed: {request_time:.3f}s")

                    if resp.status_code >= 400:
                        error_text = resp.text
                        logger.error(f"[{request_id}] Dify API error ({resp.status_code}): {error_text}")
                        return JSONResponse(
                            status_code=resp.status_code,
                            content={
                                "error": {
                                    "message": error_text,
                                    "type": "invalid_request_error",
                                    "code": resp.status_code
                                }
                            }
                        )

                    dify_json = resp.json()
                    logger.debug(f"[{request_id}] Dify response data: {json.dumps(dify_json, ensure_ascii=False)}")

                    # Convert to the OpenAI format
                    openai_json = map_dify_blocking_to_openai(dify_json, model=model)
                    logger.info(f"[{request_id}] Successfully returned blocking response")
                    return JSONResponse(status_code=200, content=openai_json)
                except httpx.TimeoutException:
                    logger.error(f"[{request_id}] Request timed out (120s)")
                    return JSONResponse(
                        status_code=504,
                        content={
                            "error": {
                                "message": "Request timed out, please try again later",
                                "type": "timeout_error"
                            }
                        }
                    )
                except httpx.RequestError as e:
                    logger.error(f"[{request_id}] Network request error: {e}")
                    return JSONResponse(
                        status_code=502,
                        content={
                            "error": {
                                "message": f"Network connection failed: {str(e)}",
                                "type": "network_error"
                            }
                        }
                    )
    except json.JSONDecodeError as e:
        logger.error(f"[{request_id}] JSON parse failure: {e}")
        return JSONResponse(
            status_code=400,
            content={
                "error": {
                    "message": "Request body is not valid JSON",
                    "type": "invalid_json"
                }
            }
        )
    except Exception as e:
        logger.error(f"[{request_id}] Exception while handling request: {e}")
        logger.error(f"[{request_id}] Traceback: {traceback.format_exc()}")
        return JSONResponse(
            status_code=500,
            content={
                "error": {
                    "message": "Internal server error",
                    "type": "internal_server_error"
                }
            }
        )
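
# Illustrative invocation (assumes the server is running on localhost:8000 as
# configured in main() below):
#
#   curl http://localhost:8000/v1/chat/completions \
#     -H "Content-Type: application/json" \
#     -d '{"model": "dify", "stream": false,
#          "messages": [{"role": "user", "content": "Hello"}]}'
#
# Set "stream": true to receive the SSE stream produced by
# create_openai_stream_generator() instead of a single JSON body.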


@app.get("/v1/models")
@app.get("/models")
async def list_models(request: Request):
    """Mimic the OpenAI model list endpoint."""
    request_id = getattr(request.state, "request_id", "unknown")
    logger.info(f"[{request_id}] Model list requested")
    try:
        fake_model = {
            "id": "dify",
            "object": "model",
            "created": int(time.time()),
            "owned_by": "dify-proxy",
            "permission": [],
            "root": "dify",
            "parent": None
        }
        response_data = {
            "object": "list",
            "data": [fake_model]
        }
        logger.info(f"[{request_id}] Returned model list with {len(response_data['data'])} model(s)")
        logger.debug(f"[{request_id}] Model list: {json.dumps(response_data, ensure_ascii=False)}")
        return response_data
    except Exception as e:
        logger.error(f"[{request_id}] Exception while building model list: {e}")
        logger.error(f"[{request_id}] Traceback: {traceback.format_exc()}")
        return JSONResponse(
            status_code=500,
            content={
                "error": {
                    "message": "Failed to fetch model list",
                    "type": "internal_server_error"
                }
            }
        )


@app.get("/health")
def health(request: Request):
    """Service health check endpoint."""
    request_id = getattr(request.state, "request_id", "unknown")
    logger.info(f"[{request_id}] Health check requested")
    try:
        # Report basic configuration
        config_status = {
            "dify_api_key_configured": bool(DIFY_API_KEY),
            "dify_base_url": DIFY_BASE_URL,
            "timestamp": datetime.now().isoformat(),
            "uptime": time.time() - start_timestamp
        }
        health_data = {
            "status": "ok",
            "service": "openai-dify-proxy",
            "version": "0.2.0",
            "config": config_status
        }
        logger.debug(f"[{request_id}] Health check details: {json.dumps(health_data, ensure_ascii=False)}")
        return health_data
    except Exception as e:
        logger.error(f"[{request_id}] Health check exception: {e}")
        return JSONResponse(
            status_code=500,
            content={
                "status": "error",
                "message": str(e)
            }
        )


@app.get("/")
async def root(request: Request):
    """Root path information endpoint."""
    request_id = getattr(request.state, "request_id", "unknown")
    logger.info(f"[{request_id}] Root path accessed")
    return {
        "service": "OpenAI → Dify API Proxy",
        "version": "0.2.0",
        "status": "running",
        "endpoints": {
            "chat_completions": "/v1/chat/completions",
            "models": "/v1/models",
            "health": "/health"
        },
        "documentation": "/docs"
    }


# --- Startup and shutdown events ---
start_timestamp = time.time()


@app.on_event("startup")
async def startup_event():
    """Application startup event."""
    logger.info("=" * 60)
    logger.info("OpenAI → Dify API Proxy starting...")
    logger.info("Version: 0.2.0")
    logger.info(f"Start time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info(f"Dify Base URL: {DIFY_BASE_URL}")
    logger.info(f"Log level: {logging.getLevelName(logger.level)}")
    logger.info("=" * 60)

    # Test the Dify API connection
    try:
        logger.info("Testing Dify API connection...")
        async with httpx.AsyncClient(timeout=httpx.Timeout(10.0)) as client:
            test_response = await client.get(
                f"{DIFY_BASE_URL}/",
                headers={"Authorization": f"Bearer {DIFY_API_KEY}"}
            )
            if test_response.status_code < 500:
                logger.info("✓ Dify API connection OK")
            else:
                logger.warning(f"⚠ Unexpected Dify API response: {test_response.status_code}")
    except Exception as e:
        logger.error(f"✗ Dify API connection test failed: {e}")


@app.on_event("shutdown")
async def shutdown_event():
    """Application shutdown event."""
    uptime = time.time() - start_timestamp
    logger.info("=" * 60)
    logger.info("OpenAI → Dify API Proxy shutting down...")
    logger.info(f"Uptime: {uptime:.1f}s")
    logger.info(f"Shutdown time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info("Goodbye!")
    logger.info("=" * 60)


# --- Exception handlers ---
@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
    """404 error handler."""
    request_id = getattr(request.state, "request_id", "unknown")
    logger.warning(f"[{request_id}] 404 Not Found: {request.method} {request.url.path}")
    return JSONResponse(
        status_code=404,
        content={
            "error": {
                "message": f"Path not found: {request.url.path}",
                "type": "not_found",
                "available_endpoints": [
                    "/v1/chat/completions",
                    "/v1/models",
                    "/models",
                    "/health",
                    "/"
                ]
            }
        }
    )


@app.exception_handler(500)
async def internal_error_handler(request: Request, exc):
    """500 error handler."""
    request_id = getattr(request.state, "request_id", "unknown")
    logger.error(f"[{request_id}] 500 Internal Server Error: {exc}")
    logger.error(f"[{request_id}] Traceback: {traceback.format_exc()}")
    return JSONResponse(
        status_code=500,
        content={
            "error": {
                "message": "Internal server error",
                "type": "internal_server_error"
            }
        }
    )


# --- Main execution block ---
def main():
    """Application entry point."""
    logger.info("Preparing to start the uvicorn server...")
    import uvicorn

    # Configure uvicorn logging to match the proxy's log format
    uvicorn_log_config = uvicorn.config.LOGGING_CONFIG
    uvicorn_log_config["formatters"]["default"]["fmt"] = "%(asctime)s | %(levelname)-8s | uvicorn | %(message)s"
    uvicorn_log_config["formatters"]["access"][
        "fmt"] = "%(asctime)s | %(levelname)-8s | uvicorn.access | %(client_addr)s - \"%(request_line)s\" %(status_code)s"

    try:
        uvicorn.run(
            app,
            host="0.0.0.0",
            port=8000,
            log_config=uvicorn_log_config,
            access_log=True
        )
    except KeyboardInterrupt:
        logger.info("Interrupt received, shutting down the server...")
    except Exception as e:
        logger.error(f"Server failed to start: {e}")
        logger.error(f"Traceback: {traceback.format_exc()}")
    finally:
        logger.info("Server stopped")


if __name__ == "__main__":
    main()