@@ -1,7 +1,9 @@
package linedb
package linedb
import (
import (
"encoding/json"
"fmt"
"fmt"
"math"
"os"
"os"
"path/filepath"
"path/filepath"
"sort"
"sort"
@@ -358,6 +360,10 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO
itemMap [ "id" ] = newID
itemMap [ "id" ] = newID
} else {
} else {
if n , ok := coerceInsertIDToStrictInt ( itemMap [ "id" ] ) ; ok {
itemMap [ "id" ] = n
db . lastIDManager . SetLastID ( collectionName , n )
}
// Проверяем существование записи если не пропускаем проверку
// Проверяем существование записи если не пропускаем проверку
if ! options . SkipCheckExistingForWrite {
if ! options . SkipCheckExistingForWrite {
filter := map [ string ] any { "id" : itemMap [ "id" ] }
filter := map [ string ] any { "id" : itemMap [ "id" ] }
@@ -391,7 +397,7 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO
writeOptions := LineDbAdapterOptions { InTransaction : true , InternalCall : true }
writeOptions := LineDbAdapterOptions { InTransaction : true , InternalCall : true }
if db . indexStore != nil {
if db . indexStore != nil {
opts := db . getCollectionOptions ( collectionName )
opts := db . getCollectionOptions ( collectionName )
if opts != nil && len ( opts . IndexedFields ) > 0 {
if opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
// Точечное IndexRecord в Write (LineCount → стартовая строка для новых записей)
// Точечное IndexRecord в Write (LineCount → стартовая строка для новых записей)
writeOptions . DoIndexing = true
writeOptions . DoIndexing = true
}
}
@@ -417,10 +423,13 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
collectionName = db . getFirstCollection ( )
collectionName = db . getFirstCollection ( )
}
}
opts := db . getCollectionOptions ( collectionName )
suppressPartialIndex := opts != nil && opts . RebuildIndexesOnFileChange
doPartial := options . DoIndexing && ! suppressPartialIndex
// Проверяем партиционирование
// Проверяем партиционирование
if db . isCollectionPartitioned ( collectionName ) {
if db . isCollectionPartitioned ( collectionName ) {
dataArray := db . normalizeDataArray ( data )
dataArray := db . normalizeDataArray ( data )
opts := db . getCollectionOptions ( collectionName )
for _ , item := range dataArray {
for _ , item := range dataArray {
adapter , err := db . getPartitionAdapter ( item , collectionName )
adapter , err := db . getPartitionAdapter ( item , collectionName )
if err != nil {
if err != nil {
@@ -428,7 +437,7 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
}
}
partitionName := adapter . GetCollectionName ( )
partitionName := adapter . GetCollectionName ( )
var startLine int
var startLine int
if options . DoIndexing && db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if doPartial && db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if c , err := adapter . LineCount ( ) ; err == nil {
if c , err := adapter . LineCount ( ) ; err == nil {
startLine = c
startLine = c
}
}
@@ -436,12 +445,18 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
if err := adapter . Write ( item , options ) ; err != nil {
if err := adapter . Write ( item , options ) ; err != nil {
return fmt . Errorf ( "failed to write to partition: %w" , err )
return fmt . Errorf ( "failed to write to partition: %w" , err )
}
}
if options . DoIndexing && db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if doPartial && db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if m , err := db . toMap ( item ) ; err == nil {
if m , err := db . toMap ( item ) ; err == nil {
db . indexStore . IndexRecord ( collectionName , partitionName , opts . IndexedFields , m , startLine )
db . indexStore . IndexRecord ( collectionName , partitionName , opts . IndexedFields , m , startLine )
}
}
}
}
}
}
if suppressPartialIndex {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
db . rebuildAllIndexesForLogicalCollection ( collectionName )
}
db . clearCacheForCollection ( collectionName )
}
return nil
return nil
}
}
@@ -452,7 +467,7 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
}
}
var startLine int
var startLine int
if options . DoIndexing && db . indexStore != nil {
if doPartial && db . indexStore != nil {
if c , err := adapter . LineCount ( ) ; err == nil {
if c , err := adapter . LineCount ( ) ; err == nil {
startLine = c
startLine = c
}
}
@@ -463,8 +478,7 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
}
}
// Точечное индексирование при DoIndexing
// Точечное индексирование при DoIndexing
if options . DoIndexing && db . indexStore != nil {
if doPartial && db . indexStore != nil {
opts := db . getCollectionOptions ( collectionName )
if opts != nil && len ( opts . IndexedFields ) > 0 {
if opts != nil && len ( opts . IndexedFields ) > 0 {
dataArray := db . normalizeDataArray ( data )
dataArray := db . normalizeDataArray ( data )
for i , record := range dataArray {
for i , record := range dataArray {
@@ -474,6 +488,12 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
}
}
}
}
}
}
if suppressPartialIndex {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
db . rebuildAllIndexesForLogicalCollection ( collectionName )
}
db . clearCacheForCollection ( collectionName )
}
return nil
return nil
}
}
@@ -527,7 +547,7 @@ func (db *LineDb) Update(data any, collectionName string, filter any, options Li
// Пробуем точечный Update через индекс (без полного чтения файла)
// Пробуем точечный Update через индекс (без полного чтения файла)
opts := db . getCollectionOptions ( collectionName )
opts := db . getCollectionOptions ( collectionName )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! db . isCollectionPartitioned ( collectionName ) {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! db . isCollectionPartitioned ( collectionName ) && ! opts . RebuildIndexesOnFileChange {
result , used , err := db . tryIndexUpdate ( adapter , filter , dataMap , collectionName , DefaultPartition , opts . IndexedFields , options )
result , used , err := db . tryIndexUpdate ( adapter , filter , dataMap , collectionName , DefaultPartition , opts . IndexedFields , options )
if err != nil {
if err != nil {
return nil , err
return nil , err
@@ -587,7 +607,7 @@ func (db *LineDb) Delete(data any, collectionName string, options LineDbAdapterO
}
}
opts := db . getCollectionOptions ( collectionName )
opts := db . getCollectionOptions ( collectionName )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
result , used , err := db . tryIndexDelete ( adapter , data , collectionName , DefaultPartition , opts . IndexedFields , options )
result , used , err := db . tryIndexDelete ( adapter , data , collectionName , DefaultPartition , opts . IndexedFields , options )
if err != nil {
if err != nil {
return nil , err
return nil , err
@@ -764,6 +784,113 @@ func (db *LineDb) clearCacheForCollection(collectionName string) {
db . cacheExternal . ClearCollection ( collectionName )
db . cacheExternal . ClearCollection ( collectionName )
}
}
// rebuildAllIndexesForLogicalCollection полностью пересобирает индекс по всем файлам/партициям логической коллекции.
// Вызывать при удержании db.mutex (или внутри транзакции с внешней блокировкой).
func ( db * LineDb ) rebuildAllIndexesForLogicalCollection ( logicalName string ) {
if db . indexStore == nil {
return
}
opts := db . getCollectionOptions ( logicalName )
if opts == nil || len ( opts . IndexedFields ) == 0 {
return
}
if db . isCollectionPartitioned ( logicalName ) {
for _ , adapter := range db . partitionStorageAdaptersOrdered ( logicalName ) {
partName := adapter . GetCollectionName ( )
records , lineIdx , err := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { } )
if err != nil {
continue
}
_ = db . indexStore . Rebuild ( logicalName , partName , opts . IndexedFields , records , lineIdx )
}
return
}
adapter := db . adapters [ logicalName ]
if adapter == nil {
return
}
records , lineIdx , err := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { } )
if err != nil {
return
}
_ = db . indexStore . Rebuild ( logicalName , DefaultPartition , opts . IndexedFields , records , lineIdx )
}
func coerceInsertIDToStrictInt ( v any ) ( int , bool ) {
switch x := v . ( type ) {
case int :
return x , true
case int8 :
return int ( x ) , true
case int16 :
return int ( x ) , true
case int32 :
return int ( x ) , true
case int64 :
if x < int64 ( math . MinInt ) || x > int64 ( math . MaxInt ) {
return 0 , false
}
return int ( x ) , true
case uint :
if uint64 ( x ) > uint64 ( math . MaxInt ) {
return 0 , false
}
return int ( x ) , true
case uint8 :
return int ( x ) , true
case uint16 :
return int ( x ) , true
case uint32 :
return int ( x ) , true
case uint64 :
if x > uint64 ( math . MaxInt ) {
return 0 , false
}
return int ( x ) , true
case float32 :
return coerceFloatToStrictInt ( float64 ( x ) )
case float64 :
return coerceFloatToStrictInt ( x )
case string :
s := strings . TrimSpace ( x )
if s == "" {
return 0 , false
}
n , err := strconv . ParseInt ( s , 10 , 64 )
if err != nil {
return 0 , false
}
if n < int64 ( math . MinInt ) || n > int64 ( math . MaxInt ) {
return 0 , false
}
return int ( n ) , true
case json . Number :
n , err := x . Int64 ( )
if err != nil {
return 0 , false
}
if n < int64 ( math . MinInt ) || n > int64 ( math . MaxInt ) {
return 0 , false
}
return int ( n ) , true
default :
return 0 , false
}
}
func coerceFloatToStrictInt ( x float64 ) ( int , bool ) {
if x != x {
return 0 , false
}
if x < float64 ( math . MinInt ) || x > float64 ( math . MaxInt ) {
return 0 , false
}
if x != math . Trunc ( x ) {
return 0 , false
}
return int ( x ) , true
}
// Close закрывает базу данных
// Close закрывает базу данных
func ( db * LineDb ) Close ( ) {
func ( db * LineDb ) Close ( ) {
db . mutex . Lock ( )
db . mutex . Lock ( )
@@ -808,7 +935,7 @@ func (db *LineDb) indexRebuildTimerLoop(interval time.Duration) {
db . mutex . Lock ( )
db . mutex . Lock ( )
if db . indexStore != nil && db . initOptions != nil {
if db . indexStore != nil && db . initOptions != nil {
for _ , opt := range db . initOptions . Collections {
for _ , opt := range db . initOptions . Collections {
if len ( opt . IndexedFields ) == 0 {
if len ( opt . IndexedFields ) == 0 || opt . RebuildIndexesOnFileChange {
continue
continue
}
}
if db . isCollectionPartitioned ( opt . CollectionName ) {
if db . isCollectionPartitioned ( opt . CollectionName ) {
@@ -1639,7 +1766,7 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any,
var allResults [ ] any
var allResults [ ] any
for _ , adapter := range db . partitionStorageAdaptersOrdered ( baseName ) {
for _ , adapter := range db . partitionStorageAdaptersOrdered ( baseName ) {
partitionName := adapter . GetCollectionName ( )
partitionName := adapter . GetCollectionName ( )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
upd , used , terr := db . tryIndexUpdate ( adapter , filter , dataMap , collectionName , partitionName , opts . IndexedFields , options )
upd , used , terr := db . tryIndexUpdate ( adapter , filter , dataMap , collectionName , partitionName , opts . IndexedFields , options )
if terr != nil {
if terr != nil {
return nil , terr
return nil , terr
@@ -1654,7 +1781,7 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any,
return nil , err
return nil , err
}
}
allResults = append ( allResults , results ... )
allResults = append ( allResults , results ... )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
records , lineIdx , readErr := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { InTransaction : true } )
records , lineIdx , readErr := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { InTransaction : true } )
if readErr == nil {
if readErr == nil {
_ = db . indexStore . Rebuild ( collectionName , partitionName , opts . IndexedFields , records , lineIdx )
_ = db . indexStore . Rebuild ( collectionName , partitionName , opts . IndexedFields , records , lineIdx )
@@ -1662,6 +1789,10 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any,
}
}
}
}
if opts != nil && opts . RebuildIndexesOnFileChange && db . indexStore != nil && len ( opts . IndexedFields ) > 0 {
db . rebuildAllIndexesForLogicalCollection ( collectionName )
}
db . clearCacheForCollection ( collectionName )
db . clearCacheForCollection ( collectionName )
return allResults , nil
return allResults , nil
}
}
@@ -1673,7 +1804,7 @@ func (db *LineDb) deletePartitioned(data any, collectionName string, options Lin
var allResults [ ] any
var allResults [ ] any
for _ , adapter := range db . partitionStorageAdaptersOrdered ( baseName ) {
for _ , adapter := range db . partitionStorageAdaptersOrdered ( baseName ) {
partitionName := adapter . GetCollectionName ( )
partitionName := adapter . GetCollectionName ( )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
del , used , derr := db . tryIndexDelete ( adapter , data , collectionName , partitionName , opts . IndexedFields , options )
del , used , derr := db . tryIndexDelete ( adapter , data , collectionName , partitionName , opts . IndexedFields , options )
if derr != nil {
if derr != nil {
return nil , derr
return nil , derr
@@ -1688,7 +1819,7 @@ func (db *LineDb) deletePartitioned(data any, collectionName string, options Lin
return nil , err
return nil , err
}
}
allResults = append ( allResults , results ... )
allResults = append ( allResults , results ... )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
records , lineIdx , readErr := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { InTransaction : true } )
records , lineIdx , readErr := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { InTransaction : true } )
if readErr == nil {
if readErr == nil {
_ = db . indexStore . Rebuild ( collectionName , partitionName , opts . IndexedFields , records , lineIdx )
_ = db . indexStore . Rebuild ( collectionName , partitionName , opts . IndexedFields , records , lineIdx )
@@ -1696,6 +1827,10 @@ func (db *LineDb) deletePartitioned(data any, collectionName string, options Lin
}
}
}
}
if opts != nil && opts . RebuildIndexesOnFileChange && db . indexStore != nil && len ( opts . IndexedFields ) > 0 {
db . rebuildAllIndexesForLogicalCollection ( collectionName )
}
db . clearCacheForCollection ( collectionName )
db . clearCacheForCollection ( collectionName )
return allResults , nil
return allResults , nil
}
}