
Vector Database Best Practices

This document summarizes best practices for using and optimizing vector databases.

Data Management Best Practices

1. Vector Dimension Selection

Choosing an appropriate dimension

# Vector dimension selection
dimension_selection = {
    "principles": [
        "Match the output dimension of your embedding model",
        "Balance accuracy against performance",
        "Account for storage cost",
        "Benchmark different dimensions",
    ],
    "recommendations": {
        "small scale": "128-256 dimensions",
        "medium scale": "384-512 dimensions",
        "large scale": "768-1536 dimensions",
    },
}
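
Storage cost scales linearly with dimension, so the trade-off is easy to estimate up front. A minimal sketch (4 bytes per float32 component is standard; the 1M-vector corpus size is an illustrative assumption):

def index_memory_mb(num_vectors, dimension, bytes_per_component=4):
    """Rough memory footprint of raw float32 vectors, excluding index overhead."""
    return num_vectors * dimension * bytes_per_component / 1024 / 1024

print(index_memory_mb(1_000_000, 256))  # ~976.6 MB
print(index_memory_mb(1_000_000, 768))  # ~2929.7 MB, 3x the cost of 256 dims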

2. Vector Normalization

Normalizing vectors

import numpy as np

def normalize_vector(vector):
    """Normalize a single vector to unit length."""
    norm = np.linalg.norm(vector)
    if norm == 0:
        return vector
    return vector / norm

# Batch normalization
def normalize_vectors(vectors):
    """Normalize a batch of vectors row by row."""
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    norms[norms == 0] = 1  # avoid division by zero
    return vectors / norms
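
Normalization matters because, for unit-length vectors, the inner product equals cosine similarity, so an inner-product or L2 index effectively performs cosine search. A quick check (the array shapes are illustrative):

vectors = np.random.rand(100, 384).astype("float32")
unit = normalize_vectors(vectors)

# Every row now has unit length, so dot products are cosine similarities
assert np.allclose(np.linalg.norm(unit, axis=1), 1.0)
cosine_to_first = unit @ unit[0]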

3. Data Preprocessing

An efficient preprocessing pipeline

def preprocess_vectors(vectors, metadata=None):
    """Preprocess vector data before indexing."""
    # 1. Replace NaN and Inf first, so they cannot poison the norms
    vectors = np.nan_to_num(vectors, nan=0.0, posinf=1.0, neginf=-1.0)

    # 2. Normalize
    normalized = normalize_vectors(vectors)

    # 3. Validate dimensions
    assert normalized.shape[1] > 0, "Vector dimension must be greater than 0"

    # 4. Attach metadata
    if metadata:
        # Associate metadata with the vectors (placeholder)
        pass

    return normalized

Index Optimization Best Practices

1. Index Type Selection

Choosing the right index

# Index type selection
index_selection = {
    "HNSW": {
        "best for": "High-dimensional vectors, low-latency search",
        "traits": "Fast, accurate search; no training step",
        "memory": "High (graph links add overhead)",
    },
    "IVF": {
        "best for": "Large-scale datasets",
        "traits": "Small memory footprint",
        "caveat": "Requires a training step before adding vectors",
    },
    "Flat": {
        "best for": "Small datasets",
        "traits": "Exact (brute-force) search",
        "memory": "High",
    },
}

2. Index Parameter Tuning

Tuning HNSW parameters

# HNSW parameter configuration
hnsw_config = {
    "M": 16,                 # number of links per node
    "ef_construction": 200,  # candidate-list size while building
    "ef_search": 50,         # candidate-list size while searching
    "principles": [
        "Larger M raises recall but slows construction and uses more memory",
        "Larger ef_construction raises recall at the cost of build time",
        "Larger ef_search raises recall but slows each query",
    ],
}
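
These knobs trade recall for latency, so it pays to measure rather than guess. A minimal sweep sketch against brute-force ground truth (corpus and query sizes are illustrative assumptions; note that FAISS spells the attributes efConstruction and efSearch):

import faiss
import numpy as np

dim, n_base, n_query, k = 128, 10_000, 100, 10
base = np.random.rand(n_base, dim).astype("float32")
queries = np.random.rand(n_query, dim).astype("float32")

# Brute-force ground truth to measure recall against
flat = faiss.IndexFlatL2(dim)
flat.add(base)
_, truth = flat.search(queries, k)

hnsw = faiss.IndexHNSWFlat(dim, 16)
hnsw.hnsw.efConstruction = 200  # must be set before add()
hnsw.add(base)

for ef in (16, 32, 64, 128):
    hnsw.hnsw.efSearch = ef
    _, found = hnsw.search(queries, k)
    recall = np.mean([len(set(t) & set(f)) / k for t, f in zip(truth, found)])
    print(f"efSearch={ef}: recall@{k} = {recall:.3f}")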

3. Index Construction

Building the index efficiently

import faiss

def build_index(vectors, index_type="HNSW", dimension=768):
    """Build a vector index."""
    # Create the index
    if index_type == "HNSW":
        index = faiss.IndexHNSWFlat(dimension, 16)
        index.hnsw.efConstruction = 200  # FAISS spells this efConstruction
    elif index_type == "IVF":
        nlist = 100
        quantizer = faiss.IndexFlatL2(dimension)
        index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
        index.train(vectors)  # IVF must be trained before vectors are added
    else:
        index = faiss.IndexFlatL2(dimension)

    # Add the vectors
    index.add(vectors)

    return index
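
A short usage sketch (FAISS expects contiguous float32 arrays; the sizes are illustrative, and nprobe applies only to the IVF branch):

vectors = np.random.rand(10_000, 768).astype("float32")
index = build_index(vectors, index_type="IVF")
index.nprobe = 10  # IVF: clusters probed per query; higher = better recall, slower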

Search Optimization Best Practices

1. Search Strategy

Efficient search

def efficient_search(index, query_vector, top_k=10, ef_search=50):
    """Run a single-query search."""
    # Set the search parameter (HNSW indexes only; FAISS spells it efSearch)
    if hasattr(index, 'hnsw'):
        index.hnsw.efSearch = ef_search

    # Normalize the query so it matches the normalized corpus
    query_vector = normalize_vector(query_vector)

    # Search
    distances, indices = index.search(query_vector.reshape(1, -1), top_k)

    return distances[0], indices[0]
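
With unit-length vectors in an L2 index, the distances FAISS returns (squared L2) map directly to cosine similarities via ||u − v||² = 2 − 2·cos(u, v). A small conversion helper (hypothetical, assuming the corpus was normalized as above):

def l2_to_cosine(squared_l2_distances):
    """Convert squared L2 distances between unit vectors to cosine similarities."""
    # For unit vectors u, v: ||u - v||^2 = 2 - 2 * cos(u, v)
    return 1.0 - squared_l2_distances / 2.0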

2. Batch Search

Batch optimization

def batch_search(index, query_vectors, top_k=10, batch_size=100):
    """Search queries in batches."""
    results = []

    for i in range(0, len(query_vectors), batch_size):
        batch = query_vectors[i:i+batch_size]
        batch_normalized = normalize_vectors(batch)

        distances, indices = index.search(batch_normalized, top_k)
        results.append((distances, indices))

    return results

3. Hybrid Search

Combining multiple methods

def hybrid_search(vector_index, keyword_index, query_vector, query_text, top_k=10):
    """Hybrid search: combine vector and keyword retrieval."""
    # Vector search: over-fetch so the fusion step has room to rerank
    vector_results = vector_index.search(query_vector, top_k * 2)

    # Keyword search
    keyword_results = keyword_index.search(query_text, top_k * 2)

    # Fuse the two result lists (see the merge_results sketch below)
    combined_results = merge_results(vector_results, keyword_results, top_k)

    return combined_results
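
merge_results is left undefined above; a common fusion choice is reciprocal rank fusion (RRF). A minimal sketch, assuming each result list is a ranked sequence of document IDs (k=60 is the conventional RRF constant):

def merge_results(vector_results, keyword_results, top_k, k=60):
    """Fuse two ranked ID lists with reciprocal rank fusion (RRF)."""
    scores = {}
    for results in (vector_results, keyword_results):
        for rank, doc_id in enumerate(results):
            # Each list contributes 1/(k + rank); high ranks in either list win
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    ranked = sorted(scores, key=scores.get, reverse=True)
    return ranked[:top_k]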

Performance Optimization Best Practices

1. Caching Strategy

Smart caching

import hashlib

class VectorCache:
    def __init__(self, max_size=1000):
        self.cache = {}
        self.max_size = max_size

    def get_cache_key(self, vector):
        """Build a cache key from the raw vector bytes."""
        vector_str = vector.tobytes()
        return hashlib.md5(vector_str).hexdigest()

    def get(self, vector):
        """Return a cached result, or None on a miss."""
        key = self.get_cache_key(vector)
        return self.cache.get(key)

    def set(self, vector, results):
        """Cache a result, evicting the oldest entry (FIFO) when full."""
        if len(self.cache) >= self.max_size:
            # dicts keep insertion order, so the first key is the oldest
            self.cache.pop(next(iter(self.cache)))

        key = self.get_cache_key(vector)
        self.cache[key] = results
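
A usage sketch wrapping the search path (cached_search is a hypothetical helper, not part of the original code):

cache = VectorCache(max_size=1000)

def cached_search(index, query_vector, top_k=10):
    """Check the cache before hitting the index."""
    cached = cache.get(query_vector)
    if cached is not None:
        return cached
    results = efficient_search(index, query_vector, top_k)
    cache.set(query_vector, results)
    return results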

2. Concurrency

Parallel search

from concurrent.futures import ThreadPoolExecutor

def parallel_search(index, query_vectors, top_k=10, num_workers=4):
    """Search queries in parallel across a thread pool."""
    # Caution: efficient_search mutates the index's shared efSearch setting,
    # so pass the same value from every thread to avoid racy configurations.
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [
            executor.submit(efficient_search, index, qv, top_k)
            for qv in query_vectors
        ]
        results = [future.result() for future in futures]

    return results
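
Worth noting: FAISS already parallelizes batched queries internally with OpenMP, so a single batched search call is often faster than a Python thread pool of single-query calls. A minimal alternative sketch (faiss.omp_set_num_threads is part of the FAISS Python API):

def batched_parallel_search(index, query_vectors, top_k=10, num_threads=4):
    """Let FAISS's internal OpenMP parallelism handle the whole batch."""
    faiss.omp_set_num_threads(num_threads)  # tune FAISS's own thread pool
    return index.search(normalize_vectors(query_vectors), top_k)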

Summary

Following these best practices helps to:

  1. Improve search accuracy: through appropriate index types and parameters
  2. Optimize performance: through caching and concurrency
  3. Reduce cost: through sensible resource usage
  4. Improve the user experience: through fast responses