diff --git a/pkg/linedb/index.go b/pkg/linedb/index.go index 184bee2..13a0bb8 100644 --- a/pkg/linedb/index.go +++ b/pkg/linedb/index.go @@ -33,8 +33,9 @@ type IndexStore interface { UnindexRecord(collection, partition string, fields []string, record map[string]any, lineIndex int) // Rebuild перестраивает вклад в индекс для одной партиции (или всей коллекции, если одна партиция). - // partition — имя партиции; records — записи, позиция в срезе = lineIndex. - Rebuild(collection, partition string, fields []string, records []any) error + // lineIndexes[i] — 0-based номер физической строки в файле для records[i] (после пустых/удалённых строк). + // Если lineIndexes == nil или len != len(records), используются плотные индексы 0..len-1 (устаревшее поведение). + Rebuild(collection, partition string, fields []string, records []any, lineIndexes []int) error // Clear очищает индекс коллекции (все партиции). Clear(collection string) error @@ -164,7 +165,7 @@ func (s *InMemoryIndexStore) Lookup(collection, field, value string) ([]IndexPos } // Rebuild перестраивает вклад партиции в индекс. -func (s *InMemoryIndexStore) Rebuild(collection, partition string, fields []string, records []any) error { +func (s *InMemoryIndexStore) Rebuild(collection, partition string, fields []string, records []any, lineIndexes []int) error { if partition == "" { partition = DefaultPartition } @@ -194,7 +195,11 @@ func (s *InMemoryIndexStore) Rebuild(collection, partition string, fields []stri } // Добавляем новые позиции - for idx, rec := range records { + for i, rec := range records { + lineIdx := i + if lineIndexes != nil && i < len(lineIndexes) { + lineIdx = lineIndexes[i] + } recMap, ok := rec.(map[string]any) if !ok { continue @@ -205,7 +210,7 @@ func (s *InMemoryIndexStore) Rebuild(collection, partition string, fields []stri if s.index[key] == nil { s.index[key] = make(map[string][]IndexPosition) } - s.index[key][val] = append(s.index[key][val], IndexPosition{Partition: partition, LineIndex: idx}) + s.index[key][val] = append(s.index[key][val], IndexPosition{Partition: partition, LineIndex: lineIdx}) } } return nil diff --git a/pkg/linedb/index_memcached.go b/pkg/linedb/index_memcached.go index 7d53754..c1b4ba0 100644 --- a/pkg/linedb/index_memcached.go +++ b/pkg/linedb/index_memcached.go @@ -133,7 +133,7 @@ func (s *MemcachedIndexStore) Lookup(collection, field, value string) ([]IndexPo } // Rebuild перестраивает вклад партиции в индекс. -func (s *MemcachedIndexStore) Rebuild(collection, partition string, fields []string, records []any) error { +func (s *MemcachedIndexStore) Rebuild(collection, partition string, fields []string, records []any, lineIndexes []int) error { if partition == "" { partition = DefaultPartition } @@ -155,7 +155,11 @@ func (s *MemcachedIndexStore) Rebuild(collection, partition string, fields []str // встречаются в records. Для остальных value старые позиции этой partition останутся. // Значит нужен merge: для каждого value в records делаем Get, убираем старые Position с этой partition, добавляем новые. byFieldValue := make(map[string]map[string][]IndexPosition) - for idx, rec := range records { + for i, rec := range records { + lineIdx := i + if lineIndexes != nil && i < len(lineIndexes) { + lineIdx = lineIndexes[i] + } recMap, ok := rec.(map[string]any) if !ok { continue @@ -165,7 +169,7 @@ func (s *MemcachedIndexStore) Rebuild(collection, partition string, fields []str if byFieldValue[field] == nil { byFieldValue[field] = make(map[string][]IndexPosition) } - byFieldValue[field][val] = append(byFieldValue[field][val], IndexPosition{Partition: partition, LineIndex: idx}) + byFieldValue[field][val] = append(byFieldValue[field][val], IndexPosition{Partition: partition, LineIndex: lineIdx}) } } diff --git a/pkg/linedb/jsonl_file.go b/pkg/linedb/jsonl_file.go index b08bf0b..a0763b6 100644 --- a/pkg/linedb/jsonl_file.go +++ b/pkg/linedb/jsonl_file.go @@ -402,6 +402,83 @@ func (j *JSONLFile) Read(options LineDbAdapterOptions) ([]any, error) { return records, scanner.Err() } +// ReadWithPhysicalLineIndexes как Read, но для каждой записи возвращает 0-based индекс строки (слота), +// в том же смысле, что ReadByLineIndexes/WriteAtLineIndexes: смещение в файле = lineIndex * allocSize. +// Пустые слоты (пробелы после удаления и т.п.) пропускаются; индекс — номер слота, а не порядковый номер записи. +func (j *JSONLFile) ReadWithPhysicalLineIndexes(options LineDbAdapterOptions) ([]any, []int, error) { + if !options.InTransaction { + j.mutex.RLock() + defer j.mutex.RUnlock() + } + + if !j.initialized { + return nil, nil, fmt.Errorf("file not initialized") + } + if j.allocSize <= 0 { + return nil, nil, fmt.Errorf("invalid allocSize") + } + + file, err := os.Open(j.filename) + if err != nil { + return nil, nil, fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + info, err := file.Stat() + if err != nil { + return nil, nil, fmt.Errorf("stat file: %w", err) + } + nSlots := int(info.Size()) / j.allocSize + + buf := make([]byte, j.allocSize) + var records []any + var lineIndexes []int + + for slot := 0; slot < nSlots; slot++ { + n, err := io.ReadFull(file, buf) + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } + if err != nil { + return nil, nil, fmt.Errorf("read slot %d: %w", slot, err) + } + if n != j.allocSize { + break + } + + line := string(buf[:n]) + line = strings.TrimRight(line, "\n") + line = strings.TrimRight(line, " ") + if strings.TrimSpace(line) == "" { + continue + } + + if j.cypherKey != "" && !j.options.Encode { + decoded, err := base64.StdEncoding.DecodeString(line) + if err != nil { + if j.options.SkipInvalidLines { + continue + } + return nil, nil, fmt.Errorf("failed to decode base64: %w", err) + } + line = string(decoded) + } + + var record any + if err := j.jsonUnmarshal([]byte(line), &record); err != nil { + if j.options.SkipInvalidLines { + continue + } + return nil, nil, fmt.Errorf("failed to unmarshal JSON: %w", err) + } + + records = append(records, record) + lineIndexes = append(lineIndexes, slot) + } + + return records, lineIndexes, nil +} + // ReadByLineIndexes читает записи по номерам строк (0-based) с использованием random access. // Ожидается, что файл нормализован и каждая строка имеет длину allocSize байт (включая \n), // как это обеспечивает Init/normalizeExistingFile/rewriteFile. diff --git a/pkg/linedb/line_db.go b/pkg/linedb/line_db.go index 2fd00fd..4fbc371 100644 --- a/pkg/linedb/line_db.go +++ b/pkg/linedb/line_db.go @@ -143,20 +143,20 @@ func (db *LineDb) Init(force bool, initOptions *LineDbInitOptions) error { if !strings.HasPrefix(name, baseName+"_") { continue } - records, err := adapter.Read(LineDbAdapterOptions{}) + records, lineIdx, err := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{}) if err != nil { continue } - _ = db.indexStore.Rebuild(opt.CollectionName, name, opt.IndexedFields, records) + _ = db.indexStore.Rebuild(opt.CollectionName, name, opt.IndexedFields, records, lineIdx) } } else { adapter := db.adapters[opt.CollectionName] if adapter != nil { - records, err := adapter.Read(LineDbAdapterOptions{}) + records, lineIdx, err := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{}) if err != nil { return fmt.Errorf("failed to read collection %s for index rebuild: %w", opt.CollectionName, err) } - if err := db.indexStore.Rebuild(opt.CollectionName, DefaultPartition, opt.IndexedFields, records); err != nil { + if err := db.indexStore.Rebuild(opt.CollectionName, DefaultPartition, opt.IndexedFields, records, lineIdx); err != nil { return fmt.Errorf("failed to rebuild index for %s: %w", opt.CollectionName, err) } } @@ -386,9 +386,9 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO if opts != nil && len(opts.IndexedFields) > 0 && !db.isCollectionPartitioned(collectionName) { adapter, exists := db.adapters[collectionName] if exists { - allRecords, readErr := adapter.Read(LineDbAdapterOptions{}) + allRecords, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{}) if readErr == nil { - _ = db.indexStore.Rebuild(collectionName, DefaultPartition, opts.IndexedFields, allRecords) + _ = db.indexStore.Rebuild(collectionName, DefaultPartition, opts.IndexedFields, allRecords, lineIdx) } } } @@ -541,9 +541,9 @@ func (db *LineDb) Update(data any, collectionName string, filter any, options Li if db.indexStore != nil { opts := db.getCollectionOptions(collectionName) if opts != nil && len(opts.IndexedFields) > 0 { - allRecords, readErr := adapter.Read(LineDbAdapterOptions{}) + allRecords, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{}) if readErr == nil { - _ = db.indexStore.Rebuild(collectionName, DefaultPartition, opts.IndexedFields, allRecords) + _ = db.indexStore.Rebuild(collectionName, DefaultPartition, opts.IndexedFields, allRecords, lineIdx) } } } @@ -607,9 +607,9 @@ func (db *LineDb) Delete(data any, collectionName string, options LineDbAdapterO if db.indexStore != nil { opts := db.getCollectionOptions(collectionName) if opts != nil && len(opts.IndexedFields) > 0 { - allRecords, readErr := adapter.Read(LineDbAdapterOptions{}) + allRecords, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{}) if readErr == nil { - _ = db.indexStore.Rebuild(collectionName, DefaultPartition, opts.IndexedFields, allRecords) + _ = db.indexStore.Rebuild(collectionName, DefaultPartition, opts.IndexedFields, allRecords, lineIdx) } } } @@ -813,18 +813,18 @@ func (db *LineDb) indexRebuildTimerLoop(interval time.Duration) { if !strings.HasPrefix(name, baseName+"_") { continue } - records, err := adapter.Read(LineDbAdapterOptions{}) + records, lineIdx, err := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{}) if err != nil { continue } - _ = db.indexStore.Rebuild(opt.CollectionName, name, opt.IndexedFields, records) + _ = db.indexStore.Rebuild(opt.CollectionName, name, opt.IndexedFields, records, lineIdx) } } else { adapter := db.adapters[opt.CollectionName] if adapter != nil { - records, err := adapter.Read(LineDbAdapterOptions{}) + records, lineIdx, err := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{}) if err == nil { - _ = db.indexStore.Rebuild(opt.CollectionName, DefaultPartition, opt.IndexedFields, records) + _ = db.indexStore.Rebuild(opt.CollectionName, DefaultPartition, opt.IndexedFields, records, lineIdx) } } } @@ -1614,9 +1614,9 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any, } allResults = append(allResults, results...) if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { - records, readErr := adapter.Read(LineDbAdapterOptions{InTransaction: true}) + records, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{InTransaction: true}) if readErr == nil { - _ = db.indexStore.Rebuild(collectionName, partitionName, opts.IndexedFields, records) + _ = db.indexStore.Rebuild(collectionName, partitionName, opts.IndexedFields, records, lineIdx) } } } @@ -1651,9 +1651,9 @@ func (db *LineDb) deletePartitioned(data any, collectionName string, options Lin } allResults = append(allResults, results...) if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { - records, readErr := adapter.Read(LineDbAdapterOptions{InTransaction: true}) + records, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{InTransaction: true}) if readErr == nil { - _ = db.indexStore.Rebuild(collectionName, partitionName, opts.IndexedFields, records) + _ = db.indexStore.Rebuild(collectionName, partitionName, opts.IndexedFields, records, lineIdx) } } } diff --git a/tests/nonpartitioned/index_fail_test.go b/tests/nonpartitioned/index_fail_test.go index 49d0a88..54c4481 100644 --- a/tests/nonpartitioned/index_fail_test.go +++ b/tests/nonpartitioned/index_fail_test.go @@ -27,8 +27,8 @@ func (f *failingLookupStore) UnindexRecord(collection, partition string, fields f.inner.UnindexRecord(collection, partition, fields, record, lineIndex) } -func (f *failingLookupStore) Rebuild(collection, partition string, fields []string, records []any) error { - return f.inner.Rebuild(collection, partition, fields, records) +func (f *failingLookupStore) Rebuild(collection, partition string, fields []string, records []any, lineIndexes []int) error { + return f.inner.Rebuild(collection, partition, fields, records, lineIndexes) } func (f *failingLookupStore) Clear(collection string) error {