高级开发指南
概述
本指南面向需要深度定制毕昇平台功能的高级开发者,涵盖自定义组件开发、高级工作流设计、插件系统、API开发等核心技术内容。通过本指南,您将能够充分利用毕昇平台的扩展能力,构建满足特定业务需求的企业级AI应用。
自定义组件开发
毕昇平台支持开发自定义组件来扩展工作流的功能。组件是工作流的基本构建单元,每个组件封装特定的业务逻辑。
组件开发基础
1. 组件结构
一个标准的毕昇组件包含以下文件:
custom_components/
└── my_component/
    ├── __init__.py
    ├── component.py      # 组件主逻辑
    ├── schema.py         # 组件配置架构
    ├── frontend.py       # 前端展示逻辑
    └── README.md         # 组件说明文档
2. 组件基类
所有自定义组件都需要继承BaseComponent基类:
from bisheng.components.base import BaseComponent
from bisheng.field_typing import Input, Output, Text, Integer
from typing import Optional
class MyCustomComponent(BaseComponent):
    """Example component that upper- or lower-cases text depending on ``param``.

    Declares two inputs (the text to process and an optional integer
    parameter whose default is ``DEFAULT_PARAM``) and one text output named
    ``result``.
    """

    display_name = "我的自定义组件"
    description = "这是一个示例自定义组件"
    icon = "🔧"

    # Used when ``param`` is omitted or passed explicitly as None; shared by
    # the "param" input definition, build_config() and build().
    DEFAULT_PARAM = 10
    # ``param`` values strictly greater than this select upper-casing.
    UPPERCASE_THRESHOLD = 50

    inputs = [
        Input(
            display_name="输入文本",
            name="input_text",
            field_type=Text,
            required=True,
            placeholder="请输入要处理的文本",
        ),
        Input(
            display_name="处理参数",
            name="param",
            field_type=Integer,
            required=False,
            value=DEFAULT_PARAM,
            # NOTE(review): the 1-100 range is only documented here; it is not
            # enforced anywhere in this component.
            info="处理参数,范围1-100",
        ),
    ]

    outputs = [
        Output(
            display_name="处理结果",
            name="result",
            field_type=Text,
        )
    ]

    def build_config(self):
        """Return the per-field display configuration for the two inputs."""
        return {
            "input_text": {"display_name": "输入文本"},
            "param": {"display_name": "处理参数", "value": self.DEFAULT_PARAM},
        }

    def build(self, input_text: str, param: Optional[int] = 10) -> str:
        """Run the component's core logic.

        Args:
            input_text: Text to transform.
            param: Processing parameter. ``None`` is allowed (the input is
                optional) and falls back to ``DEFAULT_PARAM``.

        Returns:
            The transformed text.
        """
        if param is None:
            # Comparing None with an int in process_text() would raise
            # TypeError, so substitute the declared default.
            param = self.DEFAULT_PARAM
        return self.process_text(input_text, param)

    def process_text(self, text: str, param: int) -> str:
        """Upper-case *text* when ``param`` exceeds the threshold, else lower-case it."""
        return text.upper() if param > self.UPPERCASE_THRESHOLD else text.lower()
3. 高级组件特性
异步处理
对于需要长时间等待I/O的任务(如网络请求、远程模型调用),可以使用异步组件;注意异步并不能加速CPU密集型计算:
import asyncio
from bisheng.components.base import BaseComponent
class AsyncProcessComponent(BaseComponent):
    """Example component whose ``build`` step is a coroutine."""

    display_name = "异步处理组件"

    async def build(self, input_data: str) -> str:
        """Pause for two seconds (simulated slow work), then delegate to async_process."""
        await asyncio.sleep(2)  # stand-in for an awaitable long-running operation
        return await self.async_process(input_data)

    async def async_process(self, data: str) -> str:
        """Return *data* prefixed with a fixed result label."""
        labelled = f"异步处理结果: {data}"
        return labelled
