目录
- 背景
- 特殊说明
- 源码解读
- 工具类
- FieldsIndexWriter
- 核心类
- TermVectorsConsumer
- Lucene90CompressingTermVectorsWriter
- 父类
- TermVectorsWriter
- 成员变量
- 内部类
- DocData
- FieldData
- 构造方法
- 核心方法
- startDocument
- startField
- startTerm
- addPosition
- finishField
- finishDocument
- triggerFlush
- 构建chunk
- flush
- flushNumFields
- flushFieldNums
- flushFields
- flushFlags
- flushNumTerms
- flushTermLengths
- flushTermFreqs
- flushPositions
- flushOffsets
- flushPayloadLengths
- finish
- TermVectorsConsumerPerField
- 索引文件格式
- tvm
- 字段详解
- Header
- PackedItsVersion
- ChunkSize
- NumChunks
- NumDirtyChunks
- NumDirtyDocs
- NumDocs
- BlockShift
- TotalChunks + 1
- tvxDocStartFP
- DocBlockMeta
- tvxOffsetStartFP
- OffsetBlockMeta
- SPEndPoint
- MaxPointer
- Footer
- tvd 字段详解
- Header
- chunk
- Footer
- tvx 字段详解
- Header
- ChunkStartDocIDs
- ChunkTVDOffsetsBlock
- Footer
背景
词向量存储的信息内容其实和倒排(Posting)是一样的,也是每个term所出现的文档列表以及在文档中的位置信息,区别在于存储结构的不同:
Posting的存储结构是:field -> term -> doc -> freq/pos/offset
也就是说Posting是从字段定位到term,再定位到文档,获取位置信息。
TermVector的存储结构是:doc -> field -> term -> freq/pos/offset
TermVector是从文档定位到字段,再定位term,获取位置信息。
从上面的介绍中,我们可以看出一些基本的规律:
- query查询是通过Posting来查找匹配的文档的,因为query就是从field中查找匹配的term,顺着Posting的结构,下一步就能得到所有匹配的文档了。
- 从指定文档中获取指定字段单个term的匹配位置,则TermVector(doc查找1次,field查找1次,term查找1次,)和Posting(field查找1次,term查找1次,doc查找1次)效率差不多。
- 从指定文档中获取指定字段多个term的匹配位置,则TermVector(doc查找1次,field查找1次,term查找n次)性能比Posting(field查找1次,term查找n次,doc查找n次)好。
- 多字段多term查询位置信息也是TermVector性能比较好,大家可以自行分析。
从上面的规律可以看出,检索过程确实使用的Posting,毕竟这是真的倒排索引。TermVector适用于从特定文档中获取某些字段中term的位置信息,典型应用就是高亮:获取特定的文档中相关term的位置。
特殊说明
词向量构建涉及到几个类之间的关系
词向量的构建中主要有3个类:
- TermVectorsConsumer:调度词向量的构建的最上层逻辑,负责创建TermVectorsConsumerPerField和Lucene90CompressingTermVectorsWriter。
- TermVectorsConsumerPerField:每个开启词向量构建的字段都对应一个TermVectorsConsumerPerField,在TermVectorsConsumerPerField中是把词向量的倒排信息临时存在内存buffer中,在完成一个document的处理之后,会把这个document的所有词向量数据都序列化到Lucene90CompressingTermVectorsWriter的缓存中,然后重置buffer,等到处理下一个document。
- Lucene90CompressingTermVectorsWriter:负责组织词向量的索引文件格式并持久化。
term的存储
在词向量中的term是按序存储的,但是每个Field中的所有的term,除了完整的存储第一个term之外,其他term都是存储除了跟前一个term的最长公共前缀的剩余的后缀部分。
chunk的生成条件
词向量是使用chunk来划分数据的,生成chunk的条件满足以下二者其一即可:
- 缓存中的doc数量超出设置的阈值
- suffix的数据总量超出了设置的阈值
词向量的索引文件
词向量最终构建生成3个索引文件:
- tvd:按chunk存储的term,freq,position,offset,payload信息
- tvx:chunk的索引文件,记录的是每个chunk的起始docID,以及每个chunk的起始位置,方便根据docID快速定位到chunk。
- tvm:词向量索引文件的元信息,用来读取词向量使用的。
源码解读
注意:本文源码基于lucene-9.1.0版本
工具类
FieldsIndexWriter
FieldsIndexWriter这个工具类,以后介绍正排索引文件也会用到,它主要是用来生成所有chunk中的起始doc编号和chunk在数据文件中的位置信息,方便读取的时候快速定位doc所属的chunk,并从文件中读取chunk。总体逻辑是先使用临时文件存储前面说的两个信息,在真正生成索引文件的时候,使用DirectMonotonicWriter进行压缩存储,减小索引文件大小。
public final class FieldsIndexWriter implements Closeable { | |
static final int VERSION_START =; | |
static final int VERSION_CURRENT =; | |
private final Directory dir; | |
// 下面这些信息都是用来创建真正索引文件名的 | |
private final String name; | |
private final String suffix; | |
private final String extension; | |
private final String codecName; | |
private final byte[] id; | |
// DirectMonotonicWriter 所需的参数 | |
private final int blockShift; | |
private final IOContext ioContext; | |
// 临时文件,用来保存所有chunk中的文档数 | |
private IndexOutput docsOut; | |
// 临时文件,用来保存所有chunk在数据文件中的起始位置 | |
private IndexOutput filePointersOut; | |
// doc总数 | |
private int totalDocs; | |
// chunk总数 | |
private int totalChunks; | |
// 前一个chunk在tvd索引文件中的起始位置 | |
private long previousFP; | |
// 添加一个新的index,index就是用来定位doc属于哪个chunk,以及chunk在数据文件中的起始位置。 | |
// numDocs是chunk中的文档总数,后面真正序列化到正式的索引文件会通过换算,得到的是每个chunk的起始docID。 | |
void writeIndex(int numDocs, long startPointer) throws IOException { | |
assert startPointer >= previousFP; | |
docsOut.writeVInt(numDocs); | |
filePointersOut.writeVLong(startPointer - previousFP); | |
previousFP = startPointer; | |
totalDocs += numDocs; | |
totalChunks++; | |
} | |
// metaOut是元信息索引文件 | |
void finish(int numDocs, long maxPointer, IndexOutput metaOut) throws IOException { | |
if (numDocs != totalDocs) { | |
throw new IllegalStateException("Expected " + numDocs + " docs, but got " + totalDocs); | |
} | |
// 完成临时文件的写入 | |
CodecUtil.writeFooter(docsOut); | |
CodecUtil.writeFooter(filePointersOut); | |
IOUtils.close(docsOut, filePointersOut); | |
// 创建真正的索引文件 | |
try (IndexOutput dataOut = | |
dir.createOutput(IndexFileNames.segmentFileName(name, suffix, extension), ioContext)) { | |
CodecUtil.writeIndexHeader(dataOut, codecName + "Idx", VERSION_CURRENT, id, suffix); | |
// chunk中的doc总数 | |
metaOut.writeInt(numDocs); | |
// 后面使用DirectMonotonicWriter压缩的时候需要的参数 | |
metaOut.writeInt(blockShift); | |
// 使用DirectMonotonicWriter写入的数据总数,为什么加,看下面的具体写入逻辑。 | |
metaOut.writeInt(totalChunks +); | |
// chunk索引文件中ChunkStartDocIDs的起始位置 | |
metaOut.writeLong(dataOut.getFilePointer()); | |
try (ChecksumIndexInput docsIn = | |
dir.openChecksumInput(docsOut.getName(), IOContext.READONCE)) { | |
CodecUtil.checkHeader(docsIn, codecName + "Docs", VERSION_CURRENT, VERSION_CURRENT); | |
Throwable priorE = null; | |
try { | |
// 压缩存储所有chunk的起始doc编号 | |
final DirectMonotonicWriter docs = | |
DirectMonotonicWriter.getInstance(metaOut, dataOut, totalChunks +, blockShift); | |
long doc =; | |
docs.add(doc); // 第一个chunk的起始doc编号肯定是,这也是上面totalChunks + 1的原因之一。 | |
for (int i =; i < totalChunks; ++i) { | |
doc += docsIn.readVInt(); | |
docs.add(doc); | |
} | |
docs.finish(); | |
if (doc != totalDocs) { | |
throw new CorruptIndexException("Docs don't add up", docsIn); | |
} | |
} catch (Throwable e) { | |
priorE = e; | |
} finally { | |
CodecUtil.checkFooter(docsIn, priorE); | |
} | |
} | |
// 删除临时文件 | |
dir.deleteFile(docsOut.getName()); | |
docsOut = null; | |
// chunk索引文件中ChunkOffsets的起始位置 | |
metaOut.writeLong(dataOut.getFilePointer()); | |
try (ChecksumIndexInput filePointersIn = | |
dir.openChecksumInput(filePointersOut.getName(), IOContext.READONCE)) { | |
CodecUtil.checkHeader( | |
filePointersIn, codecName + "FilePointers", VERSION_CURRENT, VERSION_CURRENT); | |
Throwable priorE = null; | |
try { | |
// 压缩存储所有chunk在tvd文件中的起始位置 | |
final DirectMonotonicWriter filePointers = | |
DirectMonotonicWriter.getInstance(metaOut, dataOut, totalChunks +, blockShift); | |
long fp =; | |
for (int i =; i < totalChunks; ++i) { | |
fp += filePointersIn.readVLong(); | |
filePointers.add(fp); | |
} | |
if (maxPointer < fp) { | |
throw new CorruptIndexException("File pointers don't add up", filePointersIn); | |
} | |
filePointers.add(maxPointer); // 上面totalChunks +的原因之二。 | |
filePointers.finish(); | |
} catch (Throwable e) { | |
priorE = e; | |
} finally { | |
CodecUtil.checkFooter(filePointersIn, priorE); | |
} | |
} | |
dir.deleteFile(filePointersOut.getName()); | |
filePointersOut = null; | |
metaOut.writeLong(dataOut.getFilePointer()); | |
metaOut.writeLong(maxPointer); | |
CodecUtil.writeFooter(dataOut); | |
} | |
} | |
} |
核心类
TermVectorsConsumer
class TermVectorsConsumer extends TermsHash { | |
protected final Directory directory; | |
protected final SegmentInfo info; | |
protected final Codec codec; | |
// 词向量持久化 | |
TermVectorsWriter writer; | |
/** Scratch term used by TermVectorsConsumerPerField.finishDocument. */ | |
final BytesRef flushTerm = new BytesRef(); | |
// 用来从TermVectorsConsumerPerField的bytepool中读取position信息 | |
final ByteSliceReader vectorSliceReaderPos = new ByteSliceReader(); | |
final ByteSliceReader vectorSliceReaderOff = new ByteSliceReader(); | |
private boolean hasVectors; | |
private int numVectorFields; | |
int lastDocID; | |
// 每一个开启词向量构建的Field,都有一个TermVectorsConsumerPerField,当一个doc处理完之后会把所有的 | |
// TermVectorsConsumerPerField都序列化到LuceneCompressingTermVectorsWriter中,然后重置等待处理下一个doc | |
private TermVectorsConsumerPerField[] perFields = new TermVectorsConsumerPerField[]; | |
Accountable accountable = Accountable.NULL_ACCOUNTABLE; | |
TermVectorsConsumer( | |
final IntBlockPool.Allocator intBlockAllocator, | |
final ByteBlockPool.Allocator byteBlockAllocator, | |
Directory directory, | |
SegmentInfo info, | |
Codec codec) { | |
super(intBlockAllocator, byteBlockAllocator, Counter.newCounter(), null); | |
this.directory = directory; | |
this.info = info; | |
this.codec = codec; | |
} | |
void flush( | |
Map<String, TermsHashPerField> fieldsToFlush, | |
final SegmentWriteState state, | |
Sorter.DocMap sortMap, | |
NormsProducer norms) | |
throws IOException { | |
if (writer != null) { | |
int numDocs = state.segmentInfo.maxDoc(); | |
try { | |
// 把不存在词向量Filed的文档填充下 | |
fill(numDocs); | |
assert state.segmentInfo != null; | |
// 触发词向量索引文件持久化落盘 | |
writer.finish(numDocs); | |
} finally { | |
IOUtils.close(writer); | |
} | |
} | |
} | |
/** | |
* Fills in no-term-vectors for all docs we haven't seen since the last doc that had term vectors. | |
*/ | |
void fill(int docID) throws IOException { | |
while (lastDocID < docID) { | |
writer.startDocument(); | |
writer.finishDocument(); | |
lastDocID++; | |
} | |
} | |
// 创建 LuceneCompressingTermVectorsWriter | |
void initTermVectorsWriter() throws IOException { | |
if (writer == null) { | |
IOContext context = new IOContext(new FlushInfo(lastDocID, bytesUsed.get())); | |
writer = codec.termVectorsFormat().vectorsWriter(directory, info, context); | |
lastDocID =; | |
accountable = writer; | |
} | |
} | |
void setHasVectors() { | |
hasVectors = true; | |
} | |
void finishDocument(int docID) throws IOException { | |
// 不存在词向量,直接返回 | |
if (!hasVectors) { | |
return; | |
} | |
// 按字段名排序TermVectorsConsumerPerField | |
ArrayUtil.introSort(perFields,, numVectorFields); | |
initTermVectorsWriter(); | |
// 为了确保doc是连续,则把lastDocID到docID的空白填充下 | |
fill(docID); | |
// 开始序列化 | |
writer.startDocument(numVectorFields); | |
// 处理document中的所有Field,会把相关的词向量数据写到writer的缓存中 | |
for (int i =; i < numVectorFields; i++) { | |
perFields[i].finishDocument(); | |
} | |
// 结束一个document词向量的序列化 | |
writer.finishDocument(); | |
assert lastDocID == docID : "lastDocID=" + lastDocID + " docID=" + docID; | |
lastDocID++; | |
super.reset(); | |
// 重置TermVectorsConsumerPerField 数组,等待处理下一个document | |
resetFields(); | |
} | |
public void abort() { | |
try { | |
super.abort(); | |
} finally { | |
IOUtils.closeWhileHandlingException(writer); | |
reset(); | |
} | |
} | |
void resetFields() { | |
Arrays.fill(perFields, null); // don't hang onto stuff from previous doc | |
numVectorFields =; | |
} | |
public TermsHashPerField addField(FieldInvertState invertState, FieldInfo fieldInfo) { | |
return new TermVectorsConsumerPerField(invertState, this, fieldInfo); | |
} | |
// 当结束一个Field的所有term的处理之后,就把TermVectorsConsumerPerField存在perFields中, | |
// 等待把数据都序列化到LuceneCompressingTermVectorsWriter中 | |
void addFieldToFlush(TermVectorsConsumerPerField fieldToFlush) { | |
if (numVectorFields == perFields.length) { | |
int newSize = ArrayUtil.oversize(numVectorFields +, RamUsageEstimator.NUM_BYTES_OBJECT_REF); | |
TermVectorsConsumerPerField[] newArray = new TermVectorsConsumerPerField[newSize]; | |
System.arraycopy(perFields,, newArray, 0, numVectorFields); | |
perFields = newArray; | |
} | |
perFields[numVectorFields++] = fieldToFlush; | |
} | |
void startDocument() { | |
resetFields(); | |
numVectorFields =; | |
} | |
} |
Lucene90CompressingTermVectorsWriter
Lucene90CompressingTermVectorsWriter是生成词向量索引文件的核心类,主要负责按照特定的索引文件格式组织数据并持久化。
父类
TermVectorsWriter
Lucene90CompressingTermVectorsWriter的父类有大量的抽象方法,剩下一个模板方法addProx用来添加term在field中的所有的位置信息。
public abstract class TermVectorsWriter implements Closeable, Accountable { | |
/** Sole constructor. (For invocation by subclass constructors, typically implicit.) */ | |
protected TermVectorsWriter() {} | |
// 开始持久化一个doc的所有词向量 | |
public abstract void startDocument(int numVectorFields) throws IOException; | |
// 结束一个文档持久化的时候调用 | |
public void finishDocument() throws IOException {}; | |
// 开始持久化一个Field | |
public abstract void startField( | |
FieldInfo info, int numTerms, boolean positions, boolean offsets, boolean payloads) | |
throws IOException; | |
// 结束一个Field的处理 | |
public void finishField() throws IOException {}; | |
// 开始持久化一个term的倒排信息 | |
public abstract void startTerm(BytesRef term, int freq) throws IOException; | |
// 结束一个term的处理 | |
public void finishTerm() throws IOException {} | |
// 构建term的一个position信息 | |
public abstract void addPosition(int position, int startOffset, int endOffset, BytesRef payload) | |
throws IOException; | |
// 所有文档处理完成,在close方法调用之前,调用finish,numDoc是处理的文档总数 | |
public abstract void finish(int numDocs) throws IOException; | |
// 从positions和offsets中读取所有的位置信息,其实就是从TermVectorsConsumerPerField#bytePool中读取 | |
public void addProx(int numProx, DataInput positions, DataInput offsets) throws IOException { | |
int position =; | |
int lastOffset =; | |
BytesRefBuilder payload = null; | |
for (int i =; i < numProx; i++) { | |
final int startOffset; | |
final int endOffset; | |
final BytesRef thisPayload; | |
if (positions == null) { | |
position = -; | |
thisPayload = null; | |
} else { | |
int code = positions.readVInt(); | |
position += code >>>; | |
if ((code &) != 0) { | |
final int payloadLength = positions.readVInt(); | |
if (payload == null) { | |
payload = new BytesRefBuilder(); | |
} | |
payload.grow(payloadLength); | |
positions.readBytes(payload.bytes(),, payloadLength); | |
payload.setLength(payloadLength); | |
thisPayload = payload.get(); | |
} else { | |
thisPayload = null; | |
} | |
} | |
if (offsets == null) { | |
startOffset = endOffset = -; | |
} else { | |
startOffset = lastOffset + offsets.readVInt(); | |
endOffset = startOffset + offsets.readVInt(); | |
lastOffset = endOffset; | |
} | |
// 子类实现的真正的添加 | |
addPosition(position, startOffset, endOffset, thisPayload); | |
} | |
} | |
// 删除了一些跟merge相关的方法,以后介绍merge的时候再说 | |
public abstract void close() throws IOException; | |
} |
成员变量
// sement的名称 | |
private final String segment; | |
// 生成tvx索引文件 | |
private FieldsIndexWriter indexWriter; | |
// metaStream:生成tvm索引文件 | |
// vectorStream:生成tvd索引文件 | |
private IndexOutput metaStream, vectorsStream; | |
// 压缩算法 | |
private final CompressionMode compressionMode; | |
private final Compressor compressor; | |
// tvx中的chunk大小是^chunkSize | |
private final int chunkSize; | |
// chunk总数 | |
private long numChunks; | |
// 如果一个chunk中包含的doc信息是不完整的,则算一次 | |
private long numDirtyChunks; | |
// 在dirtyChunk中的doc总数 | |
private long numDirtyDocs; | |
// 处理的doc总数 | |
private int numDocs; | |
// 构建过程中暂时存储的DocData,触发flush的话就会持久化 | |
private final Deque<DocData> pendingDocs; | |
// 当前正在处理的doc | |
private DocData curDoc; | |
// 当前正在处理的field | |
private FieldData curField; | |
// 上一个处理的term | |
private final BytesRef lastTerm; | |
// 全局临时存储的buf | |
private int[] positionsBuf, startOffsetsBuf, lengthsBuf, payloadLengthsBuf; | |
// 存储后缀 | |
private final ByteBuffersDataOutput termSuffixes; | |
// 存储payload信息 | |
private final ByteBuffersDataOutput payloadBytes; | |
// 批量整型的压缩工具 | |
private final BlockPackedWriter writer; | |
// 一个chunk中最多的文档数 | |
private final int maxDocsPerChunk; | |
private final ByteBuffersDataOutput scratchBuffer = ByteBuffersDataOutput.newResettableInstance(); |
内部类
DocData
表示当前要序列化的一个doc的所有的词向量数据信息。
private class DocData { | |
// doc中有多少个field | |
final int numFields; | |
// 每个field的词向量信息 | |
final Deque<FieldData> fields; | |
// 当前doc在全局buffer(positionsBuf, startOffsetsBuf, payloadLengthsBuf)中的起始位置 | |
final int posStart, offStart, payStart; | |
DocData(int numFields, int posStart, int offStart, int payStart) { | |
this.numFields = numFields; | |
this.fields = new ArrayDeque<>(numFields); | |
this.posStart = posStart; | |
this.offStart = offStart; | |
this.payStart = payStart; | |
} | |
// 新增一个field | |
FieldData addField( | |
int fieldNum, int numTerms, boolean positions, boolean offsets, boolean payloads) { | |
final FieldData field; | |
if (fields.isEmpty()) { | |
field = | |
new FieldData( | |
fieldNum, numTerms, positions, offsets, payloads, posStart, offStart, payStart); | |
} else { | |
final FieldData last = fields.getLast(); | |
// 计算当前field的一些起始位置,也就是前一个field的起始位置+前一个field的所有的数据量 | |
final int posStart = last.posStart + (last.hasPositions ? last.totalPositions :); | |
final int offStart = last.offStart + (last.hasOffsets ? last.totalPositions :); | |
final int payStart = last.payStart + (last.hasPayloads ? last.totalPositions :); | |
field = | |
new FieldData( | |
fieldNum, numTerms, positions, offsets, payloads, posStart, offStart, payStart); | |
} | |
fields.add(field); | |
return field; | |
} | |
} |
FieldData
存储一个field的所有的词向量的所需的数据。
private class FieldData { | |
final boolean hasPositions, hasOffsets, hasPayloads; | |
// flags是个混合标记位,标记是否需要构建position,offset,payload | |
final int fieldNum, flags, numTerms; | |
// freqs:存储的是每个term的频率 | |
// prefixLengths:存储的是当前term和前一个term的公共前缀的长度 | |
// suffixLengths:存储的是除了当前term和前一个term的公共前缀的剩余部分的长度 | |
final int[] freqs, prefixLengths, suffixLengths; | |
// 当前Field的position,offset,payload数据在全局buf中的起始位置 | |
final int posStart, offStart, payStart; | |
int totalPositions; | |
// 当前处理的是第几个term | |
int ord; | |
FieldData( | |
int fieldNum, | |
int numTerms, | |
boolean positions, | |
boolean offsets, | |
boolean payloads, | |
int posStart, | |
int offStart, | |
int payStart) { | |
this.fieldNum = fieldNum; | |
this.numTerms = numTerms; | |
this.hasPositions = positions; | |
this.hasOffsets = offsets; | |
this.hasPayloads = payloads; | |
this.flags = | |
(positions ? POSITIONS :) | (offsets ? OFFSETS : 0) | (payloads ? PAYLOADS : 0); | |
this.freqs = new int[numTerms]; | |
this.prefixLengths = new int[numTerms]; | |
this.suffixLengths = new int[numTerms]; | |
this.posStart = posStart; | |
this.offStart = offStart; | |
this.payStart = payStart; | |
totalPositions =; | |
ord =; | |
} | |
// 新增一个term | |
// prefixLength:和前一个term的最长公共前缀 | |
// suffixLength:除了prefix剩下的就是suffix | |
void addTerm(int freq, int prefixLength, int suffixLength) { | |
freqs[ord] = freq; | |
prefixLengths[ord] = prefixLength; | |
suffixLengths[ord] = suffixLength; | |
++ord; | |
} | |
// 为当前处理的term新增一个位置信息数据,数据都是暂存在全局的buffer中 | |
void addPosition(int position, int startOffset, int length, int payloadLength) { | |
if (hasPositions) { | |
if (posStart + totalPositions == positionsBuf.length) { | |
positionsBuf = ArrayUtil.grow(positionsBuf); | |
} | |
positionsBuf[posStart + totalPositions] = position; | |
} | |
if (hasOffsets) { | |
if (offStart + totalPositions == startOffsetsBuf.length) { | |
final int newLength = ArrayUtil.oversize(offStart + totalPositions,); | |
startOffsetsBuf = ArrayUtil.growExact(startOffsetsBuf, newLength); | |
lengthsBuf = ArrayUtil.growExact(lengthsBuf, newLength); | |
} | |
startOffsetsBuf[offStart + totalPositions] = startOffset; | |
lengthsBuf[offStart + totalPositions] = length; | |
} | |
if (hasPayloads) { | |
if (payStart + totalPositions == payloadLengthsBuf.length) { | |
payloadLengthsBuf = ArrayUtil.grow(payloadLengthsBuf); | |
} | |
payloadLengthsBuf[payStart + totalPositions] = payloadLength; | |
} | |
++totalPositions; | |
} | |
} |
构造方法
LuceneCompressingTermVectorsWriter( | |
Directory directory, | |
SegmentInfo si, | |
String segmentSuffix, | |
IOContext context, | |
String formatName, | |
CompressionMode compressionMode, | |
int chunkSize, | |
int maxDocsPerChunk, | |
int blockShift) | |
throws IOException { | |
assert directory != null; | |
this.segment = si.name; | |
this.compressionMode = compressionMode; | |
this.compressor = compressionMode.newCompressor(); | |
this.chunkSize = chunkSize; | |
this.maxDocsPerChunk = maxDocsPerChunk; | |
numDocs =; | |
pendingDocs = new ArrayDeque<>(); | |
termSuffixes = ByteBuffersDataOutput.newResettableInstance(); | |
payloadBytes = ByteBuffersDataOutput.newResettableInstance(); | |
lastTerm = new BytesRef(ArrayUtil.oversize(, 1)); | |
boolean success = false; | |
try { | |
metaStream = | |
directory.createOutput( | |
IndexFileNames.segmentFileName(segment, segmentSuffix, VECTORS_META_EXTENSION), | |
context); | |
CodecUtil.writeIndexHeader( | |
metaStream, | |
VECTORS_INDEX_CODEC_NAME + "Meta", | |
VERSION_CURRENT, | |
si.getId(), | |
segmentSuffix); | |
assert CodecUtil.indexHeaderLength(VECTORS_INDEX_CODEC_NAME + "Meta", segmentSuffix) | |
== metaStream.getFilePointer(); | |
vectorsStream = | |
directory.createOutput( | |
IndexFileNames.segmentFileName(segment, segmentSuffix, VECTORS_EXTENSION), context); | |
CodecUtil.writeIndexHeader( | |
vectorsStream, formatName, VERSION_CURRENT, si.getId(), segmentSuffix); | |
assert CodecUtil.indexHeaderLength(formatName, segmentSuffix) | |
== vectorsStream.getFilePointer(); | |
// 生成tvx索引文件 | |
indexWriter = | |
new FieldsIndexWriter( | |
directory, | |
segment, | |
segmentSuffix, | |
VECTORS_INDEX_EXTENSION, | |
VECTORS_INDEX_CODEC_NAME, | |
si.getId(), | |
blockShift, | |
context); | |
// 记录PackedInts的版本 | |
metaStream.writeVInt(PackedInts.VERSION_CURRENT); | |
// 记录chunkSize | |
metaStream.writeVInt(chunkSize); | |
writer = new BlockPackedWriter(vectorsStream, PACKED_BLOCK_SIZE); | |
// 全局buffer,用来临时存储数据 | |
positionsBuf = new int[]; | |
startOffsetsBuf = new int[]; | |
lengthsBuf = new int[]; | |
payloadLengthsBuf = new int[]; | |
success = true; | |
} finally { | |
if (!success) { | |
IOUtils.closeWhileHandlingException(metaStream, vectorsStream, indexWriter, indexWriter); | |
} | |
} | |
} |
核心方法
startDocument
要开始处理一个doc了,创建一个DocData来存储这个doc所有的数据信息。
public void startDocument(int numVectorFields) throws IOException { | |
curDoc = addDocData(numVectorFields); | |
} | |
private DocData addDocData(int numVectorFields) { | |
FieldData last = null; | |
// 逆序遍历pendingDocs列表,获取最后一个DocData,需要根据它来计算在下一个DocData在全局buffer中的起始offset | |
for (Iterator<DocData> it = pendingDocs.descendingIterator(); it.hasNext(); ) { | |
final DocData doc = it.next(); | |
if (!doc.fields.isEmpty()) { | |
last = doc.fields.getLast(); | |
break; | |
} | |
} | |
final DocData doc; | |
if (last == null) { | |
doc = new DocData(numVectorFields,, 0, 0); // 第一个doc | |
} else { | |
final int posStart = last.posStart + (last.hasPositions ? last.totalPositions :); | |
final int offStart = last.offStart + (last.hasOffsets ? last.totalPositions :); | |
final int payStart = last.payStart + (last.hasPayloads ? last.totalPositions :); | |
doc = new DocData(numVectorFields, posStart, offStart, payStart); | |
} | |
pendingDocs.add(doc); | |
return doc; | |
} |
startField
开始处理当前doc中的一个新的Field,创建FieldData,用来存储field的所有的数据信息。
public void startField( | |
FieldInfo info, int numTerms, boolean positions, boolean offsets, boolean payloads) | |
throws IOException { | |
curField = curDoc.addField(info.number, numTerms, positions, offsets, payloads); | |
lastTerm.length =; | |
} |
startTerm
计算当前term和前一个term的最长公共前缀。
public void startTerm(BytesRef term, int freq) throws IOException { | |
// 和前一个term的最长公共前缀 | |
final int prefix; | |
if (lastTerm.length ==) { | |
prefix =; | |
} else { | |
prefix = StringHelper.bytesDifference(lastTerm, term); | |
} | |
// FieldData新增term | |
curField.addTerm(freq, prefix, term.length - prefix); | |
// 存储suffix | |
termSuffixes.writeBytes(term.bytes, term.offset + prefix, term.length - prefix); | |
// 更新lastTerm | |
if (lastTerm.bytes.length < term.length) { | |
lastTerm.bytes = new byte[ArrayUtil.oversize(term.length,)]; | |
} | |
lastTerm.offset =; | |
lastTerm.length = term.length; | |
System.arraycopy(term.bytes, term.offset, lastTerm.bytes,, term.length); | |
} |
addPosition
为当前处理的term新增一个位置相关的信息。
public void addPosition(int position, int startOffset, int endOffset, BytesRef payload) | |
throws IOException { | |
assert curField.flags !=; | |
curField.addPosition( | |
position, startOffset, endOffset - startOffset, payload == null ? : payload.length); | |
if (curField.hasPayloads && payload != null) { | |
payloadBytes.writeBytes(payload.bytes, payload.offset, payload.length); | |
} | |
} |
finishField
结束一个field的处理,就是简单把当前的curFiled清空,等待处理下一个field。
public void finishField() throws IOException { | |
curField = null; | |
} |
finishDocument
可以看到,在结束一个doc的处理时,会判断是否满足一个chunk的构建条件,如果满足的话则进行构建。
public void finishDocument() throws IOException { | |
payloadBytes.copyTo(termSuffixes); | |
payloadBytes.reset(); | |
++numDocs; | |
if (triggerFlush()) { // 是否满足一个chunk | |
flush(false); // 构建一个chunk | |
} | |
curDoc = null; | |
} |
triggerFlush
判断当前是否满足一个chunk的构建条件,二者满足其一即可:
- termSuffixes的大小大于等于chunkSize
- 当前待处理的doc总数大于等于maxDocsPerChunk
private boolean triggerFlush() { | |
return termSuffixes.size() >= chunkSize || pendingDocs.size() >= maxDocsPerChunk; | |
} |
构建chunk
flush
flush触发构建chunk逻辑,里面主要是调度逻辑,按类别构建所需的词向量信息。
private void flush(boolean force) throws IOException { | |
// 当前要构建的chunk中有几个doc | |
final int chunkDocs = pendingDocs.size(); | |
numChunks++; | |
if (force) { // 如果是强制构建chunk,可能是不满足chunk条件的,这种chunk被定义为dirtyChunk | |
numDirtyChunks++; | |
numDirtyDocs += pendingDocs.size(); | |
} | |
// 构建chunk的索引信息 | |
indexWriter.writeIndex(chunkDocs, vectorsStream.getFilePointer()); | |
final int docBase = numDocs - chunkDocs; | |
// chunk的起始docID | |
vectorsStream.writeVInt(docBase); | |
final int dirtyBit = force ? : 0; | |
vectorsStream.writeVInt((chunkDocs <<) | dirtyBit); | |
// 记录每个doc的field数量 | |
final int totalFields = flushNumFields(chunkDocs); | |
if (totalFields >) { | |
// 记录当前chunk中所有Field的编号 | |
final int[] fieldNums = flushFieldNums(); | |
// 记录所有doc的所有field的编号 | |
flushFields(totalFields, fieldNums); | |
// 记录所有doc的所有field的flag | |
flushFlags(totalFields, fieldNums); | |
// 记录所有doc的所有field的term数量 | |
flushNumTerms(totalFields); | |
// 记录所有term的长度信息 | |
flushTermLengths(); | |
// 记录所有term的频率 | |
flushTermFreqs(); | |
// 记录所有term的position信息 | |
flushPositions(); | |
// 记录所有term的offset信息 | |
flushOffsets(fieldNums); | |
// 记录所有position的payload信息 | |
flushPayloadLengths(); | |
// 记录所有的suffix | |
byte[] content = termSuffixes.toArrayCopy(); | |
compressor.compress(content,, content.length, vectorsStream); | |
} | |
// 重置相关变量,等待处理下一个chunk | |
pendingDocs.clear(); | |
curDoc = null; | |
curField = null; | |
termSuffixes.reset(); | |
} |
flushNumFields
记录所有doc的字段总数,分为两种情况:
private int flushNumFields(int chunkDocs) throws IOException { | |
if (chunkDocs ==) { // 如果chunk中只有一个doc,则就直接写这个doc的字段总数 | |
final int numFields = pendingDocs.getFirst().numFields; | |
vectorsStream.writeVInt(numFields); | |
return numFields; | |
} else { // 否则,使用PackedInts压缩存储所有doc的字段数信息 | |
writer.reset(vectorsStream); | |
int totalFields =; | |
for (DocData dd : pendingDocs) { | |
writer.add(dd.numFields); | |
totalFields += dd.numFields; | |
} | |
writer.finish(); | |
return totalFields; | |
} | |
} |
- 如果只有一个doc,则单独记录这个doc的字段数
- 否则,使用PackedInts压缩存储所有的doc的字段数
flushFieldNums
private int[] flushFieldNums() throws IOException { | |
// chunk中所有term的编号按序存储 | |
SortedSet<Integer> fieldNums = new TreeSet<>(); | |
for (DocData dd : pendingDocs) { | |
for (FieldData fd : dd.fields) { | |
fieldNums.add(fd.fieldNum); | |
} | |
} | |
final int numDistinctFields = fieldNums.size(); | |
final int bitsRequired = PackedInts.bitsRequired(fieldNums.last()); | |
// bitsRequired最大就是,所以低5位就够了 | |
final int token = (Math.min(numDistinctFields -, 0x07) << 5) | bitsRequired; | |
vectorsStream.writeByte((byte) token); | |
if (numDistinctFields - >= 0x07) { | |
vectorsStream.writeVInt(numDistinctFields - - 0x07); | |
} | |
final PackedInts.Writer writer = | |
PackedInts.getWriterNoHeader( | |
vectorsStream, PackedInts.Format.PACKED, fieldNums.size(), bitsRequired,); | |
for (Integer fieldNum : fieldNums) { | |
writer.add(fieldNum); | |
} | |
writer.finish(); | |
// Integer转int | |
int[] fns = new int[fieldNums.size()]; | |
int i =; | |
for (Integer key : fieldNums) { | |
fns[i++] = key; | |
} | |
return fns; | |
} |
flushFields
存储doc中所有的field的编号。
private void flushFields(int totalFields, int[] fieldNums) throws IOException { | |
scratchBuffer.reset(); | |
// 使用 DirectWriter 压缩存储 | |
final DirectWriter writer = | |
DirectWriter.getInstance( | |
scratchBuffer, totalFields, DirectWriter.bitsRequired(fieldNums.length -)); | |
for (DocData dd : pendingDocs) { | |
for (FieldData fd : dd.fields) { | |
final int fieldNumIndex = Arrays.binarySearch(fieldNums, fd.fieldNum); | |
assert fieldNumIndex >=; | |
writer.add(fieldNumIndex); | |
} | |
} | |
writer.finish(); | |
vectorsStream.writeVLong(scratchBuffer.size()); | |
scratchBuffer.copyTo(vectorsStream); | |
} |
flushFlags
存储doc中所有field的flag,分为两种情况:
private void flushFlags(int totalFields, int[] fieldNums) throws IOException { | |
// 所有doc中相同的field是否都是一样的flag | |
boolean nonChangingFlags = true; | |
// 如果所有相同的field的flag都一样,则最后只存储这个数组 | |
int[] fieldFlags = new int[fieldNums.length]; | |
Arrays.fill(fieldFlags, -); | |
outer: | |
for (DocData dd : pendingDocs) { // 遍历所有的doc | |
for (FieldData fd : dd.fields) { // 遍历所有的field | |
final int fieldNumOff = Arrays.binarySearch(fieldNums, fd.fieldNum); | |
assert fieldNumOff >=; | |
if (fieldFlags[fieldNumOff] == -) { | |
fieldFlags[fieldNumOff] = fd.flags; | |
} else if (fieldFlags[fieldNumOff] != fd.flags) { // 有一个field不一样 | |
nonChangingFlags = false; | |
break outer; | |
} | |
} | |
} | |
if (nonChangingFlags) { // 如果所有doc相同的field的flag都一样, | |
// 写标记这种情况 | |
vectorsStream.writeVInt(); | |
scratchBuffer.reset(); | |
final DirectWriter writer = | |
DirectWriter.getInstance(scratchBuffer, fieldFlags.length, FLAGS_BITS); | |
for (int flags : fieldFlags) { // 每个field只写一个flag | |
assert flags >=; | |
writer.add(flags); | |
} | |
writer.finish(); | |
vectorsStream.writeVInt(Math.toIntExact(scratchBuffer.size())); | |
scratchBuffer.copyTo(vectorsStream); | |
} else { // 需要记录所有doc中的所有field的flag | |
// 写标记这种情况 | |
vectorsStream.writeVInt(); | |
scratchBuffer.reset(); | |
final DirectWriter writer = DirectWriter.getInstance(scratchBuffer, totalFields, FLAGS_BITS); | |
for (DocData dd : pendingDocs) { // 遍历doc | |
for (FieldData fd : dd.fields) { // 遍历field | |
writer.add(fd.flags); // 记录field的flag | |
} | |
} | |
writer.finish(); | |
vectorsStream.writeVInt(Math.toIntExact(scratchBuffer.size())); | |
scratchBuffer.copyTo(vectorsStream); | |
} | |
} |
- 如果所有doc中相关的field的flag都一样,则每个field的flag单独存储一份就可以
- 否则,需要存储所有doc中所有field的flag
flushNumTerms
存储所有field的term数量,会先统计最大的term数量,用来获取最大的term数据值需要几个bit存储。
private void flushNumTerms(int totalFields) throws IOException { | |
int maxNumTerms =; | |
// 获取最大的term数量的值 | |
for (DocData dd : pendingDocs) { | |
for (FieldData fd : dd.fields) { | |
maxNumTerms |= fd.numTerms; | |
} | |
} | |
final int bitsRequired = DirectWriter.bitsRequired(maxNumTerms); | |
vectorsStream.writeVInt(bitsRequired); | |
scratchBuffer.reset(); | |
// 使用DirectWriter压缩存储 | |
final DirectWriter writer = DirectWriter.getInstance(scratchBuffer, totalFields, bitsRequired); | |
for (DocData dd : pendingDocs) { | |
for (FieldData fd : dd.fields) { | |
writer.add(fd.numTerms); | |
} | |
} | |
writer.finish(); | |
vectorsStream.writeVInt(Math.toIntExact(scratchBuffer.size())); | |
scratchBuffer.copyTo(vectorsStream); | |
} |
flushTermLengths
分别存储term的prefixLength和suffixLength。
private void flushTermLengths() throws IOException { | |
// 存储prefixLength | |
writer.reset(vectorsStream); | |
for (DocData dd : pendingDocs) { | |
for (FieldData fd : dd.fields) { | |
for (int i =; i < fd.numTerms; ++i) { | |
writer.add(fd.prefixLengths[i]); | |
} | |
} | |
} | |
writer.finish(); | |
// 存储suffixLength | |
writer.reset(vectorsStream); | |
for (DocData dd : pendingDocs) { | |
for (FieldData fd : dd.fields) { | |
for (int i =; i < fd.numTerms; ++i) { | |
writer.add(fd.suffixLengths[i]); | |
} | |
} | |
} | |
writer.finish(); | |
} |
flushTermFreqs
存储term的频率,这里有个小小的优化,为了提高压缩率。
private void flushTermFreqs() throws IOException { | |
writer.reset(vectorsStream); | |
for (DocData dd : pendingDocs) { | |
for (FieldData fd : dd.fields) { | |
for (int i =; i < fd.numTerms; ++i) { | |
// 已经确定了freq肯定是大于等于,减1是为了提高writer的压缩率,读取的时候加1就行了。 | |
writer.add(fd.freqs[i] -); | |
} | |
} | |
} | |
writer.finish(); | |
} |
flushPositions
差值存储所有的position。
private void flushPositions() throws IOException { | |
writer.reset(vectorsStream); | |
for (DocData dd : pendingDocs) { | |
for (FieldData fd : dd.fields) { | |
if (fd.hasPositions) { | |
int pos =; | |
for (int i =; i < fd.numTerms; ++i) { | |
int previousPosition =; | |
for (int j =; j < fd.freqs[i]; ++j) { | |
final int position = positionsBuf[fd.posStart + pos++]; | |
writer.add(position - previousPosition); | |
previousPosition = position; | |
} | |
} | |
assert pos == fd.totalPositions; | |
} | |
} | |
} | |
writer.finish(); | |
} |
flushOffsets
offset的存储做了一个优化设计,原因是term出现的不同的offset跨度可能会比较大,如果把原始的offset用PackedInts进行存储,可能压缩率不会很高。因此,在正式存储offset之前,先计算平均的term长度,根据term出现的前后两个offset的position,可以估计两个position的距离,用真实的前后两个offset的距离减去这个估计的距离,就能使得offset的差值向0趋近,可以提高PackedInts的压缩率。
private void flushOffsets(int[] fieldNums) throws IOException { | |
// 至少一个字段开启了offset | |
boolean hasOffsets = false; | |
// term在所有字段中出现的最后一个postition之和 | |
long[] sumPos = new long[fieldNums.length]; | |
// term在所有字段中出现的最后一个startOffset之和 | |
long[] sumOffsets = new long[fieldNums.length]; | |
for (DocData dd : pendingDocs) { // 遍历所有的doc | |
for (FieldData fd : dd.fields) { // 遍历doc中的所有field | |
hasOffsets |= fd.hasOffsets; | |
if (fd.hasOffsets && fd.hasPositions) { // 如果字段开启了offset和position | |
// 查找在term数组中的下标 | |
final int fieldNumOff = Arrays.binarySearch(fieldNums, fd.fieldNum); | |
int pos =; | |
for (int i =; i < fd.numTerms; ++i) { | |
sumPos[fieldNumOff] += positionsBuf[fd.posStart + fd.freqs[i] - + pos]; | |
sumOffsets[fieldNumOff] += startOffsetsBuf[fd.offStart + fd.freqs[i] - + pos]; | |
pos += fd.freqs[i]; | |
} | |
assert pos == fd.totalPositions; | |
} | |
} | |
} | |
if (!hasOffsets) { | |
return; | |
} | |
final float[] charsPerTerm = new float[fieldNums.length]; | |
// 用 sumOffsets[i] / sumPos[i] 估计第i个term的长度 | |
for (int i =; i < fieldNums.length; ++i) { | |
charsPerTerm[i] = | |
(sumPos[i] <= || sumOffsets[i] <= 0) ? 0 : (float) ((double) sumOffsets[i] / sumPos[i]); | |
} | |
// tvd中存储charsPerTerm | |
for (int i =; i < fieldNums.length; ++i) { | |
vectorsStream.writeInt(Float.floatToRawIntBits(charsPerTerm[i])); | |
} | |
writer.reset(vectorsStream); | |
for (DocData dd : pendingDocs) { // 遍历所有的doc | |
for (FieldData fd : dd.fields) { // 遍历doc中所有的field | |
if ((fd.flags & OFFSETS) !=) { // 如果开启了offset | |
final int fieldNumOff = Arrays.binarySearch(fieldNums, fd.fieldNum); | |
final float cpt = charsPerTerm[fieldNumOff]; | |
int pos =; | |
for (int i =; i < fd.numTerms; ++i) { // 遍历field中所有的term | |
int previousPos =; // 差值使用 | |
int previousOff =; // 差值使用 | |
for (int j =; j < fd.freqs[i]; ++j) { // 遍历term出现的所有位置 | |
final int position = fd.hasPositions ? positionsBuf[fd.posStart + pos] :; | |
final int startOffset = startOffsetsBuf[fd.offStart + pos]; | |
// (int) (cpt * (position - previousPos)):当前potition和前一个position之间的长度 | |
// startOffset - previousOff再减去(int) (cpt * (position - previousPos))就把值降到最小 | |
writer.add(startOffset - previousOff - (int) (cpt * (position - previousPos))); | |
previousPos = position; | |
previousOff = startOffset; | |
++pos; | |
} | |
} | |
} | |
} | |
} | |
writer.finish(); | |
// lengths | |
writer.reset(vectorsStream); | |
for (DocData dd : pendingDocs) { // 遍历所有的doc | |
for (FieldData fd : dd.fields) { // 遍历所有的Field | |
if ((fd.flags & OFFSETS) !=) { // 如果开启了offset | |
int pos =; | |
for (int i =; i < fd.numTerms; ++i) { | |
for (int j =; j < fd.freqs[i]; ++j) { | |
writer.add( | |
// 减去前缀长度和后缀长度也是为了把值变小,减少存储空间 | |
lengthsBuf[fd.offStart + pos++] - fd.prefixLengths[i] - fd.suffixLengths[i]); | |
} | |
} | |
assert pos == fd.totalPositions; | |
} | |
} | |
} | |
writer.finish(); | |
} |
flushPayloadLengths
存储所有的payload的长度信息。
private void flushPayloadLengths() throws IOException { | |
writer.reset(vectorsStream); | |
for (DocData dd : pendingDocs) { | |
for (FieldData fd : dd.fields) { | |
if (fd.hasPayloads) { | |
for (int i =; i < fd.totalPositions; ++i) { | |
writer.add(payloadLengthsBuf[fd.payStart + i]); | |
} | |
} | |
} | |
} | |
writer.finish(); | |
} |
finish
结束词向量索引文件的构建,把待处理doc列表中剩下的doc生成一个chunk。
public void finish(int numDocs) throws IOException { | |
if (!pendingDocs.isEmpty()) { // 如果还有待处理的doc,则强制生成一个chunk | |
flush(true); | |
} | |
if (numDocs != this.numDocs) { | |
throw new RuntimeException( | |
"Wrote " + this.numDocs + " docs, finish called with numDocs=" + numDocs); | |
} | |
// 生成tvx索引文件 | |
indexWriter.finish(numDocs, vectorsStream.getFilePointer(), metaStream); | |
metaStream.writeVLong(numChunks); | |
metaStream.writeVLong(numDirtyChunks); | |
metaStream.writeVLong(numDirtyDocs); | |
CodecUtil.writeFooter(metaStream); | |
CodecUtil.writeFooter(vectorsStream); | |
} |
TermVectorsConsumerPerField
TermVectorsConsumerPerField是TermsHashPerField的子类,TermsHashPerField在之前介绍倒排的时候已经非常详细地介绍过了。
在本文中,我们重点介绍不一样的地方。在介绍倒排的时候使用的是FreqProxTermsWriterPerField,它存储了所有的倒排数据,在所有的文档都处理完了之后才进行序列化和持久化,TermVectorsConsumerPerField和它最大的区别是每处理完一个doc,就进行序列化然后重置等待处理下一个doc。
在TermVectorsConsumerPerField的源码中,如果已经看明白之前倒排的逻辑,则大部分地方理解起来都比较容易,这里我们只看一个文档处理完之后进行序列化的逻辑,实际上在TermVectorsConsumerPerField中只负责调度Lucene90CompressingTermVectorsWriter进行操作:
void finishDocument() throws IOException { | |
// 如果没有开启词向量构建 | |
if (doVectors == false) { | |
return; | |
} | |
doVectors = false; | |
// 当前field的term总数 | |
final int numPostings = getNumTerms(); | |
// 用来存储当前序列化的term | |
final BytesRef flushTerm = termsWriter.flushTerm; | |
TermVectorsPostingsArray postings = termVectorsPostingsArray; | |
// 序列化和持久化的核心类,实际上使用的实现类:LuceneCompressingTermVectorsWriter | |
final TermVectorsWriter tv = termsWriter.writer; | |
// 对term进行排序 | |
sortTerms(); | |
// 获取排序后的termID列表 | |
final int[] termIDs = getSortedTermIDs(); | |
// 开始处理一个Field | |
tv.startField(fieldInfo, numPostings, doVectorPositions, doVectorOffsets, hasPayloads); | |
// 用来从bytePool中读取position信息 | |
final ByteSliceReader posReader = doVectorPositions ? termsWriter.vectorSliceReaderPos : null; | |
// 用来从bytePool中读取offset信息 | |
final ByteSliceReader offReader = doVectorOffsets ? termsWriter.vectorSliceReaderOff : null; | |
// 遍历所有的term | |
for (int j =; j < numPostings; j++) { | |
final int termID = termIDs[j]; | |
final int freq = postings.freqs[termID]; | |
// 当前处理的term存入flushTerm | |
termBytePool.setBytesRef(flushTerm, postings.textStarts[termID]); | |
// 准备序列化term的词向量信息 | |
tv.startTerm(flushTerm, freq); | |
if (doVectorPositions || doVectorOffsets) { | |
if (posReader != null) { | |
initReader(posReader, termID,); | |
} | |
if (offReader != null) { | |
initReader(offReader, termID,); | |
} | |
// 序列化所有的position和offset信息 | |
tv.addProx(freq, posReader, offReader); | |
} | |
// 结束term的处理 | |
tv.finishTerm(); | |
} | |
// 结束Field的处理 | |
tv.finishField(); | |
reset(); | |
fieldInfo.setStoreTermVectors(); | |
} |
索引文件格式
tvm
词向量索引文件的元信息,用来读取使用。
字段详解
Header
文件头部信息,主要是包括:
- 文件头魔数(同一lucene版本所有文件相同)
- 该文件使用的codec名称:Lucene90TermVectorsIndexMeta
- codec版本
- segment id(也是Segment_N文件中的N)
- segment后缀名(一般为空)
PackedItsVersion
在词向量的索引文件中有很多数据是使用PackedIts压缩存储,该字段记录PackedInts的版本。、
ChunkSize
用来判断是否满足一个chunk的一种条件,如果chunk的大小超过了ChunkSize的限制,则可以构建一个chunk
NumChunks
chunk总数
NumDirtyChunks
dirtyChunk总数
NumDirtyDocs
dirtyChunk中的doc总数
NumDocs
doc总数
BlockShift
DirectMonotonicWriter需要的参数,DirectMonotonicWriter压缩存储会生成多个block,BlockShift决定了block的大小。
TotalChunks + 1
chunk总数 + 1,在生成tvx索引文件中ChunkStartDocIDs和ChunkTVDOffsets两个字段时,使用DirectMonotonicWriter写入的值的总数。
tvxDocStartFP
tvx索引文件中ChunkStartDocIDs的起始位置
DocBlockMeta
tvx索引文件中ChunkStartDocIDs使用DirectMonotonicWriter编码存储,会生成多个block,这些block的元信息。
tvxOffsetStartFP
tvx中ChunkTVDOffsets的起始位置
OffsetBlockMeta
tvx索引文件中ChunkTVDOffsets使用DirectMonotonicWriter编码存储,会生成多个block,这些block的元信息。
SPEndPoint
tvx文件的结束位置,后面是tvx的footer信息。
MaxPointer
tvd文件的结束位置,后面tvd的footer信息。
Footer
文件尾,主要包括
- 文件尾魔数(同一个lucene版本所有文件一样)
- 0
- 校验码
tvd 字段详解
tvd索引文件主要是存储倒排信息中freq,position,offset,payload。
Header
文件头部信息,主要是包括:
- 文件头魔数(同一lucene版本所有文件相同)
- 该文件使用的codec名称:Lucene90TermVectorsData
- codec版本
- segment id(也是Segment_N文件中的N)
- segment后缀名(一般为空)
chunk
在词向量的构建过程中,
- DocBase:Chunk中Doc的起始编号,Chunk中所有doc的真实编号需要加上这个DocBase
- ChunkDocsCode:是ChunkDocs和isDirty的int组合体
- ChunkDocs:chunk中的doc总数
- isDirty:chunk中是否存在dirtyChunk
- NumField:chunk中文档的字段个数。如果chunk中只有一个doc,则存储的就是这个doc的字段个数。否则使用packedInts压缩存储所有的doc的字段个数信息。
- AllUniqueFieldNums:按序存储chunk中所有去重后的field的编号,用PackedInts压缩存储。
- FieldNums:使用DirectWriter压缩存储所有doc的字段编号在AllUniqueFieldNums列表中的下标。
- FieldFlagsCode:int类型的IsChangeFlag和DirectWriter压缩存储的FiledFlags
- IsChangeFlag:是否有字段在不同的doc中的flag是不一样的。0表示所有的doc中的相同Field的flag都一样,1则不是。
- FieldFlags:如果IsChangeFlag==0,则存储的是AllUniqueFieldNums中每个字段的flag,否则存储的就是所有doc中所有字段的flag。
- FieldNumTerms:使用DirectWriter压缩存储所有的doc中的所有field的term的总数
- PrefixLengths:使用PackedInts存储所有term和同字段中前一个term的最长公共前缀长度
- SuffixLengths:使用PackedInts存储所有term和同字段中前一个term的扣除最长公共前缀剩下的后缀的长度
- TermFreqs:使用PackedInts存储所有doc的所有字段中term出现的频率
- Positions:使用PackedInts存储所有doc中所有Field中term出现的差值position
- StartOffsets:使用PackedInts存储所有doc中所有Field中term出现的StartOffset,具体做了一些处理(看前面的源码分析),占用空间会更小。
- Lengths:使用PackedInts存储所有doc中所有Field中term出现的EndOffset - StartOffset,具体做了一些处理(看前面的源码分析),占用空间会更小。
- PayloadLengths:使用PackedInts存储所有的doc中所有字段中的term的payload长度
- TermSuffixes:使用LZ4压缩存储所有term的suffix
Footer
文件尾,主要包括
- 文件尾魔数(同一个lucene版本所有文件一样)
- 0
- 校验码
tvx 字段详解
tvx索引文件主要存储的是tvd索引文件中的一些索引信息,tvd中每个chunk的起始docID以及存储的起始位置。
Header
文件头部信息,主要是包括:
- 文件头魔数(同一lucene版本所有文件相同)
- 该文件使用的codec名称:Lucene90TermVectorsIndexIdx
- codec版本
- segment id(也是Segment_N文件中的N)
- segment后缀名(一般为空)
ChunkStartDocIDs
所有chunk的起始docID,使用DirectMonotonicWriter编码存储,会生成多个block。
ChunkTVDOffsetsBlock
所有chunk在tvd索引文件中的起始位置,使用DirectMonotonicWriter编码存储,会生成多个block。
Footer
文件尾,主要包括
- 文件尾魔数(同一个lucene版本所有文件一样)
- 0
- 校验码