@@ -1,7 +1,9 @@
package linedb
import (
"encoding/json"
"fmt"
"math"
"os"
"path/filepath"
"sort"
@@ -88,6 +90,8 @@ func (db *LineDb) Init(force bool, initOptions *LineDbInitOptions) error {
if dbFolder == "" {
dbFolder = "linedb"
}
// Нормализуем и сохраняем фактический путь, чтобы партиции создавались в той же папке
db . initOptions . DBFolder = dbFolder
if err := os . MkdirAll ( dbFolder , 0755 ) ; err != nil {
return fmt . Errorf ( "failed to create database folder: %w" , err )
@@ -229,7 +233,13 @@ func (db *LineDb) seedLastIDPartitioned(dbFolder, baseName string) error {
if existing , ok := db . adapters [ partName ] ; ok {
adapter = existing
} else {
adapter = NewJSONLFile ( path , "" , JSONLFileOptions { CollectionName : partName } )
// Партиции должны наследовать опции базовой коллекции (allocSize/encode/crypto/marshal/etc. )
opts := JSONLFileOptions { CollectionName : partName }
if baseOpts := db . getCollectionOptions ( baseName ) ; baseOpts != nil {
opts = * baseOpts
opts . CollectionName = partName
}
adapter = NewJSONLFile ( path , opts . EncryptKeyForLineDb , opts )
if err := adapter . Init ( false , LineDbAdapterOptions { } ) ; err != nil {
continue
}
@@ -350,6 +360,10 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO
itemMap [ "id" ] = newID
} else {
if n , ok := coerceInsertIDToStrictInt ( itemMap [ "id" ] ) ; ok {
itemMap [ "id" ] = n
db . lastIDManager . SetLastID ( collectionName , n )
}
// Проверяем существование записи если не пропускаем проверку
if ! options . SkipCheckExistingForWrite {
filter := map [ string ] any { "id" : itemMap [ "id" ] }
@@ -383,7 +397,7 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO
writeOptions := LineDbAdapterOptions { InTransaction : true , InternalCall : true }
if db . indexStore != nil {
opts := db . getCollectionOptions ( collectionName )
if opts != nil && len ( opts . IndexedFields ) > 0 {
if opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
// Точечное IndexRecord в Write (LineCount → стартовая строка для новых записей)
writeOptions . DoIndexing = true
}
@@ -392,12 +406,7 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO
return fmt . Errorf ( "failed to write data: %w" , err )
}
// Обновляем кэш
if db . cacheExternal != nil {
for _ , item := range resultDataArray {
db . cacheExternal . UpdateCacheAfterInsert ( item , collectionName )
}
}
db . clearCacheForCollection ( collectionName )
return nil
}
@@ -414,10 +423,13 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
collectionName = db . getFirstCollection ( )
}
opts := db . getCollectionOptions ( collectionName )
suppressPartialIndex := opts != nil && opts . RebuildIndexesOnFileChange
doPartial := options . DoIndexing && ! suppressPartialIndex
// Проверяем партиционирование
if db . isCollectionPartitioned ( collectionName ) {
dataArray := db . normalizeDataArray ( data )
opts := db . getCollectionOptions ( collectionName )
for _ , item := range dataArray {
adapter , err := db . getPartitionAdapter ( item , collectionName )
if err != nil {
@@ -425,7 +437,7 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
}
partitionName := adapter . GetCollectionName ( )
var startLine int
if options . DoIndexing && db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if doPartial && db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if c , err := adapter . LineCount ( ) ; err == nil {
startLine = c
}
@@ -433,12 +445,18 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
if err := adapter . Write ( item , options ) ; err != nil {
return fmt . Errorf ( "failed to write to partition: %w" , err )
}
if options . DoIndexing && db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if doPartial && db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if m , err := db . toMap ( item ) ; err == nil {
db . indexStore . IndexRecord ( collectionName , partitionName , opts . IndexedFields , m , startLine )
}
}
}
if suppressPartialIndex {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
db . rebuildAllIndexesForLogicalCollection ( collectionName )
}
db . clearCacheForCollection ( collectionName )
}
return nil
}
@@ -449,7 +467,7 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
}
var startLine int
if options . DoIndexing && db . indexStore != nil {
if doPartial && db . indexStore != nil {
if c , err := adapter . LineCount ( ) ; err == nil {
startLine = c
}
@@ -460,8 +478,7 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
}
// Точечное индексирование при DoIndexing
if options . DoIndexing && db . indexStore != nil {
opts := db . getCollectionOptions ( collectionName )
if doPartial && db . indexStore != nil {
if opts != nil && len ( opts . IndexedFields ) > 0 {
dataArray := db . normalizeDataArray ( data )
for i , record := range dataArray {
@@ -471,6 +488,12 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
}
}
}
if suppressPartialIndex {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
db . rebuildAllIndexesForLogicalCollection ( collectionName )
}
db . clearCacheForCollection ( collectionName )
}
return nil
}
@@ -524,16 +547,13 @@ func (db *LineDb) Update(data any, collectionName string, filter any, options Li
// Пробуем точечный Update через индекс (без полного чтения файла)
opts := db . getCollectionOptions ( collectionName )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! db . isCollectionPartitioned ( collectionName ) {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! db . isCollectionPartitioned ( collectionName ) && ! opts . RebuildIndexesOnFileChange {
result , used , err := db . tryIndexUpdate ( adapter , filter , dataMap , collectionName , DefaultPartition , opts . IndexedFields , options )
if err != nil {
return nil , err
}
if used {
if db . cacheExternal != nil {
ids := extractIDsFromRecords ( result )
db . cacheExternal . ClearEntriesContainingIDs ( ids )
}
db . clearCacheForCollection ( collectionName )
return result , nil
}
}
@@ -552,10 +572,7 @@ func (db *LineDb) Update(data any, collectionName string, filter any, options Li
}
}
}
if db . cacheExternal != nil {
ids := extractIDsFromRecords ( result )
db . cacheExternal . ClearEntriesContainingIDs ( ids )
}
db . clearCacheForCollection ( collectionName )
return result , nil
}
@@ -590,16 +607,13 @@ func (db *LineDb) Delete(data any, collectionName string, options LineDbAdapterO
}
opts := db . getCollectionOptions ( collectionName )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
result , used , err := db . tryIndexDelete ( adapter , data , collectionName , DefaultPartition , opts . IndexedFields , options )
if err != nil {
return nil , err
}
if used {
if db . cacheExternal != nil {
ids := extractIDsFromRecords ( result )
db . cacheExternal . ClearEntriesContainingIDs ( ids )
}
db . clearCacheForCollection ( collectionName )
return result , nil
}
}
@@ -618,10 +632,7 @@ func (db *LineDb) Delete(data any, collectionName string, options LineDbAdapterO
}
}
}
if db . cacheExternal != nil {
ids := extractIDsFromRecords ( result )
db . cacheExternal . ClearEntriesContainingIDs ( ids )
}
db . clearCacheForCollection ( collectionName )
return result , nil
}
@@ -672,7 +683,7 @@ func (db *LineDb) ReadByFilter(filter any, collectionName string, options LineDb
if err != nil || ( options . FailOnFailureIndexRead && ! hit ) {
return nil , fmt . Errorf ( "index read failed: %w" , err )
}
if hit && err == nil {
if hit {
if db . cacheExternal != nil && ! options . InTransaction {
db . cacheExternal . Set ( db . generateCacheKey ( filter , collectionName ) , result )
}
@@ -703,7 +714,7 @@ func (db *LineDb) ReadByFilter(filter any, collectionName string, options LineDb
if err != nil || ( options . FailOnFailureIndexRead && ! hit ) {
return nil , fmt . Errorf ( "index read failed: %w" , err )
}
if hit && err == nil {
if hit {
if db . cacheExternal != nil && ! options . InTransaction {
db . cacheExternal . Set ( db . generateCacheKey ( filter , collectionName ) , result )
}
@@ -758,15 +769,128 @@ func (db *LineDb) ClearCache(collectionName string, options LineDbAdapterOptions
if collectionName == "" {
db . cacheExternal . Clear ( )
} else {
// Очищаем только записи для конкретной коллекции
// Это упрощенная реализация
db . cacheExternal . Clear ( )
db . cacheExternal . ClearCollection ( collectionName )
}
}
return nil
}
// clearCacheForCollection сбрасывает кэш запросов по логической коллекции (включая ключи партиций base_*).
func ( db * LineDb ) clearCacheForCollection ( collectionName string ) {
if db . cacheExternal == nil || collectionName == "" {
return
}
db . cacheExternal . ClearCollection ( collectionName )
}
// rebuildAllIndexesForLogicalCollection полностью пересобирает индекс по всем файлам/партициям логической коллекции.
// Вызывать при удержании db.mutex (или внутри транзакции с внешней блокировкой).
func ( db * LineDb ) rebuildAllIndexesForLogicalCollection ( logicalName string ) {
if db . indexStore == nil {
return
}
opts := db . getCollectionOptions ( logicalName )
if opts == nil || len ( opts . IndexedFields ) == 0 {
return
}
if db . isCollectionPartitioned ( logicalName ) {
for _ , adapter := range db . partitionStorageAdaptersOrdered ( logicalName ) {
partName := adapter . GetCollectionName ( )
records , lineIdx , err := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { } )
if err != nil {
continue
}
_ = db . indexStore . Rebuild ( logicalName , partName , opts . IndexedFields , records , lineIdx )
}
return
}
adapter := db . adapters [ logicalName ]
if adapter == nil {
return
}
records , lineIdx , err := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { } )
if err != nil {
return
}
_ = db . indexStore . Rebuild ( logicalName , DefaultPartition , opts . IndexedFields , records , lineIdx )
}
func coerceInsertIDToStrictInt ( v any ) ( int , bool ) {
switch x := v . ( type ) {
case int :
return x , true
case int8 :
return int ( x ) , true
case int16 :
return int ( x ) , true
case int32 :
return int ( x ) , true
case int64 :
if x < int64 ( math . MinInt ) || x > int64 ( math . MaxInt ) {
return 0 , false
}
return int ( x ) , true
case uint :
if uint64 ( x ) > uint64 ( math . MaxInt ) {
return 0 , false
}
return int ( x ) , true
case uint8 :
return int ( x ) , true
case uint16 :
return int ( x ) , true
case uint32 :
return int ( x ) , true
case uint64 :
if x > uint64 ( math . MaxInt ) {
return 0 , false
}
return int ( x ) , true
case float32 :
return coerceFloatToStrictInt ( float64 ( x ) )
case float64 :
return coerceFloatToStrictInt ( x )
case string :
s := strings . TrimSpace ( x )
if s == "" {
return 0 , false
}
n , err := strconv . ParseInt ( s , 10 , 64 )
if err != nil {
return 0 , false
}
if n < int64 ( math . MinInt ) || n > int64 ( math . MaxInt ) {
return 0 , false
}
return int ( n ) , true
case json . Number :
n , err := x . Int64 ( )
if err != nil {
return 0 , false
}
if n < int64 ( math . MinInt ) || n > int64 ( math . MaxInt ) {
return 0 , false
}
return int ( n ) , true
default :
return 0 , false
}
}
func coerceFloatToStrictInt ( x float64 ) ( int , bool ) {
if x != x {
return 0 , false
}
if x < float64 ( math . MinInt ) || x > float64 ( math . MaxInt ) {
return 0 , false
}
if x != math . Trunc ( x ) {
return 0 , false
}
return int ( x ) , true
}
// Close закрывает базу данных
func ( db * LineDb ) Close ( ) {
db . mutex . Lock ( )
@@ -811,7 +935,7 @@ func (db *LineDb) indexRebuildTimerLoop(interval time.Duration) {
db . mutex . Lock ( )
if db . indexStore != nil && db . initOptions != nil {
for _ , opt := range db . initOptions . Collections {
if len ( opt . IndexedFields ) == 0 {
if len ( opt . IndexedFields ) == 0 || opt . RebuildIndexesOnFileChange {
continue
}
if db . isCollectionPartitioned ( opt . CollectionName ) {
@@ -1159,7 +1283,7 @@ func (db *LineDb) getCollectionOptions(collectionName string) *JSONLFileOptions
}
// isValueEmpty проверяет, считается ли значение "пустым" (nil или "" для string) — для обратной совместимости
func ( db * LineDb ) i sValueEmpty( v any ) bool {
func ( db * LineDb ) I sValueEmpty( v any ) bool {
if v == nil {
return true
}
@@ -1397,7 +1521,13 @@ func (db *LineDb) getPartitionAdapter(data any, collectionName string) (*JSONLFi
if ! exists {
// Создаем новый адаптер для партиции
filename := filepath . Join ( db . initOptions . DBFolder , partitionName + ".jsonl" )
adapter = NewJSONLFile ( filename , "" , JSONLFileOptions { CollectionName : partitionName } )
// Партиции должны наследовать опции базовой коллекции (allocSize/encode/crypto/marshal/etc. )
opts := JSONLFileOptions { CollectionName : partitionName }
if baseOpts := db . getCollectionOptions ( collectionName ) ; baseOpts != nil {
opts = * baseOpts
opts . CollectionName = partitionName
}
adapter = NewJSONLFile ( filename , opts . EncryptKeyForLineDb , opts )
if err := adapter . Init ( false , LineDbAdapterOptions { } ) ; err != nil {
return nil , fmt . Errorf ( "failed to init partition adapter: %w" , err )
@@ -1471,7 +1601,7 @@ func (db *LineDb) valuesMatch(a, b any, strictCompare bool) bool {
// Сравнение строк
if aStr , ok := a . ( string ) ; ok {
if bStr , ok := b . ( string ) ; ok {
return strings . EqualFold ( aStr , bStr )
return matchStringByPattern ( aStr , bStr , strictCompare )
}
}
@@ -1561,24 +1691,6 @@ func (db *LineDb) compareIDs(a, b any) bool {
return a == b
}
// extractIDsFromRecords извлекает id из списка записей (map[string]any).
func extractIDsFromRecords ( records [ ] any ) [ ] any {
if len ( records ) == 0 {
return nil
}
ids := make ( [ ] any , 0 , len ( records ) )
for _ , rec := range records {
m , ok := rec . ( map [ string ] any )
if ! ok {
continue
}
if id , exists := m [ "id" ] ; exists && id != nil {
ids = append ( ids , id )
}
}
return ids
}
func ( db * LineDb ) generateCacheKey ( filter any , collectionName string ) string {
// Упрощенная реализация генерации ключа кэша
return fmt . Sprintf ( "%s:%v" , collectionName , filter )
@@ -1654,7 +1766,7 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any,
var allResults [ ] any
for _ , adapter := range db . partitionStorageAdaptersOrdered ( baseName ) {
partitionName := adapter . GetCollectionName ( )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
upd , used , terr := db . tryIndexUpdate ( adapter , filter , dataMap , collectionName , partitionName , opts . IndexedFields , options )
if terr != nil {
return nil , terr
@@ -1669,7 +1781,7 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any,
return nil , err
}
allResults = append ( allResults , results ... )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
records , lineIdx , readErr := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { InTransaction : true } )
if readErr == nil {
_ = db . indexStore . Rebuild ( collectionName , partitionName , opts . IndexedFields , records , lineIdx )
@@ -1677,6 +1789,11 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any,
}
}
if opts != nil && opts . RebuildIndexesOnFileChange && db . indexStore != nil && len ( opts . IndexedFields ) > 0 {
db . rebuildAllIndexesForLogicalCollection ( collectionName )
}
db . clearCacheForCollection ( collectionName )
return allResults , nil
}
@@ -1687,7 +1804,7 @@ func (db *LineDb) deletePartitioned(data any, collectionName string, options Lin
var allResults [ ] any
for _ , adapter := range db . partitionStorageAdaptersOrdered ( baseName ) {
partitionName := adapter . GetCollectionName ( )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
del , used , derr := db . tryIndexDelete ( adapter , data , collectionName , partitionName , opts . IndexedFields , options )
if derr != nil {
return nil , derr
@@ -1702,7 +1819,7 @@ func (db *LineDb) deletePartitioned(data any, collectionName string, options Lin
return nil , err
}
allResults = append ( allResults , results ... )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
records , lineIdx , readErr := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { InTransaction : true } )
if readErr == nil {
_ = db . indexStore . Rebuild ( collectionName , partitionName , opts . IndexedFields , records , lineIdx )
@@ -1710,6 +1827,11 @@ func (db *LineDb) deletePartitioned(data any, collectionName string, options Lin
}
}
if opts != nil && opts . RebuildIndexesOnFileChange && db . indexStore != nil && len ( opts . IndexedFields ) > 0 {
db . rebuildAllIndexesForLogicalCollection ( collectionName )
}
db . clearCacheForCollection ( collectionName )
return allResults , nil
}