流式输出
支持流式输出的组件,适用于文本生成等场景:
from typing import Iterator
class StreamingComponent(BaseComponent):
    """Example component that streams its output chunk by chunk."""

    display_name = "流式输出组件"

    def build(self, prompt: str) -> Iterator[str]:
        """Return a generator over the chunks produced by generate_text().

        ``yield from`` delegates directly to the inner generator, which
        replaces the original manual re-yield loop and its unused index.
        """
        yield from self.generate_text(prompt)

    def generate_text(self, prompt: str) -> Iterator[str]:
        """Simulate streamed generation: yield each whitespace-separated word plus a space."""
        for word in prompt.split():
            yield f"{word} "
状态管理
组件可以维护内部状态,支持复杂的有状态处理:
from datetime import datetime


class StatefulComponent(BaseComponent):
    """Component that records every request it handles in ``self.state``.

    The state lives on the instance, so it persists across ``build`` calls
    made on the same component object.
    """

    display_name = "有状态组件"

    def __init__(self, **kwargs):
        # NOTE(review): keyword arguments are forwarded to BaseComponent so
        # that constructor data is not silently dropped; confirm its signature.
        super().__init__(**kwargs)
        # Maps "request_<n>" -> details of the n-th build() call.
        self.state = {}
        # Number of build() calls handled so far.
        self.counter = 0

    def build(self, input_data: str) -> dict:
        """Record *input_data* as a new request and report the running totals.

        Returns:
            ``{"result": <message>, "state_size": <number of recorded requests>}``.
        """
        self.counter += 1
        # NOTE: entries are never evicted, so memory grows with every call;
        # bound or persist this history in long-running deployments.
        self.state[f"request_{self.counter}"] = {
            "input": input_data,
            # Naive local time as returned by datetime.now().
            "timestamp": datetime.now().isoformat(),
            "processed": True,
        }
        return {
            "result": f"处理了第{self.counter}个请求",
            "state_size": len(self.state),
        }
高级组件开发模式
1. 复合组件
将多个简单组件组合成复杂的复合组件:
class CompositeComponent(BaseComponent):
    """Pipeline component: preprocess -> analyze -> format.

    Each stage can be injected (for tests or alternative implementations).
    When a stage is not supplied, TextPreprocessor, TextAnalyzer or
    ResultFormatter is instantiated; those classes are not defined in this
    example and must be provided by the project.
    """

    display_name = "复合处理组件"

    def __init__(self, preprocessor=None, analyzer=None, formatter=None, **kwargs):
        # NOTE(review): remaining keyword arguments are forwarded to
        # BaseComponent; confirm its constructor accepts them.
        super().__init__(**kwargs)
        # Each stage only needs the one method build() calls on it:
        # clean(), analyze() and format() respectively.
        self.preprocessor = preprocessor if preprocessor is not None else TextPreprocessor()
        self.analyzer = analyzer if analyzer is not None else TextAnalyzer()
        self.formatter = formatter if formatter is not None else ResultFormatter()

    def build(self, raw_text: str) -> dict:
        """Run *raw_text* through the three stages and return the formatter's output."""
        cleaned_text = self.preprocessor.clean(raw_text)
        analysis_result = self.analyzer.analyze(cleaned_text)
        return self.formatter.format(analysis_result)
2. 插件化组件
支持动态加载插件的组件架构:
import importlib
import os


