added reindexonfilechange
This commit is contained in:
@@ -1,7 +1,9 @@
|
|||||||
package linedb
|
package linedb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
@@ -358,6 +360,10 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO
|
|||||||
|
|
||||||
itemMap["id"] = newID
|
itemMap["id"] = newID
|
||||||
} else {
|
} else {
|
||||||
|
if n, ok := coerceInsertIDToStrictInt(itemMap["id"]); ok {
|
||||||
|
itemMap["id"] = n
|
||||||
|
db.lastIDManager.SetLastID(collectionName, n)
|
||||||
|
}
|
||||||
// Проверяем существование записи если не пропускаем проверку
|
// Проверяем существование записи если не пропускаем проверку
|
||||||
if !options.SkipCheckExistingForWrite {
|
if !options.SkipCheckExistingForWrite {
|
||||||
filter := map[string]any{"id": itemMap["id"]}
|
filter := map[string]any{"id": itemMap["id"]}
|
||||||
@@ -391,7 +397,7 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO
|
|||||||
writeOptions := LineDbAdapterOptions{InTransaction: true, InternalCall: true}
|
writeOptions := LineDbAdapterOptions{InTransaction: true, InternalCall: true}
|
||||||
if db.indexStore != nil {
|
if db.indexStore != nil {
|
||||||
opts := db.getCollectionOptions(collectionName)
|
opts := db.getCollectionOptions(collectionName)
|
||||||
if opts != nil && len(opts.IndexedFields) > 0 {
|
if opts != nil && len(opts.IndexedFields) > 0 && !opts.RebuildIndexesOnFileChange {
|
||||||
// Точечное IndexRecord в Write (LineCount → стартовая строка для новых записей)
|
// Точечное IndexRecord в Write (LineCount → стартовая строка для новых записей)
|
||||||
writeOptions.DoIndexing = true
|
writeOptions.DoIndexing = true
|
||||||
}
|
}
|
||||||
@@ -417,10 +423,13 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
|
|||||||
collectionName = db.getFirstCollection()
|
collectionName = db.getFirstCollection()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
opts := db.getCollectionOptions(collectionName)
|
||||||
|
suppressPartialIndex := opts != nil && opts.RebuildIndexesOnFileChange
|
||||||
|
doPartial := options.DoIndexing && !suppressPartialIndex
|
||||||
|
|
||||||
// Проверяем партиционирование
|
// Проверяем партиционирование
|
||||||
if db.isCollectionPartitioned(collectionName) {
|
if db.isCollectionPartitioned(collectionName) {
|
||||||
dataArray := db.normalizeDataArray(data)
|
dataArray := db.normalizeDataArray(data)
|
||||||
opts := db.getCollectionOptions(collectionName)
|
|
||||||
for _, item := range dataArray {
|
for _, item := range dataArray {
|
||||||
adapter, err := db.getPartitionAdapter(item, collectionName)
|
adapter, err := db.getPartitionAdapter(item, collectionName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -428,7 +437,7 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
|
|||||||
}
|
}
|
||||||
partitionName := adapter.GetCollectionName()
|
partitionName := adapter.GetCollectionName()
|
||||||
var startLine int
|
var startLine int
|
||||||
if options.DoIndexing && db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
|
if doPartial && db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
|
||||||
if c, err := adapter.LineCount(); err == nil {
|
if c, err := adapter.LineCount(); err == nil {
|
||||||
startLine = c
|
startLine = c
|
||||||
}
|
}
|
||||||
@@ -436,12 +445,18 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
|
|||||||
if err := adapter.Write(item, options); err != nil {
|
if err := adapter.Write(item, options); err != nil {
|
||||||
return fmt.Errorf("failed to write to partition: %w", err)
|
return fmt.Errorf("failed to write to partition: %w", err)
|
||||||
}
|
}
|
||||||
if options.DoIndexing && db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
|
if doPartial && db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
|
||||||
if m, err := db.toMap(item); err == nil {
|
if m, err := db.toMap(item); err == nil {
|
||||||
db.indexStore.IndexRecord(collectionName, partitionName, opts.IndexedFields, m, startLine)
|
db.indexStore.IndexRecord(collectionName, partitionName, opts.IndexedFields, m, startLine)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if suppressPartialIndex {
|
||||||
|
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
|
||||||
|
db.rebuildAllIndexesForLogicalCollection(collectionName)
|
||||||
|
}
|
||||||
|
db.clearCacheForCollection(collectionName)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -452,7 +467,7 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
|
|||||||
}
|
}
|
||||||
|
|
||||||
var startLine int
|
var startLine int
|
||||||
if options.DoIndexing && db.indexStore != nil {
|
if doPartial && db.indexStore != nil {
|
||||||
if c, err := adapter.LineCount(); err == nil {
|
if c, err := adapter.LineCount(); err == nil {
|
||||||
startLine = c
|
startLine = c
|
||||||
}
|
}
|
||||||
@@ -463,8 +478,7 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Точечное индексирование при DoIndexing
|
// Точечное индексирование при DoIndexing
|
||||||
if options.DoIndexing && db.indexStore != nil {
|
if doPartial && db.indexStore != nil {
|
||||||
opts := db.getCollectionOptions(collectionName)
|
|
||||||
if opts != nil && len(opts.IndexedFields) > 0 {
|
if opts != nil && len(opts.IndexedFields) > 0 {
|
||||||
dataArray := db.normalizeDataArray(data)
|
dataArray := db.normalizeDataArray(data)
|
||||||
for i, record := range dataArray {
|
for i, record := range dataArray {
|
||||||
@@ -474,6 +488,12 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if suppressPartialIndex {
|
||||||
|
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
|
||||||
|
db.rebuildAllIndexesForLogicalCollection(collectionName)
|
||||||
|
}
|
||||||
|
db.clearCacheForCollection(collectionName)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -527,7 +547,7 @@ func (db *LineDb) Update(data any, collectionName string, filter any, options Li
|
|||||||
|
|
||||||
// Пробуем точечный Update через индекс (без полного чтения файла)
|
// Пробуем точечный Update через индекс (без полного чтения файла)
|
||||||
opts := db.getCollectionOptions(collectionName)
|
opts := db.getCollectionOptions(collectionName)
|
||||||
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 && !db.isCollectionPartitioned(collectionName) {
|
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 && !db.isCollectionPartitioned(collectionName) && !opts.RebuildIndexesOnFileChange {
|
||||||
result, used, err := db.tryIndexUpdate(adapter, filter, dataMap, collectionName, DefaultPartition, opts.IndexedFields, options)
|
result, used, err := db.tryIndexUpdate(adapter, filter, dataMap, collectionName, DefaultPartition, opts.IndexedFields, options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -587,7 +607,7 @@ func (db *LineDb) Delete(data any, collectionName string, options LineDbAdapterO
|
|||||||
}
|
}
|
||||||
|
|
||||||
opts := db.getCollectionOptions(collectionName)
|
opts := db.getCollectionOptions(collectionName)
|
||||||
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
|
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 && !opts.RebuildIndexesOnFileChange {
|
||||||
result, used, err := db.tryIndexDelete(adapter, data, collectionName, DefaultPartition, opts.IndexedFields, options)
|
result, used, err := db.tryIndexDelete(adapter, data, collectionName, DefaultPartition, opts.IndexedFields, options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -764,6 +784,113 @@ func (db *LineDb) clearCacheForCollection(collectionName string) {
|
|||||||
db.cacheExternal.ClearCollection(collectionName)
|
db.cacheExternal.ClearCollection(collectionName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// rebuildAllIndexesForLogicalCollection полностью пересобирает индекс по всем файлам/партициям логической коллекции.
|
||||||
|
// Вызывать при удержании db.mutex (или внутри транзакции с внешней блокировкой).
|
||||||
|
func (db *LineDb) rebuildAllIndexesForLogicalCollection(logicalName string) {
|
||||||
|
if db.indexStore == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
opts := db.getCollectionOptions(logicalName)
|
||||||
|
if opts == nil || len(opts.IndexedFields) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if db.isCollectionPartitioned(logicalName) {
|
||||||
|
for _, adapter := range db.partitionStorageAdaptersOrdered(logicalName) {
|
||||||
|
partName := adapter.GetCollectionName()
|
||||||
|
records, lineIdx, err := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{})
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_ = db.indexStore.Rebuild(logicalName, partName, opts.IndexedFields, records, lineIdx)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
adapter := db.adapters[logicalName]
|
||||||
|
if adapter == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
records, lineIdx, err := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_ = db.indexStore.Rebuild(logicalName, DefaultPartition, opts.IndexedFields, records, lineIdx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func coerceInsertIDToStrictInt(v any) (int, bool) {
|
||||||
|
switch x := v.(type) {
|
||||||
|
case int:
|
||||||
|
return x, true
|
||||||
|
case int8:
|
||||||
|
return int(x), true
|
||||||
|
case int16:
|
||||||
|
return int(x), true
|
||||||
|
case int32:
|
||||||
|
return int(x), true
|
||||||
|
case int64:
|
||||||
|
if x < int64(math.MinInt) || x > int64(math.MaxInt) {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
return int(x), true
|
||||||
|
case uint:
|
||||||
|
if uint64(x) > uint64(math.MaxInt) {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
return int(x), true
|
||||||
|
case uint8:
|
||||||
|
return int(x), true
|
||||||
|
case uint16:
|
||||||
|
return int(x), true
|
||||||
|
case uint32:
|
||||||
|
return int(x), true
|
||||||
|
case uint64:
|
||||||
|
if x > uint64(math.MaxInt) {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
return int(x), true
|
||||||
|
case float32:
|
||||||
|
return coerceFloatToStrictInt(float64(x))
|
||||||
|
case float64:
|
||||||
|
return coerceFloatToStrictInt(x)
|
||||||
|
case string:
|
||||||
|
s := strings.TrimSpace(x)
|
||||||
|
if s == "" {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
n, err := strconv.ParseInt(s, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
if n < int64(math.MinInt) || n > int64(math.MaxInt) {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
return int(n), true
|
||||||
|
case json.Number:
|
||||||
|
n, err := x.Int64()
|
||||||
|
if err != nil {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
if n < int64(math.MinInt) || n > int64(math.MaxInt) {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
return int(n), true
|
||||||
|
default:
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func coerceFloatToStrictInt(x float64) (int, bool) {
|
||||||
|
if x != x {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
if x < float64(math.MinInt) || x > float64(math.MaxInt) {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
if x != math.Trunc(x) {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
return int(x), true
|
||||||
|
}
|
||||||
|
|
||||||
// Close закрывает базу данных
|
// Close закрывает базу данных
|
||||||
func (db *LineDb) Close() {
|
func (db *LineDb) Close() {
|
||||||
db.mutex.Lock()
|
db.mutex.Lock()
|
||||||
@@ -808,7 +935,7 @@ func (db *LineDb) indexRebuildTimerLoop(interval time.Duration) {
|
|||||||
db.mutex.Lock()
|
db.mutex.Lock()
|
||||||
if db.indexStore != nil && db.initOptions != nil {
|
if db.indexStore != nil && db.initOptions != nil {
|
||||||
for _, opt := range db.initOptions.Collections {
|
for _, opt := range db.initOptions.Collections {
|
||||||
if len(opt.IndexedFields) == 0 {
|
if len(opt.IndexedFields) == 0 || opt.RebuildIndexesOnFileChange {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if db.isCollectionPartitioned(opt.CollectionName) {
|
if db.isCollectionPartitioned(opt.CollectionName) {
|
||||||
@@ -1639,7 +1766,7 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any,
|
|||||||
var allResults []any
|
var allResults []any
|
||||||
for _, adapter := range db.partitionStorageAdaptersOrdered(baseName) {
|
for _, adapter := range db.partitionStorageAdaptersOrdered(baseName) {
|
||||||
partitionName := adapter.GetCollectionName()
|
partitionName := adapter.GetCollectionName()
|
||||||
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
|
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 && !opts.RebuildIndexesOnFileChange {
|
||||||
upd, used, terr := db.tryIndexUpdate(adapter, filter, dataMap, collectionName, partitionName, opts.IndexedFields, options)
|
upd, used, terr := db.tryIndexUpdate(adapter, filter, dataMap, collectionName, partitionName, opts.IndexedFields, options)
|
||||||
if terr != nil {
|
if terr != nil {
|
||||||
return nil, terr
|
return nil, terr
|
||||||
@@ -1654,7 +1781,7 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any,
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
allResults = append(allResults, results...)
|
allResults = append(allResults, results...)
|
||||||
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
|
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 && !opts.RebuildIndexesOnFileChange {
|
||||||
records, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{InTransaction: true})
|
records, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{InTransaction: true})
|
||||||
if readErr == nil {
|
if readErr == nil {
|
||||||
_ = db.indexStore.Rebuild(collectionName, partitionName, opts.IndexedFields, records, lineIdx)
|
_ = db.indexStore.Rebuild(collectionName, partitionName, opts.IndexedFields, records, lineIdx)
|
||||||
@@ -1662,6 +1789,10 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if opts != nil && opts.RebuildIndexesOnFileChange && db.indexStore != nil && len(opts.IndexedFields) > 0 {
|
||||||
|
db.rebuildAllIndexesForLogicalCollection(collectionName)
|
||||||
|
}
|
||||||
|
|
||||||
db.clearCacheForCollection(collectionName)
|
db.clearCacheForCollection(collectionName)
|
||||||
return allResults, nil
|
return allResults, nil
|
||||||
}
|
}
|
||||||
@@ -1673,7 +1804,7 @@ func (db *LineDb) deletePartitioned(data any, collectionName string, options Lin
|
|||||||
var allResults []any
|
var allResults []any
|
||||||
for _, adapter := range db.partitionStorageAdaptersOrdered(baseName) {
|
for _, adapter := range db.partitionStorageAdaptersOrdered(baseName) {
|
||||||
partitionName := adapter.GetCollectionName()
|
partitionName := adapter.GetCollectionName()
|
||||||
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
|
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 && !opts.RebuildIndexesOnFileChange {
|
||||||
del, used, derr := db.tryIndexDelete(adapter, data, collectionName, partitionName, opts.IndexedFields, options)
|
del, used, derr := db.tryIndexDelete(adapter, data, collectionName, partitionName, opts.IndexedFields, options)
|
||||||
if derr != nil {
|
if derr != nil {
|
||||||
return nil, derr
|
return nil, derr
|
||||||
@@ -1688,7 +1819,7 @@ func (db *LineDb) deletePartitioned(data any, collectionName string, options Lin
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
allResults = append(allResults, results...)
|
allResults = append(allResults, results...)
|
||||||
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
|
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 && !opts.RebuildIndexesOnFileChange {
|
||||||
records, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{InTransaction: true})
|
records, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{InTransaction: true})
|
||||||
if readErr == nil {
|
if readErr == nil {
|
||||||
_ = db.indexStore.Rebuild(collectionName, partitionName, opts.IndexedFields, records, lineIdx)
|
_ = db.indexStore.Rebuild(collectionName, partitionName, opts.IndexedFields, records, lineIdx)
|
||||||
@@ -1696,6 +1827,10 @@ func (db *LineDb) deletePartitioned(data any, collectionName string, options Lin
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if opts != nil && opts.RebuildIndexesOnFileChange && db.indexStore != nil && len(opts.IndexedFields) > 0 {
|
||||||
|
db.rebuildAllIndexesForLogicalCollection(collectionName)
|
||||||
|
}
|
||||||
|
|
||||||
db.clearCacheForCollection(collectionName)
|
db.clearCacheForCollection(collectionName)
|
||||||
return allResults, nil
|
return allResults, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -105,6 +105,10 @@ type JSONLFileOptions struct {
|
|||||||
SkipInvalidLines bool `json:"skipInvalidLines,omitempty"`
|
SkipInvalidLines bool `json:"skipInvalidLines,omitempty"`
|
||||||
DecryptKey string `json:"decryptKey,omitempty"`
|
DecryptKey string `json:"decryptKey,omitempty"`
|
||||||
ConvertStringIdToNumber bool `json:"convertStringIdToNumber,omitempty"`
|
ConvertStringIdToNumber bool `json:"convertStringIdToNumber,omitempty"`
|
||||||
|
// RebuildIndexesOnFileChange — после каждой мутации файла коллекции (Insert/Update/Delete через LineDb)
|
||||||
|
// полная пересборка всех индексов по этой логической коллекции; частичное индексирование (IndexRecord),
|
||||||
|
// tryIndexUpdate/tryIndexDelete и периодический ребилд по таймеру для этой коллекции отключаются.
|
||||||
|
RebuildIndexesOnFileChange bool `json:"rebuildIndexesOnFileChange,omitempty"`
|
||||||
// Функции сериализации и десериализации JSON
|
// Функции сериализации и десериализации JSON
|
||||||
JSONMarshal func(any) ([]byte, error) `json:"-"`
|
JSONMarshal func(any) ([]byte, error) `json:"-"`
|
||||||
JSONUnmarshal func([]byte, any) error `json:"-"`
|
JSONUnmarshal func([]byte, any) error `json:"-"`
|
||||||
|
|||||||
Reference in New Issue
Block a user