工作原理
向量数据库的工作流程概述
向量数据库的核心工作流程包括四个主要阶段:数据向量化、索引构建、向量存储、相似性搜索。每个阶段都涉及复杂的算法和数据结构,共同实现高效的语义搜索功能。
1. 数据向量化过程
1.1 向量化模型
向量化是将原始数据转换为数值向量的过程,不同类型的数据需要使用相应的嵌入模型:
文本向量化
# 使用OpenAI嵌入模型
import openai
client = openai.OpenAI(api_key="your-api-key")
def text_to_vector(text):
response = client.embeddings.create(
input=text,
model="text-embedding-ada-002"
)
return response.data[0].embedding
# 示例
text = "向量数据库是AI时代的重要基础设施"
vector = text_to_vector(text)
print(f"向量维度: {len(vector)}") # 1536维
图像向量化
# 使用CLIP模型进行图像向量化
import torch
import clip
from PIL import Image
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device)
def image_to_vector(image_path):
image = preprocess(Image.open(image_path)).unsqueeze(0).to(device)
with torch.no_grad():
image_features = model.encode_image(image)
return image_features.cpu().numpy().flatten()
1.2 向量质量优化
为了提高向量质量和搜索效果,需要对向量进行预处理:
向量归一化
import numpy as np
def normalize_vector(vector):
"""L2归一化"""
norm = np.linalg.norm(vector)
if norm == 0:
return vector
return vector / norm
# 示例
original_vector = np.array([3.0, 4.0, 0.0])
normalized_vector = normalize_vector(original_vector)
print(f"原始向量: {original_vector}")
print(f"归一化后: {normalized_vector}")
print(f"向量长度: {np.linalg.norm(normalized_vector)}")
2. 索引构建机制
2.1 HNSW索引原理
HNSW(Hierarchical Navigable Small World)是目前最流行的向量索引算法之一:
层次结构
- 第0层:包含所有向量节点(最密集)
- 上层:稀疏采样的节点(用于快速定位)
- 连接策略:每个节点与其最近邻居建立连接
class SimpleHNSW:
def __init__(self, max_m=16, max_m_l=16, ml=1/np.log(2)):
self.max_m = max_m # 最大连接数
self.max_m_l = max_m_l # 层0最大连接数
self.ml = ml # 层数分布参数
self.graphs = {} # 每层的图结构
self.entry_point = None # 入口点
def get_random_level(self):
"""随机确定节点层数"""
level = int(-np.log(np.random.random()) * self.ml)
return level
2.2 IVF索引原理
IVF(Inverted File)索引使用聚类来组织向量数据:
from sklearn.cluster import KMeans
class IVFIndex:
def __init__(self, n_clusters=1000):
self.n_clusters = n_clusters
self.kmeans = KMeans(n_clusters=n_clusters)
self.inverted_lists = {}
def build_index(self, vectors):
"""构建IVF索引"""
# 训练聚类中心
self.kmeans.fit(vectors)
cluster_centers = self.kmeans.cluster_centers_
# 分配向量到聚类
labels = self.kmeans.predict(vectors)
# 构建倒排表
for i, label in enumerate(labels):
if label not in self.inverted_lists:
self.inverted_lists[label] = []
self.inverted_lists[label].append((i, vectors[i]))
return cluster_centers