class PluginableComponent(BaseComponent):
    """Component that dispatches its input to dynamically loaded plugins.

    Every ``<name>.py`` module in the plugin directory that defines a
    ``Plugin`` class is instantiated once and registered under ``<name>``.
    """

    display_name = "插件化组件"

    def __init__(self, **kwargs):
        # NOTE(review): keyword arguments are forwarded to BaseComponent;
        # confirm its constructor accepts them.
        super().__init__(**kwargs)
        # Maps plugin (module) name -> Plugin instance.
        self.plugins = {}
        self.load_plugins()

    def load_plugins(self, plugin_dir: str = "plugins/", package: str = "plugins") -> None:
        """Import each plugin module from *plugin_dir* and register its ``Plugin``.

        Args:
            plugin_dir: Filesystem directory to scan (relative to the working
                directory unless absolute).
            package: Importable package name corresponding to *plugin_dir*.

        Raises:
            OSError: if *plugin_dir* cannot be listed.
            ImportError: if a plugin module fails to import.
        """
        # SECURITY: importing a module executes its code; only point this at a
        # directory whose contents are trusted.
        for file_name in sorted(os.listdir(plugin_dir)):  # sorted: deterministic load order
            module_name, extension = os.path.splitext(file_name)
            # Skip non-Python files and private modules such as __init__.py,
            # which would otherwise be imported and fail on ``.Plugin``.
            if extension != ".py" or module_name.startswith("_"):
                continue
            module = importlib.import_module(f"{package}.{module_name}")
            plugin_cls = getattr(module, "Plugin", None)
            if plugin_cls is None:
                # Helper modules without a Plugin class are not plugins.
                continue
            self.plugins[module_name] = plugin_cls()

    def build(self, input_data: str, plugin_name: str = "default") -> str:
        """Process *input_data* with the named plugin, or report that it is missing."""
        plugin = self.plugins.get(plugin_name)
        if plugin is None:
            return f"插件 {plugin_name} 未找到"
        return plugin.process(input_data)
3. 配置驱动组件
通过配置文件驱动组件行为:
import yaml
class ConfigDrivenComponent(BaseComponent):
    """Component whose processing strategy is chosen by a YAML config file.

    The ``processing_strategy`` key selects ``advanced_process`` ("advanced"),
    ``simple_process`` ("simple"), or ``default_process`` (anything else,
    including a missing key).
    """

    display_name = "配置驱动组件"

    def __init__(self, config_path: str = "component_config.yaml", **kwargs):
        # NOTE(review): extra keyword arguments are forwarded to BaseComponent;
        # confirm its constructor accepts them.
        super().__init__(**kwargs)
        self.config = self.load_config(config_path)

    def load_config(self, config_path: str) -> dict:
        """Parse the YAML file at *config_path* into a dict.

        Returns:
            The parsed mapping, or ``{}`` for an empty file.

        Raises:
            OSError: if the file cannot be opened.
            ValueError: if the top-level YAML value is not a mapping.
        """
        with open(config_path, "r", encoding="utf-8") as file:
            # safe_load builds only plain Python types and returns None for an
            # empty document.
            data = yaml.safe_load(file)
        if data is None:
            return {}
        if not isinstance(data, dict):
            # Fail here with a clear message instead of an AttributeError on
            # ``.get`` later in build().
            raise ValueError(f"配置文件 {config_path} 的顶层必须是映射(键值对)")
        return data

    def build(self, input_data: str) -> str:
        """Process *input_data* with the strategy named in the config."""
        strategy = self.config.get("processing_strategy", "default")
        if strategy == "advanced":
            return self.advanced_process(input_data)
        if strategy == "simple":
            return self.simple_process(input_data)
        return self.default_process(input_data)

    def advanced_process(self, data: str) -> str:
        """Placeholder for the advanced strategy."""
        return f"高级处理: {data}"

    def simple_process(self, data: str) -> str:
        """Placeholder for the simple strategy."""
        return f"简单处理: {data}"

    def default_process(self, data: str) -> str:
        """Placeholder for the fallback strategy."""
        return f"默认处理: {data}"
