Files
agno-notebooks/数据库优化工程师.py
cruld 0455760852 feat(database_optimizer): 支持异步数据库分析
- 新增异步主函数以提升性能
- 重构知识库加载及智能体创建逻辑
- 添加客户端会话以连接MCP服务器
2025-03-31 01:55:21 +08:00

130 lines
4.1 KiB
Python

#!/usr/bin/env python3
"""
PostgreSQL 数据库优化工程师智能体
功能:
- 使用 DeepSeek 模型进行智能分析
- 集成 PostgreSQL MCP 服务器工具
- 加载数据库知识库
- 提供数据库优化建议
"""
import asyncio
import os
from typing import Optional
from dataclasses import dataclass
from dotenv import load_dotenv
from agno.agent import Agent
from agno.models.deepseek import DeepSeek
from agno.knowledge.pdf import PDFKnowledgeBase
from agno.vectordb.lancedb import LanceDb, SearchType
from agno.embedder.openai import OpenAIEmbedder
from agno.knowledge.combined import CombinedKnowledgeBase
from agno.tools.mcp import MCPTools
from mcp.client.stdio import stdio_client
from mcp import ClientSession, StdioServerParameters
@dataclass
class DatabaseOptimizerConfig:
"""数据库优化器配置"""
pdf_path: str = "D:\\Sources\\DONGJAK-TOOLS\\pdfs\\Database Fundamentals.pdf"
db_connection: str = "postgresql://postgres:postgres@192.168.1.7:5432/postgres"
model_id: str = "deepseek-chat"
vector_db_path: str = "tmp/lancedb"
class DatabaseOptimizer:
"""PostgreSQL 数据库优化引擎"""
def __init__(self, config: Optional[DatabaseOptimizerConfig] = None):
self.config = config or DatabaseOptimizerConfig()
self._load_environment()
self.knowledge_base = self._setup_knowledge_base()
self.postgres_tools = self._setup_postgres_tools()
self.agent = self._create_agent()
def _setup_postgres_tools(self) -> MCPTools:
"""设置 PostgreSQL 工具"""
server_params = StdioServerParameters(
command="cmd",
args=[
"/c",
"npx",
"-y",
"@modelcontextprotocol/server-postgres",
self.config.db_connection,
],
env={},
)
# Create a client session to connect to the MCP server
with stdio_client(server_params) as (read, write):
with ClientSession(read, write) as session:
agent = create_filesystem_agent(session)
# Run the agent
agent.print_response(message, stream=True)
return MCPTools(server_params=server_params)
def analyze_database(self, query: str, stream: bool = True):
"""分析数据库"""
with self.postgres_tools:
self.agent.print_response(query, stream=stream)
async def main():
load_dotenv()
"""设置知识库"""
local_pdf_kb = PDFKnowledgeBase(
path="D:\\Sources\\DONGJAK-TOOLS\\pdfs\\Database Fundamentals.pdf",
vector_db=LanceDb(
table_name="database_fundamentals",
uri="tmp/lancedb",
search_type=SearchType.vector,
embedder=OpenAIEmbedder(id="text-embedding-3-small"),
),
)
knowledge_base = CombinedKnowledgeBase(
sources=[local_pdf_kb],
vector_db=LanceDb(
table_name="combined_documents",
uri="tmp/lancedb",
search_type=SearchType.vector,
embedder=OpenAIEmbedder(id="text-embedding-3-small"),
),
)
knowledge_base.load()
server_params = StdioServerParameters(
command="cmd",
args=[
"/c",
"npx",
"-y",
"@modelcontextprotocol/server-postgres",
"postgresql://postgres:postgres@192.168.1.7:5432/postgres",
],
env={},
)
# Create a client session to connect to the MCP server
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
postgres_tools = MCPTools(session=session)
await postgres_tools.initialize()
agent = Agent(
model=DeepSeek(id="deepseek-chat"),
markdown=True,
knowledge=knowledge_base,
search_knowledge=True,
show_tool_calls=True,
tools=[postgres_tools],
)
await agent.aprint_response("看下aq这个数据库", stream=True)
if __name__ == "__main__":
asyncio.run(main())