From adee0bc805901798f950e75e26c7329618c995bf Mon Sep 17 00:00:00 2001 From: "direct-dev.ru" Date: Thu, 14 May 2026 14:31:09 +0600 Subject: [PATCH] added reindexonfilechange --- pkg/linedb/line_db.go | 163 ++++++++++++++++++++++++++++++++++++++---- pkg/linedb/types.go | 4 ++ 2 files changed, 153 insertions(+), 14 deletions(-) diff --git a/pkg/linedb/line_db.go b/pkg/linedb/line_db.go index c1763b8..c3218d5 100644 --- a/pkg/linedb/line_db.go +++ b/pkg/linedb/line_db.go @@ -1,7 +1,9 @@ package linedb import ( + "encoding/json" "fmt" + "math" "os" "path/filepath" "sort" @@ -358,6 +360,10 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO itemMap["id"] = newID } else { + if n, ok := coerceInsertIDToStrictInt(itemMap["id"]); ok { + itemMap["id"] = n + db.lastIDManager.SetLastID(collectionName, n) + } // Проверяем существование записи если не пропускаем проверку if !options.SkipCheckExistingForWrite { filter := map[string]any{"id": itemMap["id"]} @@ -391,7 +397,7 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO writeOptions := LineDbAdapterOptions{InTransaction: true, InternalCall: true} if db.indexStore != nil { opts := db.getCollectionOptions(collectionName) - if opts != nil && len(opts.IndexedFields) > 0 { + if opts != nil && len(opts.IndexedFields) > 0 && !opts.RebuildIndexesOnFileChange { // Точечное IndexRecord в Write (LineCount → стартовая строка для новых записей) writeOptions.DoIndexing = true } @@ -417,10 +423,13 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp collectionName = db.getFirstCollection() } + opts := db.getCollectionOptions(collectionName) + suppressPartialIndex := opts != nil && opts.RebuildIndexesOnFileChange + doPartial := options.DoIndexing && !suppressPartialIndex + // Проверяем партиционирование if db.isCollectionPartitioned(collectionName) { dataArray := db.normalizeDataArray(data) - opts := db.getCollectionOptions(collectionName) for _, item := range dataArray { adapter, err := db.getPartitionAdapter(item, collectionName) if err != nil { @@ -428,7 +437,7 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp } partitionName := adapter.GetCollectionName() var startLine int - if options.DoIndexing && db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { + if doPartial && db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { if c, err := adapter.LineCount(); err == nil { startLine = c } @@ -436,12 +445,18 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp if err := adapter.Write(item, options); err != nil { return fmt.Errorf("failed to write to partition: %w", err) } - if options.DoIndexing && db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { + if doPartial && db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { if m, err := db.toMap(item); err == nil { db.indexStore.IndexRecord(collectionName, partitionName, opts.IndexedFields, m, startLine) } } } + if suppressPartialIndex { + if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { + db.rebuildAllIndexesForLogicalCollection(collectionName) + } + db.clearCacheForCollection(collectionName) + } return nil } @@ -452,7 +467,7 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp } var startLine int - if options.DoIndexing && db.indexStore != nil { + if doPartial && db.indexStore != nil { if c, err := adapter.LineCount(); err == nil { startLine = c } @@ -463,8 +478,7 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp } // Точечное индексирование при DoIndexing - if options.DoIndexing && db.indexStore != nil { - opts := db.getCollectionOptions(collectionName) + if doPartial && db.indexStore != nil { if opts != nil && len(opts.IndexedFields) > 0 { dataArray := db.normalizeDataArray(data) for i, record := range dataArray { @@ -474,6 +488,12 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp } } } + if suppressPartialIndex { + if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { + db.rebuildAllIndexesForLogicalCollection(collectionName) + } + db.clearCacheForCollection(collectionName) + } return nil } @@ -527,7 +547,7 @@ func (db *LineDb) Update(data any, collectionName string, filter any, options Li // Пробуем точечный Update через индекс (без полного чтения файла) opts := db.getCollectionOptions(collectionName) - if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 && !db.isCollectionPartitioned(collectionName) { + if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 && !db.isCollectionPartitioned(collectionName) && !opts.RebuildIndexesOnFileChange { result, used, err := db.tryIndexUpdate(adapter, filter, dataMap, collectionName, DefaultPartition, opts.IndexedFields, options) if err != nil { return nil, err @@ -587,7 +607,7 @@ func (db *LineDb) Delete(data any, collectionName string, options LineDbAdapterO } opts := db.getCollectionOptions(collectionName) - if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { + if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 && !opts.RebuildIndexesOnFileChange { result, used, err := db.tryIndexDelete(adapter, data, collectionName, DefaultPartition, opts.IndexedFields, options) if err != nil { return nil, err @@ -764,6 +784,113 @@ func (db *LineDb) clearCacheForCollection(collectionName string) { db.cacheExternal.ClearCollection(collectionName) } +// rebuildAllIndexesForLogicalCollection полностью пересобирает индекс по всем файлам/партициям логической коллекции. +// Вызывать при удержании db.mutex (или внутри транзакции с внешней блокировкой). +func (db *LineDb) rebuildAllIndexesForLogicalCollection(logicalName string) { + if db.indexStore == nil { + return + } + opts := db.getCollectionOptions(logicalName) + if opts == nil || len(opts.IndexedFields) == 0 { + return + } + if db.isCollectionPartitioned(logicalName) { + for _, adapter := range db.partitionStorageAdaptersOrdered(logicalName) { + partName := adapter.GetCollectionName() + records, lineIdx, err := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{}) + if err != nil { + continue + } + _ = db.indexStore.Rebuild(logicalName, partName, opts.IndexedFields, records, lineIdx) + } + return + } + adapter := db.adapters[logicalName] + if adapter == nil { + return + } + records, lineIdx, err := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{}) + if err != nil { + return + } + _ = db.indexStore.Rebuild(logicalName, DefaultPartition, opts.IndexedFields, records, lineIdx) +} + +func coerceInsertIDToStrictInt(v any) (int, bool) { + switch x := v.(type) { + case int: + return x, true + case int8: + return int(x), true + case int16: + return int(x), true + case int32: + return int(x), true + case int64: + if x < int64(math.MinInt) || x > int64(math.MaxInt) { + return 0, false + } + return int(x), true + case uint: + if uint64(x) > uint64(math.MaxInt) { + return 0, false + } + return int(x), true + case uint8: + return int(x), true + case uint16: + return int(x), true + case uint32: + return int(x), true + case uint64: + if x > uint64(math.MaxInt) { + return 0, false + } + return int(x), true + case float32: + return coerceFloatToStrictInt(float64(x)) + case float64: + return coerceFloatToStrictInt(x) + case string: + s := strings.TrimSpace(x) + if s == "" { + return 0, false + } + n, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return 0, false + } + if n < int64(math.MinInt) || n > int64(math.MaxInt) { + return 0, false + } + return int(n), true + case json.Number: + n, err := x.Int64() + if err != nil { + return 0, false + } + if n < int64(math.MinInt) || n > int64(math.MaxInt) { + return 0, false + } + return int(n), true + default: + return 0, false + } +} + +func coerceFloatToStrictInt(x float64) (int, bool) { + if x != x { + return 0, false + } + if x < float64(math.MinInt) || x > float64(math.MaxInt) { + return 0, false + } + if x != math.Trunc(x) { + return 0, false + } + return int(x), true +} + // Close закрывает базу данных func (db *LineDb) Close() { db.mutex.Lock() @@ -808,7 +935,7 @@ func (db *LineDb) indexRebuildTimerLoop(interval time.Duration) { db.mutex.Lock() if db.indexStore != nil && db.initOptions != nil { for _, opt := range db.initOptions.Collections { - if len(opt.IndexedFields) == 0 { + if len(opt.IndexedFields) == 0 || opt.RebuildIndexesOnFileChange { continue } if db.isCollectionPartitioned(opt.CollectionName) { @@ -1639,7 +1766,7 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any, var allResults []any for _, adapter := range db.partitionStorageAdaptersOrdered(baseName) { partitionName := adapter.GetCollectionName() - if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { + if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 && !opts.RebuildIndexesOnFileChange { upd, used, terr := db.tryIndexUpdate(adapter, filter, dataMap, collectionName, partitionName, opts.IndexedFields, options) if terr != nil { return nil, terr @@ -1654,7 +1781,7 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any, return nil, err } allResults = append(allResults, results...) - if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { + if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 && !opts.RebuildIndexesOnFileChange { records, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{InTransaction: true}) if readErr == nil { _ = db.indexStore.Rebuild(collectionName, partitionName, opts.IndexedFields, records, lineIdx) @@ -1662,6 +1789,10 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any, } } + if opts != nil && opts.RebuildIndexesOnFileChange && db.indexStore != nil && len(opts.IndexedFields) > 0 { + db.rebuildAllIndexesForLogicalCollection(collectionName) + } + db.clearCacheForCollection(collectionName) return allResults, nil } @@ -1673,7 +1804,7 @@ func (db *LineDb) deletePartitioned(data any, collectionName string, options Lin var allResults []any for _, adapter := range db.partitionStorageAdaptersOrdered(baseName) { partitionName := adapter.GetCollectionName() - if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { + if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 && !opts.RebuildIndexesOnFileChange { del, used, derr := db.tryIndexDelete(adapter, data, collectionName, partitionName, opts.IndexedFields, options) if derr != nil { return nil, derr @@ -1688,7 +1819,7 @@ func (db *LineDb) deletePartitioned(data any, collectionName string, options Lin return nil, err } allResults = append(allResults, results...) - if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { + if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 && !opts.RebuildIndexesOnFileChange { records, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{InTransaction: true}) if readErr == nil { _ = db.indexStore.Rebuild(collectionName, partitionName, opts.IndexedFields, records, lineIdx) @@ -1696,6 +1827,10 @@ func (db *LineDb) deletePartitioned(data any, collectionName string, options Lin } } + if opts != nil && opts.RebuildIndexesOnFileChange && db.indexStore != nil && len(opts.IndexedFields) > 0 { + db.rebuildAllIndexesForLogicalCollection(collectionName) + } + db.clearCacheForCollection(collectionName) return allResults, nil } diff --git a/pkg/linedb/types.go b/pkg/linedb/types.go index 617b6b7..7731c06 100644 --- a/pkg/linedb/types.go +++ b/pkg/linedb/types.go @@ -105,6 +105,10 @@ type JSONLFileOptions struct { SkipInvalidLines bool `json:"skipInvalidLines,omitempty"` DecryptKey string `json:"decryptKey,omitempty"` ConvertStringIdToNumber bool `json:"convertStringIdToNumber,omitempty"` + // RebuildIndexesOnFileChange — после каждой мутации файла коллекции (Insert/Update/Delete через LineDb) + // полная пересборка всех индексов по этой логической коллекции; частичное индексирование (IndexRecord), + // tryIndexUpdate/tryIndexDelete и периодический ребилд по таймеру для этой коллекции отключаются. + RebuildIndexesOnFileChange bool `json:"rebuildIndexesOnFileChange,omitempty"` // Функции сериализации и десериализации JSON JSONMarshal func(any) ([]byte, error) `json:"-"` JSONUnmarshal func([]byte, any) error `json:"-"`