高级工作流设计
条件分支与循环
1. 复杂条件分支
创建包含多个条件的复杂分支逻辑:
class AdvancedConditionalFlow:
    """Builds an example workflow graph that routes text by sentiment score."""

    def design_flow(self):
        """Return the node/edge description of a sentiment-routing workflow.

        NOTE(review): the router's ``next_node`` targets (positive_handler,
        negative_handler, neutral_handler, fallback_handler) are not declared
        in ``nodes``; they must be added before this flow can run.
        """
        # (condition name, expression, target node). The three expressions
        # cover every real score without overlap, so ``default_node`` is only
        # reached when the expressions cannot be evaluated.
        routing_rules = (
            ("positive_sentiment", "sentiment_score > 0.7", "positive_handler"),
            ("negative_sentiment", "sentiment_score < 0.3", "negative_handler"),
            ("neutral_sentiment", "0.3 <= sentiment_score <= 0.7", "neutral_handler"),
        )
        router_config = {
            "conditions": [
                {"name": rule_name, "expression": expr, "next_node": target}
                for rule_name, expr, target in routing_rules
            ],
            "default_node": "fallback_handler",
        }

        nodes = [
            {"id": "input", "type": "InputNode", "config": {"input_type": "text"}},
            {
                "id": "analyzer",
                "type": "TextAnalyzer",
                "config": {"analysis_type": ["sentiment", "intent", "language"]},
            },
            {"id": "router", "type": "ConditionalRouter", "config": router_config},
        ]

        # Linear chain: input -> analyzer -> router.
        chain = ["input", "analyzer", "router"]
        edges = [{"from": src, "to": dst} for src, dst in zip(chain, chain[1:])]

        return {"nodes": nodes, "edges": edges}
2. 循环处理工作流
实现循环处理逻辑,适用于批量数据处理:
class LoopProcessingFlow:
    """Builds an example batch workflow that loops until the input is exhausted."""

    def design_batch_flow(self, batch_size: int = 10, max_iterations: int = 100):
        """Return the node/edge description of a batch loop workflow.

        Args:
            batch_size: Value for the BatchInput node's ``batch_size`` config.
            max_iterations: Value for the LoopController's ``max_iterations`` config.

        Returns:
            A dict with ``nodes`` and ``edges`` lists.
        """
        # Single name shared by the controller's continue condition and the
        # aggregator -> controller back edge, so both refer to the same loop test.
        continue_condition = "has_more_data"
        return {
            "nodes": [
                {
                    "id": "batch_input",
                    "type": "BatchInput",
                    "config": {"batch_size": batch_size},
                },
                {
                    "id": "loop_controller",
                    "type": "LoopController",
                    "config": {
                        "max_iterations": max_iterations,
                        "continue_condition": continue_condition,
                    },
                },
                {
                    "id": "item_processor",
                    "type": "ItemProcessor",
                    "config": {"processing_type": "transform"},
                },
                {
                    "id": "aggregator",
                    "type": "ResultAggregator",
                    "config": {"aggregation_method": "collect"},
                },
            ],
            "edges": [
                {"from": "batch_input", "to": "loop_controller"},
                {"from": "loop_controller", "to": "item_processor"},
                {"from": "item_processor", "to": "aggregator"},
                # Back edge that closes the loop.
                {"from": "aggregator", "to": "loop_controller", "condition": continue_condition},
            ],
        }
并行处理工作流
1. 并行分支处理
创建并行处理的工作流,提高处理效率:
class ParallelProcessingFlow:
    """Builds an example fan-out / fan-in workflow graph."""

    def design_parallel_flow(self):
        """Return a graph where a splitter feeds three branches that a merger joins."""
        # (node id, processor type) for each parallel branch, in output order.
        branch_specs = (
            ("branch_a", "ProcessorA"),
            ("branch_b", "ProcessorB"),
            ("branch_c", "ProcessorC"),
        )
        branch_ids = [branch_id for branch_id, _ in branch_specs]

        nodes = [
            {"id": "input", "type": "InputNode"},
            {"id": "splitter", "type": "DataSplitter", "config": {"split_strategy": "parallel"}},
        ]
        nodes.extend(
            {"id": branch_id, "type": proc_type, "config": {"parallel": True}}
            for branch_id, proc_type in branch_specs
        )
        nodes.append({"id": "merger", "type": "ResultMerger", "config": {"merge_strategy": "combine"}})

        # input -> splitter, then fan out to every branch, then fan in to the merger.
        edges = [{"from": "input", "to": "splitter"}]
        edges += [{"from": "splitter", "to": branch_id} for branch_id in branch_ids]
        edges += [{"from": branch_id, "to": "merger"} for branch_id in branch_ids]

        return {"nodes": nodes, "edges": edges}
