From 0b1deabbaa215752cfad7faf8e3314b65dbdf932 Mon Sep 17 00:00:00 2001 From: "direct-dev.ru" Date: Thu, 9 Apr 2026 15:55:09 +0600 Subject: [PATCH] change module name 20 --- pkg/linedb/jsonl_file.go | 130 +++++++++++++++++++++++++++++++++++++++ pkg/linedb/line_db.go | 104 ++++++++++++++++++++++--------- 2 files changed, 204 insertions(+), 30 deletions(-) diff --git a/pkg/linedb/jsonl_file.go b/pkg/linedb/jsonl_file.go index a0763b6..f1caea6 100644 --- a/pkg/linedb/jsonl_file.go +++ b/pkg/linedb/jsonl_file.go @@ -479,6 +479,56 @@ func (j *JSONLFile) ReadWithPhysicalLineIndexes(options LineDbAdapterOptions) ([ return records, lineIndexes, nil } +// HasBlankSlots возвращает true, если в файле есть хотя бы один пустой слот +// (пробелы + перевод строки — типичный след точечного удаления BlankLinesAtPositions). +func (j *JSONLFile) HasBlankSlots(options LineDbAdapterOptions) (bool, error) { + if !options.InTransaction { + j.mutex.RLock() + defer j.mutex.RUnlock() + } + if !j.initialized { + return false, fmt.Errorf("file not initialized") + } + if j.allocSize <= 0 { + return false, fmt.Errorf("invalid allocSize") + } + + file, err := os.Open(j.filename) + if err != nil { + return false, fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + info, err := file.Stat() + if err != nil { + return false, fmt.Errorf("stat file: %w", err) + } + nSlots := int(info.Size()) / j.allocSize + buf := make([]byte, j.allocSize) + + for slot := 0; slot < nSlots; slot++ { + n, err := io.ReadFull(file, buf) + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } + if err != nil { + return false, fmt.Errorf("read slot %d: %w", slot, err) + } + if n != j.allocSize { + break + } + + line := string(buf[:n]) + line = strings.TrimRight(line, "\n") + line = strings.TrimRight(line, " ") + if strings.TrimSpace(line) == "" { + return true, nil + } + } + + return false, nil +} + // ReadByLineIndexes читает записи по номерам строк (0-based) с использованием random access. // Ожидается, что файл нормализован и каждая строка имеет длину allocSize байт (включая \n), // как это обеспечивает Init/normalizeExistingFile/rewriteFile. @@ -739,6 +789,86 @@ func (j *JSONLFile) BlankLinesAtPositions(lineIndexes []int, options LineDbAdapt return nil } +// CompactFile перезаписывает файл, копируя подряд только непустые слоты (сырые байты без JSON). +// Пустой слот: после обрезки \n и пробелов строка пустая — как после BlankLinesAtPositions. +// Один буфер на весь проход; без раскодирования и без удержания всех записей в памяти. +func (j *JSONLFile) CompactFile(options LineDbAdapterOptions) error { + if !options.InTransaction { + j.mutex.Lock() + defer j.mutex.Unlock() + } + if !j.initialized { + return fmt.Errorf("file not initialized") + } + if j.allocSize <= 0 { + return fmt.Errorf("invalid allocSize") + } + + info, err := os.Stat(j.filename) + if err != nil { + return fmt.Errorf("compact stat: %w", err) + } + nSlots := int(info.Size()) / j.allocSize + + tempFile := j.filename + ".tmp" + dst, err := os.Create(tempFile) + if err != nil { + return fmt.Errorf("compact create temp: %w", err) + } + + src, err := os.Open(j.filename) + if err != nil { + dst.Close() + _ = os.Remove(tempFile) + return fmt.Errorf("compact open source: %w", err) + } + + buf := make([]byte, j.allocSize) + compactErr := func() error { + defer src.Close() + for slot := 0; slot < nSlots; slot++ { + n, err := io.ReadFull(src, buf) + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } + if err != nil { + return fmt.Errorf("compact read slot %d: %w", slot, err) + } + if n != j.allocSize { + break + } + + slot := buf[:n] + t := bytes.TrimRight(bytes.TrimRight(slot, "\n"), " ") + if len(bytes.TrimSpace(t)) == 0 { + continue + } + if _, err := dst.Write(slot); err != nil { + return fmt.Errorf("compact write: %w", err) + } + } + return nil + }() + + if err := dst.Close(); err != nil { + _ = os.Remove(tempFile) + if compactErr != nil { + return compactErr + } + return fmt.Errorf("compact close temp: %w", err) + } + if compactErr != nil { + _ = os.Remove(tempFile) + return compactErr + } + + if err := os.Rename(tempFile, j.filename); err != nil { + _ = os.Remove(tempFile) + return fmt.Errorf("compact rename: %w", err) + } + return nil +} + // LineCount возвращает число строк в файле (fileSize / allocSize). // Используется для точечного индексирования после Write. func (j *JSONLFile) LineCount() (int, error) { diff --git a/pkg/linedb/line_db.go b/pkg/linedb/line_db.go index 4fbc371..b4b8a30 100644 --- a/pkg/linedb/line_db.go +++ b/pkg/linedb/line_db.go @@ -365,8 +365,9 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO writeOptions := LineDbAdapterOptions{InTransaction: true, InternalCall: true} if db.indexStore != nil { opts := db.getCollectionOptions(collectionName) - if opts != nil && len(opts.IndexedFields) > 0 && db.isCollectionPartitioned(collectionName) { - writeOptions.DoIndexing = true // индекс строится точечно при Write в партиции + if opts != nil && len(opts.IndexedFields) > 0 { + // Точечное IndexRecord в Write (LineCount → стартовая строка для новых записей) + writeOptions.DoIndexing = true } } if err := db.Write(resultDataArray, collectionName, writeOptions); err != nil { @@ -380,20 +381,6 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO } } - // Обновляем индекс (полная пересборка по коллекции) — только для непартиционированных - if db.indexStore != nil { - opts := db.getCollectionOptions(collectionName) - if opts != nil && len(opts.IndexedFields) > 0 && !db.isCollectionPartitioned(collectionName) { - adapter, exists := db.adapters[collectionName] - if exists { - allRecords, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{}) - if readErr == nil { - _ = db.indexStore.Rebuild(collectionName, DefaultPartition, opts.IndexedFields, allRecords, lineIdx) - } - } - } - } - return nil } @@ -520,7 +507,7 @@ func (db *LineDb) Update(data any, collectionName string, filter any, options Li // Пробуем точечный Update через индекс (без полного чтения файла) opts := db.getCollectionOptions(collectionName) if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 && !db.isCollectionPartitioned(collectionName) { - result, used, err := db.tryIndexUpdate(adapter, filter, dataMap, collectionName, opts.IndexedFields, options) + result, used, err := db.tryIndexUpdate(adapter, filter, dataMap, collectionName, DefaultPartition, opts.IndexedFields, options) if err != nil { return nil, err } @@ -586,7 +573,7 @@ func (db *LineDb) Delete(data any, collectionName string, options LineDbAdapterO opts := db.getCollectionOptions(collectionName) if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { - result, used, err := db.tryIndexDelete(adapter, data, collectionName, opts.IndexedFields, options) + result, used, err := db.tryIndexDelete(adapter, data, collectionName, DefaultPartition, opts.IndexedFields, options) if err != nil { return nil, err } @@ -792,6 +779,8 @@ func (db *LineDb) Close() { // indexRebuildTimerLoop выполняет полный ребилд всех индексов с заданным интервалом (IndexRebuildTimer). // Работает в отдельной горутине; при каждом тике берёт db.mutex.Lock() на время ребилда. +// Перед ребилдом коллекции: если в файле есть пустые слоты (след точечного удаления), вызывается +// CompactFile — перезапись без «дыр», затем индексирование по сжатому файлу. // Останавливается при закрытии db.indexRebuildDone. func (db *LineDb) indexRebuildTimerLoop(interval time.Duration) { ticker := time.NewTicker(interval) @@ -813,6 +802,13 @@ func (db *LineDb) indexRebuildTimerLoop(interval time.Duration) { if !strings.HasPrefix(name, baseName+"_") { continue } + dirty, err := adapter.HasBlankSlots(LineDbAdapterOptions{}) + if err != nil { + continue + } + if dirty { + _ = adapter.CompactFile(LineDbAdapterOptions{}) + } records, lineIdx, err := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{}) if err != nil { continue @@ -822,6 +818,10 @@ func (db *LineDb) indexRebuildTimerLoop(interval time.Duration) { } else { adapter := db.adapters[opt.CollectionName] if adapter != nil { + dirty, err := adapter.HasBlankSlots(LineDbAdapterOptions{}) + if err == nil && dirty { + _ = adapter.CompactFile(LineDbAdapterOptions{}) + } records, lineIdx, err := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{}) if err == nil { _ = db.indexStore.Rebuild(opt.CollectionName, DefaultPartition, opt.IndexedFields, records, lineIdx) @@ -965,8 +965,23 @@ func (db *LineDb) tryIndexLookupPartitioned(filter any, collectionName string, o return true, filtered, nil } +// indexedFieldKeysEqual возвращает true, если ключи индекса по всем indexedFields совпадают +// (та же семантика, что getFieldValue/valueToIndexKey в индексе). +func indexedFieldKeysEqual(oldRec, newRec map[string]any, indexedFields []string) bool { + for _, f := range indexedFields { + if getFieldValue(oldRec, f) != getFieldValue(newRec, f) { + return false + } + } + return true +} + // tryIndexUpdate выполняет точечное обновление через индекс. Возвращает (result, used, err). -func (db *LineDb) tryIndexUpdate(adapter *JSONLFile, filter any, dataMap map[string]any, collectionName string, indexedFields []string, options LineDbAdapterOptions) ([]any, bool, error) { +// partition — имя партиции для IndexStore (для непартиционированных коллекций: DefaultPartition). +func (db *LineDb) tryIndexUpdate(adapter *JSONLFile, filter any, dataMap map[string]any, collectionName string, partition string, indexedFields []string, options LineDbAdapterOptions) ([]any, bool, error) { + if partition == "" { + partition = DefaultPartition + } filterMap, ok := filter.(map[string]any) if !ok || len(filterMap) == 0 { return nil, false, nil @@ -990,7 +1005,7 @@ func (db *LineDb) tryIndexUpdate(adapter *JSONLFile, filter any, dataMap map[str } var indexes []int for _, p := range posList { - if p.Partition == DefaultPartition { + if p.Partition == partition { indexes = append(indexes, p.LineIndex) } } @@ -1033,21 +1048,26 @@ func (db *LineDb) tryIndexUpdate(adapter *JSONLFile, filter any, dataMap map[str if err := adapter.WriteAtLineIndexes(updated, toUpdatePos, opt); err != nil { return nil, false, err } - for i, rec := range toUpdate { - if m, ok := rec.(map[string]any); ok { - db.indexStore.UnindexRecord(collectionName, DefaultPartition, indexedFields, m, toUpdatePos[i]) + for i := range toUpdate { + oldM, ok1 := toUpdate[i].(map[string]any) + newM, ok2 := updated[i].(map[string]any) + if !ok1 || !ok2 { + continue } - } - for i, rec := range updated { - if m, ok := rec.(map[string]any); ok { - db.indexStore.IndexRecord(collectionName, DefaultPartition, indexedFields, m, toUpdatePos[i]) + if indexedFieldKeysEqual(oldM, newM, indexedFields) { + continue } + db.indexStore.UnindexRecord(collectionName, partition, indexedFields, oldM, toUpdatePos[i]) + db.indexStore.IndexRecord(collectionName, partition, indexedFields, newM, toUpdatePos[i]) } return updated, true, nil } // tryIndexDelete выполняет точечное удаление через индекс. Возвращает (deletedRecords, used, err). -func (db *LineDb) tryIndexDelete(adapter *JSONLFile, filter any, collectionName string, indexedFields []string, options LineDbAdapterOptions) ([]any, bool, error) { +func (db *LineDb) tryIndexDelete(adapter *JSONLFile, filter any, collectionName string, partition string, indexedFields []string, options LineDbAdapterOptions) ([]any, bool, error) { + if partition == "" { + partition = DefaultPartition + } filterMap, ok := filter.(map[string]any) if !ok || len(filterMap) == 0 { return nil, false, nil @@ -1071,7 +1091,7 @@ func (db *LineDb) tryIndexDelete(adapter *JSONLFile, filter any, collectionName } var indexes []int for _, p := range posList { - if p.Partition == DefaultPartition { + if p.Partition == partition { indexes = append(indexes, p.LineIndex) } } @@ -1101,7 +1121,7 @@ func (db *LineDb) tryIndexDelete(adapter *JSONLFile, filter any, collectionName } for i, rec := range toDel { if m, ok := rec.(map[string]any); ok { - db.indexStore.UnindexRecord(collectionName, DefaultPartition, indexedFields, m, toDelPos[i]) + db.indexStore.UnindexRecord(collectionName, partition, indexedFields, m, toDelPos[i]) } } return toDel, true, nil @@ -1593,6 +1613,10 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any, if err != nil { return nil, err } + dataMap, err := db.toMap(data) + if err != nil { + return nil, fmt.Errorf("invalid update data format: %w", err) + } opts := db.getCollectionOptions(collectionName) var allResults []any @@ -1608,6 +1632,16 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any, } if adapter != nil { + if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { + upd, used, terr := db.tryIndexUpdate(adapter, filter, dataMap, collectionName, partitionName, opts.IndexedFields, options) + if terr != nil { + return nil, terr + } + if used { + allResults = append(allResults, upd...) + continue + } + } results, err := adapter.Update(data, filter, options) if err != nil { return nil, err @@ -1645,6 +1679,16 @@ func (db *LineDb) deletePartitioned(data any, collectionName string, options Lin } if adapter != nil { + if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { + del, used, derr := db.tryIndexDelete(adapter, data, collectionName, partitionName, opts.IndexedFields, options) + if derr != nil { + return nil, derr + } + if used { + allResults = append(allResults, del...) + continue + } + } results, err := adapter.Delete(data, options) if err != nil { return nil, err