@@ -365,8 +365,9 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO
writeOptions := LineDbAdapterOptions { InTransaction : true , InternalCall : true }
if db . indexStore != nil {
opts := db . getCollectionOptions ( collectionName )
if opts != nil && len ( opts . IndexedFields ) > 0 && db . isCollectionPartitioned ( collectionName ) {
writeOptions . DoIndexing = true // индекс строится точечно при Write в партиции
if opts != nil && len ( opts . IndexedFields ) > 0 {
// Точечное IndexRecord в Write (LineCount → стартовая строка для новых записей)
writeOptions . DoIndexing = true
}
}
if err := db . Write ( resultDataArray , collectionName , writeOptions ) ; err != nil {
@@ -380,20 +381,6 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO
}
}
// Обновляем индекс (полная пересборка по коллекции) — только для непартиционированных
if db . indexStore != nil {
opts := db . getCollectionOptions ( collectionName )
if opts != nil && len ( opts . IndexedFields ) > 0 && ! db . isCollectionPartitioned ( collectionName ) {
adapter , exists := db . adapters [ collectionName ]
if exists {
allRecords , lineIdx , readErr := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { } )
if readErr == nil {
_ = db . indexStore . Rebuild ( collectionName , DefaultPartition , opts . IndexedFields , allRecords , lineIdx )
}
}
}
}
return nil
}
@@ -520,7 +507,7 @@ func (db *LineDb) Update(data any, collectionName string, filter any, options Li
// Пробуем точечный Update через индекс (без полного чтения файла)
opts := db . getCollectionOptions ( collectionName )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! db . isCollectionPartitioned ( collectionName ) {
result , used , err := db . tryIndexUpdate ( adapter , filter , dataMap , collectionName , opts . IndexedFields , options )
result , used , err := db . tryIndexUpdate ( adapter , filter , dataMap , collectionName , DefaultPartition , opts . IndexedFields , options )
if err != nil {
return nil , err
}
@@ -586,7 +573,7 @@ func (db *LineDb) Delete(data any, collectionName string, options LineDbAdapterO
opts := db . getCollectionOptions ( collectionName )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
result , used , err := db . tryIndexDelete ( adapter , data , collectionName , opts . IndexedFields , options )
result , used , err := db . tryIndexDelete ( adapter , data , collectionName , DefaultPartition , opts . IndexedFields , options )
if err != nil {
return nil , err
}
@@ -792,6 +779,8 @@ func (db *LineDb) Close() {
// indexRebuildTimerLoop выполняет полный ребилд всех индексов с заданным интервалом (IndexRebuildTimer).
// Работает в отдельной горутине; при каждом тике берёт db.mutex.Lock() на время ребилда.
// Перед ребилдом коллекции: если в файле есть пустые слоты (след точечного удаления), вызывается
// CompactFile — перезапись без «дыр», затем индексирование по сжатому файлу.
// Останавливается при закрытии db.indexRebuildDone.
func ( db * LineDb ) indexRebuildTimerLoop ( interval time . Duration ) {
ticker := time . NewTicker ( interval )
@@ -813,6 +802,13 @@ func (db *LineDb) indexRebuildTimerLoop(interval time.Duration) {
if ! strings . HasPrefix ( name , baseName + "_" ) {
continue
}
dirty , err := adapter . HasBlankSlots ( LineDbAdapterOptions { } )
if err != nil {
continue
}
if dirty {
_ = adapter . CompactFile ( LineDbAdapterOptions { } )
}
records , lineIdx , err := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { } )
if err != nil {
continue
@@ -822,6 +818,10 @@ func (db *LineDb) indexRebuildTimerLoop(interval time.Duration) {
} else {
adapter := db . adapters [ opt . CollectionName ]
if adapter != nil {
dirty , err := adapter . HasBlankSlots ( LineDbAdapterOptions { } )
if err == nil && dirty {
_ = adapter . CompactFile ( LineDbAdapterOptions { } )
}
records , lineIdx , err := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { } )
if err == nil {
_ = db . indexStore . Rebuild ( opt . CollectionName , DefaultPartition , opt . IndexedFields , records , lineIdx )
@@ -965,8 +965,23 @@ func (db *LineDb) tryIndexLookupPartitioned(filter any, collectionName string, o
return true , filtered , nil
}
// indexedFieldKeysEqual возвращает true, если ключи индекса по всем indexedFields совпадают
// (та же семантика, что getFieldValue/valueToIndexKey в индексе).
func indexedFieldKeysEqual ( oldRec , newRec map [ string ] any , indexedFields [ ] string ) bool {
for _ , f := range indexedFields {
if getFieldValue ( oldRec , f ) != getFieldValue ( newRec , f ) {
return false
}
}
return true
}
// tryIndexUpdate выполняет точечное обновление через индекс. Возвращает (result, used, err).
func ( db * LineDb ) tryIndexUpdate ( adapter * JSONLFile , filter any , dataMap map [ string ] any , collectionName string , indexedFields [ ] string , options LineDbAdapterOptions ) ( [ ] any , bool , error ) {
// partition — имя партиции для IndexStore (для непартиционированных коллекций: DefaultPartition).
func ( db * LineDb ) tryIndexUpdate ( adapter * JSONLFile , filter any , dataMap map [ string ] any , collectionName string , partition string , indexedFields [ ] string , options LineDbAdapterOptions ) ( [ ] any , bool , error ) {
if partition == "" {
partition = DefaultPartition
}
filterMap , ok := filter . ( map [ string ] any )
if ! ok || len ( filterMap ) == 0 {
return nil , false , nil
@@ -990,7 +1005,7 @@ func (db *LineDb) tryIndexUpdate(adapter *JSONLFile, filter any, dataMap map[str
}
var indexes [ ] int
for _ , p := range posList {
if p . Partition == DefaultP artition {
if p . Partition == p artition {
indexes = append ( indexes , p . LineIndex )
}
}
@@ -1033,21 +1048,26 @@ func (db *LineDb) tryIndexUpdate(adapter *JSONLFile, filter any, dataMap map[str
if err := adapter . WriteAtLineIndexes ( updated , toUpdatePos , opt ) ; err != nil {
return nil , false , err
}
for i , rec := range toUpdate {
if m , ok := rec . ( map [ string ] any ) ; ok {
db . indexStore . UnindexRecord ( collectionName , DefaultPartition , indexedFields , m , toUpdatePos [ i ] )
for i := range toUpdate {
oldM , ok1 := toUpdate [ i ] . ( map [ string ] any )
newM , ok2 := updated [ i ] . ( map [ string ] any )
if ! ok1 || ! ok2 {
continue
}
if indexedFieldKeysEqual ( oldM , newM , indexedFields ) {
continue
}
for i , rec := range updated {
if m , ok := rec . ( map [ string ] any ) ; ok {
db . indexStore . IndexRecord ( collectionName , DefaultPartition , indexedFields , m , toUpdatePos [ i ] )
}
db . indexStore . UnindexRecord ( collectionName , partition , indexedFields , oldM , toUpdatePos [ i ] )
db . indexSto re. IndexRecord ( collectionName , partition , indexedFields , newM , toUpdatePos [ i ] )
}
return updated , true , nil
}
// tryIndexDelete выполняет точечное удаление через индекс. Возвращает (deletedRecords, used, err).
func ( db * LineDb ) tryIndexDelete ( adapter * JSONLFile , filter any , collectionName string , indexedFields [ ] string , options LineDbAdapterOptions ) ( [ ] any , bool , error ) {
func ( db * LineDb ) tryIndexDelete ( adapter * JSONLFile , filter any , collectionName string , partition string , indexedFields [ ] string , options LineDbAdapterOptions ) ( [ ] any , bool , error ) {
if partition == "" {
partition = DefaultPartition
}
filterMap , ok := filter . ( map [ string ] any )
if ! ok || len ( filterMap ) == 0 {
return nil , false , nil
@@ -1071,7 +1091,7 @@ func (db *LineDb) tryIndexDelete(adapter *JSONLFile, filter any, collectionName
}
var indexes [ ] int
for _ , p := range posList {
if p . Partition == DefaultP artition {
if p . Partition == p artition {
indexes = append ( indexes , p . LineIndex )
}
}
@@ -1101,7 +1121,7 @@ func (db *LineDb) tryIndexDelete(adapter *JSONLFile, filter any, collectionName
}
for i , rec := range toDel {
if m , ok := rec . ( map [ string ] any ) ; ok {
db . indexStore . UnindexRecord ( collectionName , DefaultP artition, indexedFields , m , toDelPos [ i ] )
db . indexStore . UnindexRecord ( collectionName , p artition, indexedFields , m , toDelPos [ i ] )
}
}
return toDel , true , nil
@@ -1593,6 +1613,10 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any,
if err != nil {
return nil , err
}
dataMap , err := db . toMap ( data )
if err != nil {
return nil , fmt . Errorf ( "invalid update data format: %w" , err )
}
opts := db . getCollectionOptions ( collectionName )
var allResults [ ] any
@@ -1608,6 +1632,16 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any,
}
if adapter != nil {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
upd , used , terr := db . tryIndexUpdate ( adapter , filter , dataMap , collectionName , partitionName , opts . IndexedFields , options )
if terr != nil {
return nil , terr
}
if used {
allResults = append ( allResults , upd ... )
continue
}
}
results , err := adapter . Update ( data , filter , options )
if err != nil {
return nil , err
@@ -1645,6 +1679,16 @@ func (db *LineDb) deletePartitioned(data any, collectionName string, options Lin
}
if adapter != nil {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
del , used , derr := db . tryIndexDelete ( adapter , data , collectionName , partitionName , opts . IndexedFields , options )
if derr != nil {
return nil , derr
}
if used {
allResults = append ( allResults , del ... )
continue
}
}
results , err := adapter . Delete ( data , options )
if err != nil {
return nil , err