2. 异步工作流
设计异步执行的工作流,适用于I/O密集型任务:
import asyncio
from datetime import datetime


class AsyncWorkflow:
    """Runs three independent async tasks concurrently and merges their results."""

    async def execute_async_flow(self, input_data):
        """Run tasks A, B and C concurrently on *input_data* and merge the results.

        Returns:
            The dict produced by merge_results().
        """
        # gather() runs the coroutines concurrently and returns their results
        # in the order the coroutines were passed; merge_results() relies on
        # that order (A, B, C).
        results = await asyncio.gather(
            self.async_task_a(input_data),
            self.async_task_b(input_data),
            self.async_task_c(input_data),
        )
        return self.merge_results(results)

    async def async_task_a(self, data):
        """Async task A: simulated 1 s of I/O."""
        await asyncio.sleep(1)
        return f"Task A result: {data}"

    async def async_task_b(self, data):
        """Async task B: simulated 2 s of I/O."""
        await asyncio.sleep(2)
        return f"Task B result: {data}"

    async def async_task_c(self, data):
        """Async task C: simulated 1.5 s of I/O."""
        await asyncio.sleep(1.5)
        return f"Task C result: {data}"

    def merge_results(self, results):
        """Combine the ordered task results with a (naive, local-time) timestamp.

        Args:
            results: Sequence whose first three items are the results of
                tasks A, B and C, in that order.
        """
        return {
            "task_a": results[0],
            "task_b": results[1],
            "task_c": results[2],
            "timestamp": datetime.now().isoformat(),
        }
工作流模板系统
1. 模板定义
创建可重用的工作流模板:
import copy


class WorkflowTemplate:
    """Reusable workflow definition loaded from ``<template_dir>/<name>.yaml``.

    ``instantiate`` fills ``${param}`` placeholders in the template with
    caller-supplied values, falling back to any ``default`` declared in the
    template's ``parameters`` list.
    """

    def __init__(self, template_name: str, template_dir: str = "templates"):
        self.template_name = template_name
        self.template_dir = template_dir
        self.template_config = self.load_template()

    def load_template(self) -> dict:
        """Read and parse the template YAML file.

        Returns:
            The parsed document, or ``{}`` when the file is empty.

        Raises:
            OSError: if the template file cannot be opened.
        """
        # Imported here so the substitution logic is usable without PyYAML.
        import yaml

        # NOTE(review): template_name is interpolated into a filesystem path;
        # validate it before passing names from untrusted sources (e.g. "../x").
        template_path = f"{self.template_dir}/{self.template_name}.yaml"
        with open(template_path, "r", encoding="utf-8") as file:
            # safe_load builds only plain Python types; an empty file yields None.
            data = yaml.safe_load(file)
        return {} if data is None else data

    def instantiate(self, parameters: dict) -> dict:
        """Return a new workflow config with placeholders replaced by *parameters*.

        Parameters missing from *parameters* fall back to the template's
        declared ``default`` values; placeholders with neither stay as-is.
        The loaded template itself is never modified.
        """
        values = {**self._declared_defaults(), **(parameters or {})}
        # Work on a copy so the loaded template can be instantiated repeatedly.
        instance_config = copy.deepcopy(self.template_config)
        return self.replace_placeholders(instance_config, values)

    def _declared_defaults(self) -> dict:
        """Collect ``name -> default`` from the template's ``parameters`` section."""
        config = self.template_config
        declared = config.get("parameters") if isinstance(config, dict) else None
        defaults = {}
        for spec in declared or []:
            if isinstance(spec, dict) and "name" in spec and "default" in spec:
                defaults[spec["name"]] = spec["default"]
        return defaults

    def replace_placeholders(self, config: dict, parameters: dict) -> dict:
        """Return a copy of *config* with ``${name}`` placeholders substituted.

        The structure is walked recursively instead of round-tripping through a
        JSON string, so values containing quotes or backslashes cannot corrupt
        the result. A string that is exactly one placeholder is replaced by the
        raw parameter value (``"${top_k}"`` -> ``5``, not ``"5"``); placeholders
        embedded in longer strings or in mapping keys are replaced with
        ``str(value)``. Unknown placeholders are left untouched.
        """
        placeholders = {f"${{{name}}}": value for name, value in parameters.items()}

        def substitute_text(text: str) -> str:
            for placeholder, value in placeholders.items():
                text = text.replace(placeholder, str(value))
            return text

        def substitute(node):
            if isinstance(node, dict):
                return {
                    (substitute_text(key) if isinstance(key, str) else key): substitute(value)
                    for key, value in node.items()
                }
            if isinstance(node, list):
                return [substitute(item) for item in node]
            if isinstance(node, str):
                if node in placeholders:
                    # Whole-value placeholder: keep the parameter's own type;
                    # copy it so the result never aliases the caller's object.
                    return copy.deepcopy(placeholders[node])
                return substitute_text(node)
            return node

        return substitute(config)
2. 模板配置示例
# templates/rag_qa_template.yaml
name: "RAG知识库问答模板"
description: "基于检索增强生成的知识库问答工作流模板"
version: "1.0"
parameters:
  - name: "knowledge_base_id"
    type: "string"
    required: true
    description: "知识库ID"
  - name: "llm_model"
    type: "string"
    required: true
    description: "使用的大语言模型"
  - name: "top_k"
    type: "integer"
    default: 5
    description: "检索返回的文档数量"
nodes:
  - id: "user_input"
    type: "InputNode"
    config:
      input_type: "text"
      placeholder: "请输入您的问题"
  - id: "knowledge_retriever"
    type: "KnowledgeRetriever"
    config:
      knowledge_base_id: "${knowledge_base_id}"
      # NOTE: YAML reads this value as the string "${top_k}"; whether it ends up
      # as an integer depends on how the placeholder replacer substitutes it.
      top_k: ${top_k}
      similarity_threshold: 0.7
  - id: "context_builder"
    type: "ContextBuilder"
    config:
      # Literal block scalar: line breaks are preserved; {context} and
      # {question} are filled in by the ContextBuilder, not by the template.
      template: |
        基于以下知识库内容回答用户问题:
        知识库内容:
        {context}
        用户问题:{question}
  - id: "llm_generator"
    type: "LLMGenerator"
    config:
      model: "${llm_model}"
      temperature: 0.7
      max_tokens: 500
edges:
  - from: "user_input"
    to: "knowledge_retriever"
  - from: "knowledge_retriever"
    to: "context_builder"
  - from: "context_builder"
    to: "llm_generator"
插件系统
毕昇平台支持插件系统,允许开发者创建可重用的功能模块。
插件开发基础
1. 插件接口定义
from abc import ABC, abstractmethod
from typing import Any, Dict, List
class PluginInterface(ABC):
    """Abstract base class that every plugin must implement.

    A concrete plugin supplies three read-only metadata properties
    (``name``, ``version``, ``description``) and three methods:
    ``initialize`` (set up from a config dict), ``process`` (the main data
    handling entry point) and ``cleanup`` (release resources).
    """

    # ---- metadata ------------------------------------------------------

    @property
    @abstractmethod
    def name(self) -> str:
        """Plugin name."""

    @property
    @abstractmethod
    def version(self) -> str:
        """Plugin version."""

    @property
    @abstractmethod
    def description(self) -> str:
        """Short description of what the plugin does."""

    # ---- behaviour -----------------------------------------------------

    @abstractmethod
    def initialize(self, config: Dict[str, Any]) -> None:
        """Prepare the plugin using *config*."""

    @abstractmethod
    def process(self, input_data: Any) -> Any:
        """Main processing method: transform *input_data* and return the result."""

    @abstractmethod
    def cleanup(self) -> None:
        """Release any resources held by the plugin."""