@@ -1,9 +1,12 @@
package linedb
import (
"encoding/json"
"fmt"
"math"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
@@ -87,6 +90,8 @@ func (db *LineDb) Init(force bool, initOptions *LineDbInitOptions) error {
if dbFolder == "" {
dbFolder = "linedb"
}
// Нормализуем и сохраняем фактический путь, чтобы партиции создавались в той же папке
db . initOptions . DBFolder = dbFolder
if err := os . MkdirAll ( dbFolder , 0755 ) ; err != nil {
return fmt . Errorf ( "failed to create database folder: %w" , err )
@@ -137,12 +142,10 @@ func (db *LineDb) Init(force bool, initOptions *LineDbInitOptions) error {
continue
}
if db . isCollectionPartitioned ( opt . CollectionName ) {
// Партиции: обходим все существующие партиции (могут быть созданы ранее )
// Основной файл + все base_* (пустой partition id → основной файл )
baseName := opt . CollectionName
for name , adapter := range db . adapters {
if ! strings . HasPrefix ( name , baseName + "_" ) {
continue
}
for _ , adapter := range db . partitionStorageAdaptersOrdered ( baseName ) {
name := adapter . GetCollectionName ( )
records , lineIdx , err := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { } )
if err != nil {
continue
@@ -230,7 +233,13 @@ func (db *LineDb) seedLastIDPartitioned(dbFolder, baseName string) error {
if existing , ok := db . adapters [ partName ] ; ok {
adapter = existing
} else {
adapter = NewJSONLFile ( path , "" , JSONLFileOptions { CollectionName : partName } )
// Партиции должны наследовать опции базовой коллекции (allocSize/encode/crypto/marshal/etc. )
opts := JSONLFileOptions { CollectionName : partName }
if baseOpts := db . getCollectionOptions ( baseName ) ; baseOpts != nil {
opts = * baseOpts
opts . CollectionName = partName
}
adapter = NewJSONLFile ( path , opts . EncryptKeyForLineDb , opts )
if err := adapter . Init ( false , LineDbAdapterOptions { } ) ; err != nil {
continue
}
@@ -249,7 +258,9 @@ func (db *LineDb) seedLastIDPartitioned(dbFolder, baseName string) error {
return nil
}
// Read читает все записи из коллекции
// Read читает все записи из коллекции.
// Для партиционированной логической коллекции (имя без суффикса _*) читает основной файл
// и все файлы партиций base_* подряд. Запрос по имени одной партиции (base_part) читает только её файл.
func ( db * LineDb ) Read ( collectionName string , options LineDbAdapterOptions ) ( [ ] any , error ) {
db . mutex . RLock ( )
defer db . mutex . RUnlock ( )
@@ -258,6 +269,23 @@ func (db *LineDb) Read(collectionName string, options LineDbAdapterOptions) ([]a
collectionName = db . getFirstCollection ( )
}
baseName := db . getBaseCollectionName ( collectionName )
if db . isCollectionPartitioned ( baseName ) && collectionName == baseName {
adapters := db . partitionStorageAdaptersOrdered ( baseName )
if len ( adapters ) == 0 {
return nil , fmt . Errorf ( "collection %s not found" , collectionName )
}
var all [ ] any
for _ , a := range adapters {
recs , err := a . Read ( options )
if err != nil {
return nil , err
}
all = append ( all , recs ... )
}
return all , nil
}
adapter , exists := db . adapters [ collectionName ]
if ! exists {
return nil , fmt . Errorf ( "collection %s not found" , collectionName )
@@ -332,6 +360,10 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO
itemMap [ "id" ] = newID
} else {
if n , ok := coerceInsertIDToStrictInt ( itemMap [ "id" ] ) ; ok {
itemMap [ "id" ] = n
db . lastIDManager . SetLastID ( collectionName , n )
}
// Проверяем существование записи если не пропускаем проверку
if ! options . SkipCheckExistingForWrite {
filter := map [ string ] any { "id" : itemMap [ "id" ] }
@@ -365,34 +397,16 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO
writeOptions := LineDbAdapterOptions { InTransaction : true , InternalCall : true }
if db . indexStore != nil {
opts := db . getCollectionOptions ( collectionName )
if opts != nil && len ( opts . IndexedFields ) > 0 && db . isCollectionPartitioned ( collectionName ) {
writeOptions . DoIndexing = true // индекс строится точечно при Write в партиции
if opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
// Точечное IndexRecord в Write (LineCount → стартовая строка для новых записей)
writeOptions . DoIndexing = true
}
}
if err := db . Write ( resultDataArray , collectionName , writeOptions ) ; err != nil {
return fmt . Errorf ( "failed to write data: %w" , err )
}
// Обновляем кэш
if db . cacheExternal != nil {
for _ , item := range resultDataArray {
db . cacheExternal . UpdateCacheAfterInsert ( item , collectionName )
}
}
// Обновляем индекс (полная пересборка по коллекции) — только для непартиционированных
if db . indexStore != nil {
opts := db . getCollectionOptions ( collectionName )
if opts != nil && len ( opts . IndexedFields ) > 0 && ! db . isCollectionPartitioned ( collectionName ) {
adapter , exists := db . adapters [ collectionName ]
if exists {
allRecords , lineIdx , readErr := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { } )
if readErr == nil {
_ = db . indexStore . Rebuild ( collectionName , DefaultPartition , opts . IndexedFields , allRecords , lineIdx )
}
}
}
}
db . clearCacheForCollection ( collectionName )
return nil
}
@@ -409,10 +423,13 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
collectionName = db . getFirstCollection ( )
}
opts := db . getCollectionOptions ( collectionName )
suppressPartialIndex := opts != nil && opts . RebuildIndexesOnFileChange
doPartial := options . DoIndexing && ! suppressPartialIndex
// Проверяем партиционирование
if db . isCollectionPartitioned ( collectionName ) {
dataArray := db . normalizeDataArray ( data )
opts := db . getCollectionOptions ( collectionName )
for _ , item := range dataArray {
adapter , err := db . getPartitionAdapter ( item , collectionName )
if err != nil {
@@ -420,7 +437,7 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
}
partitionName := adapter . GetCollectionName ( )
var startLine int
if options . DoIndexing && db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if doPartial && db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if c , err := adapter . LineCount ( ) ; err == nil {
startLine = c
}
@@ -428,12 +445,18 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
if err := adapter . Write ( item , options ) ; err != nil {
return fmt . Errorf ( "failed to write to partition: %w" , err )
}
if options . DoIndexing && db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if doPartial && db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
if m , err := db . toMap ( item ) ; err == nil {
db . indexStore . IndexRecord ( collectionName , partitionName , opts . IndexedFields , m , startLine )
}
}
}
if suppressPartialIndex {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
db . rebuildAllIndexesForLogicalCollection ( collectionName )
}
db . clearCacheForCollection ( collectionName )
}
return nil
}
@@ -444,7 +467,7 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
}
var startLine int
if options . DoIndexing && db . indexStore != nil {
if doPartial && db . indexStore != nil {
if c , err := adapter . LineCount ( ) ; err == nil {
startLine = c
}
@@ -455,8 +478,7 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
}
// Точечное индексирование при DoIndexing
if options . DoIndexing && db . indexStore != nil {
opts := db . getCollectionOptions ( collectionName )
if doPartial && db . indexStore != nil {
if opts != nil && len ( opts . IndexedFields ) > 0 {
dataArray := db . normalizeDataArray ( data )
for i , record := range dataArray {
@@ -466,6 +488,12 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
}
}
}
if suppressPartialIndex {
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
db . rebuildAllIndexesForLogicalCollection ( collectionName )
}
db . clearCacheForCollection ( collectionName )
}
return nil
}
@@ -519,16 +547,13 @@ func (db *LineDb) Update(data any, collectionName string, filter any, options Li
// Пробуем точечный Update через индекс (без полного чтения файла)
opts := db . getCollectionOptions ( collectionName )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! db . isCollectionPartitioned ( collectionName ) {
result , used , err := db . tryIndexUpdate ( adapter , filter , dataMap , collectionName , opts . IndexedFields , options )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! db . isCollectionPartitioned ( collectionName ) && ! opts . RebuildIndexesOnFileChange {
result , used , err := db . tryIndexUpdate ( adapter , filter , dataMap , collectionName , DefaultPartition , opts . IndexedFields , options )
if err != nil {
return nil , err
}
if used {
if db . cacheExternal != nil {
ids := extractIDsFromRecords ( result )
db . cacheExternal . ClearEntriesContainingIDs ( ids )
}
db . clearCacheForCollection ( collectionName )
return result , nil
}
}
@@ -547,10 +572,7 @@ func (db *LineDb) Update(data any, collectionName string, filter any, options Li
}
}
}
if db . cacheExternal != nil {
ids := extractIDsFromRecords ( result )
db . cacheExternal . ClearEntriesContainingIDs ( ids )
}
db . clearCacheForCollection ( collectionName )
return result , nil
}
@@ -585,16 +607,13 @@ func (db *LineDb) Delete(data any, collectionName string, options LineDbAdapterO
}
opts := db . getCollectionOptions ( collectionName )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
result , used , err := db . tryIndexDelete ( adapter , data , collectionName , opts . IndexedFields , options )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
result , used , err := db . tryIndexDelete ( adapter , data , collectionName , DefaultPartition , opts . IndexedFields , options )
if err != nil {
return nil , err
}
if used {
if db . cacheExternal != nil {
ids := extractIDsFromRecords ( result )
db . cacheExternal . ClearEntriesContainingIDs ( ids )
}
db . clearCacheForCollection ( collectionName )
return result , nil
}
}
@@ -613,10 +632,7 @@ func (db *LineDb) Delete(data any, collectionName string, options LineDbAdapterO
}
}
}
if db . cacheExternal != nil {
ids := extractIDsFromRecords ( result )
db . cacheExternal . ClearEntriesContainingIDs ( ids )
}
db . clearCacheForCollection ( collectionName )
return result , nil
}
@@ -667,7 +683,7 @@ func (db *LineDb) ReadByFilter(filter any, collectionName string, options LineDb
if err != nil || ( options . FailOnFailureIndexRead && ! hit ) {
return nil , fmt . Errorf ( "index read failed: %w" , err )
}
if hit && err == nil {
if hit {
if db . cacheExternal != nil && ! options . InTransaction {
db . cacheExternal . Set ( db . generateCacheKey ( filter , collectionName ) , result )
}
@@ -698,7 +714,7 @@ func (db *LineDb) ReadByFilter(filter any, collectionName string, options LineDb
if err != nil || ( options . FailOnFailureIndexRead && ! hit ) {
return nil , fmt . Errorf ( "index read failed: %w" , err )
}
if hit && err == nil {
if hit {
if db . cacheExternal != nil && ! options . InTransaction {
db . cacheExternal . Set ( db . generateCacheKey ( filter , collectionName ) , result )
}
@@ -753,15 +769,168 @@ func (db *LineDb) ClearCache(collectionName string, options LineDbAdapterOptions
if collectionName == "" {
db . cacheExternal . Clear ( )
} else {
// Очищаем только записи для конкретной коллекции
// Это упрощенная реализация
db . cacheExternal . Clear ( )
db . cacheExternal . ClearCollection ( collectionName )
}
}
return nil
}
// DBMutex возвращает RWMutex уровня LineDb: им сериализуются Insert/Update/Delete/Read и др. при обычных вызовах.
// Пары вызовов: Lock/Unlock (эксклюзивно) и RLock/RUnlock (чтение). Неверный порядок вместе с AdapterMutex может дать взаимную блокировку.
func ( db * LineDb ) DBMutex ( ) * sync . RWMutex {
return & db . mutex
}
// AdapterMutex возвращает RWMutex JSONL-адаптера для имени хранилища (логическая коллекция или имя партиции, например events_A).
// Пустое collectionName — первая коллекция из конфигурации. Поиск выполняется под RLock карты адаптеров.
func ( db * LineDb ) AdapterMutex ( collectionName string ) ( * sync . RWMutex , error ) {
db . mutex . RLock ( )
defer db . mutex . RUnlock ( )
if collectionName == "" {
collectionName = db . getFirstCollection ( )
}
if collectionName == "" {
return nil , fmt . Errorf ( "no collection configured" )
}
a , ok := db . adapters [ collectionName ]
if ! ok {
return nil , fmt . Errorf ( "collection %s not found" , collectionName )
}
return a . Mutex ( ) , nil
}
// AdapterMutexAssumeDBLocked возвращает RWMutex адаптера. Вызывать только если текущая горутина
// уже удерживает DBMutex().RLock() или Lock() — иначе доступ к карте адаптеров не синхронизирован.
func ( db * LineDb ) AdapterMutexAssumeDBLocked ( collectionName string ) ( * sync . RWMutex , error ) {
if collectionName == "" {
collectionName = db . getFirstCollection ( )
}
if collectionName == "" {
return nil , fmt . Errorf ( "no collection configured" )
}
a , ok := db . adapters [ collectionName ]
if ! ok {
return nil , fmt . Errorf ( "collection %s not found" , collectionName )
}
return a . Mutex ( ) , nil
}
// clearCacheForCollection сбрасывает кэш запросов по логической коллекции (включая ключи партиций base_*).
func ( db * LineDb ) clearCacheForCollection ( collectionName string ) {
if db . cacheExternal == nil || collectionName == "" {
return
}
db . cacheExternal . ClearCollection ( collectionName )
}
// rebuildAllIndexesForLogicalCollection полностью пересобирает индекс по всем файлам/партициям логической коллекции.
// Вызывать при удержании db.mutex (или внутри транзакции с внешней блокировкой).
func ( db * LineDb ) rebuildAllIndexesForLogicalCollection ( logicalName string ) {
if db . indexStore == nil {
return
}
opts := db . getCollectionOptions ( logicalName )
if opts == nil || len ( opts . IndexedFields ) == 0 {
return
}
if db . isCollectionPartitioned ( logicalName ) {
for _ , adapter := range db . partitionStorageAdaptersOrdered ( logicalName ) {
partName := adapter . GetCollectionName ( )
records , lineIdx , err := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { } )
if err != nil {
continue
}
_ = db . indexStore . Rebuild ( logicalName , partName , opts . IndexedFields , records , lineIdx )
}
return
}
adapter := db . adapters [ logicalName ]
if adapter == nil {
return
}
records , lineIdx , err := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { } )
if err != nil {
return
}
_ = db . indexStore . Rebuild ( logicalName , DefaultPartition , opts . IndexedFields , records , lineIdx )
}
func coerceInsertIDToStrictInt ( v any ) ( int , bool ) {
switch x := v . ( type ) {
case int :
return x , true
case int8 :
return int ( x ) , true
case int16 :
return int ( x ) , true
case int32 :
return int ( x ) , true
case int64 :
if x < int64 ( math . MinInt ) || x > int64 ( math . MaxInt ) {
return 0 , false
}
return int ( x ) , true
case uint :
if uint64 ( x ) > uint64 ( math . MaxInt ) {
return 0 , false
}
return int ( x ) , true
case uint8 :
return int ( x ) , true
case uint16 :
return int ( x ) , true
case uint32 :
return int ( x ) , true
case uint64 :
if x > uint64 ( math . MaxInt ) {
return 0 , false
}
return int ( x ) , true
case float32 :
return coerceFloatToStrictInt ( float64 ( x ) )
case float64 :
return coerceFloatToStrictInt ( x )
case string :
s := strings . TrimSpace ( x )
if s == "" {
return 0 , false
}
n , err := strconv . ParseInt ( s , 10 , 64 )
if err != nil {
return 0 , false
}
if n < int64 ( math . MinInt ) || n > int64 ( math . MaxInt ) {
return 0 , false
}
return int ( n ) , true
case json . Number :
n , err := x . Int64 ( )
if err != nil {
return 0 , false
}
if n < int64 ( math . MinInt ) || n > int64 ( math . MaxInt ) {
return 0 , false
}
return int ( n ) , true
default :
return 0 , false
}
}
func coerceFloatToStrictInt ( x float64 ) ( int , bool ) {
if x != x {
return 0 , false
}
if x < float64 ( math . MinInt ) || x > float64 ( math . MaxInt ) {
return 0 , false
}
if x != math . Trunc ( x ) {
return 0 , false
}
return int ( x ) , true
}
// Close закрывает базу данных
func ( db * LineDb ) Close ( ) {
db . mutex . Lock ( )
@@ -792,6 +961,8 @@ func (db *LineDb) Close() {
// indexRebuildTimerLoop выполняет полный ребилд всех индексов с заданным интервалом (IndexRebuildTimer).
// Работает в отдельной горутине; при каждом тике берёт db.mutex.Lock() на время ребилда.
// Перед ребилдом коллекции: если в файле есть пустые слоты (след точечного удаления), вызывается
// CompactFile — перезапись без «дыр», затем индексирование по сжатому файлу.
// Останавливается при закрытии db.indexRebuildDone.
func ( db * LineDb ) indexRebuildTimerLoop ( interval time . Duration ) {
ticker := time . NewTicker ( interval )
@@ -804,15 +975,20 @@ func (db *LineDb) indexRebuildTimerLoop(interval time.Duration) {
db . mutex . Lock ( )
if db . indexStore != nil && db . initOptions != nil {
for _ , opt := range db . initOptions . Collections {
if len ( opt . IndexedFields ) == 0 {
if len ( opt . IndexedFields ) == 0 || opt . RebuildIndexesOnFileChange {
continue
}
if db . isCollectionPartitioned ( opt . CollectionName ) {
baseName := opt . CollectionName
for name , adapter := range db . adapters {
if ! strings . HasPrefix ( name , baseName + "_" ) {
for _ , adapter := range db . partitionStorageAdaptersOrdered ( baseName ) {
name := adapter . GetCollectionName ( )
dirty , err := adapter . HasBlankSlots ( LineDbAdapterOptions { } )
if err != nil {
continue
}
if dirty {
_ = adapter . CompactFile ( LineDbAdapterOptions { } )
}
records , lineIdx , err := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { } )
if err != nil {
continue
@@ -822,6 +998,10 @@ func (db *LineDb) indexRebuildTimerLoop(interval time.Duration) {
} else {
adapter := db . adapters [ opt . CollectionName ]
if adapter != nil {
dirty , err := adapter . HasBlankSlots ( LineDbAdapterOptions { } )
if err == nil && dirty {
_ = adapter . CompactFile ( LineDbAdapterOptions { } )
}
records , lineIdx , err := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { } )
if err == nil {
_ = db . indexStore . Rebuild ( opt . CollectionName , DefaultPartition , opt . IndexedFields , records , lineIdx )
@@ -965,8 +1145,23 @@ func (db *LineDb) tryIndexLookupPartitioned(filter any, collectionName string, o
return true , filtered , nil
}
// indexedFieldKeysEqual возвращает true, если ключи индекса по всем indexedFields совпадают
// (та же семантика, что getFieldValue/valueToIndexKey в индексе).
func indexedFieldKeysEqual ( oldRec , newRec map [ string ] any , indexedFields [ ] string ) bool {
for _ , f := range indexedFields {
if getFieldValue ( oldRec , f ) != getFieldValue ( newRec , f ) {
return false
}
}
return true
}
// tryIndexUpdate выполняет точечное обновление через индекс. Возвращает (result, used, err).
func ( db * LineDb ) tryIndexUpdate ( adapter * JSONLFile , filter any , dataMap map [ string ] any , collectionName string , indexedFields [ ] string , options LineDbAdapterOptions ) ( [ ] any , bool , error ) {
// partition — имя партиции для IndexStore (для непартиционированных коллекций: DefaultPartition).
func ( db * LineDb ) tryIndexUpdate ( adapter * JSONLFile , filter any , dataMap map [ string ] any , collectionName string , partition string , indexedFields [ ] string , options LineDbAdapterOptions ) ( [ ] any , bool , error ) {
if partition == "" {
partition = DefaultPartition
}
filterMap , ok := filter . ( map [ string ] any )
if ! ok || len ( filterMap ) == 0 {
return nil , false , nil
@@ -990,7 +1185,7 @@ func (db *LineDb) tryIndexUpdate(adapter *JSONLFile, filter any, dataMap map[str
}
var indexes [ ] int
for _ , p := range posList {
if p . Partition == DefaultP artition {
if p . Partition == p artition {
indexes = append ( indexes , p . LineIndex )
}
}
@@ -1033,21 +1228,26 @@ func (db *LineDb) tryIndexUpdate(adapter *JSONLFile, filter any, dataMap map[str
if err := adapter . WriteAtLineIndexes ( updated , toUpdatePos , opt ) ; err != nil {
return nil , false , err
}
for i , rec := range toUpdate {
if m , ok := rec . ( map [ string ] any ) ; ok {
db . indexStore . UnindexRecord ( collectionName , DefaultPartition , indexedFields , m , toUpdatePos [ i ] )
for i := range toUpdate {
oldM , ok1 := toUpdate [ i ] . ( map [ string ] any )
newM , ok2 := updated [ i ] . ( map [ string ] any )
if ! ok1 || ! ok2 {
continue
}
}
for i , rec := range updated {
if m , ok := rec . ( map [ string ] any ) ; ok {
db . indexStore . IndexRecord ( collectionName , DefaultPartition , indexedFields , m , toUpdatePos [ i ] )
if indexedFieldKeysEqual ( oldM , newM , indexedFields ) {
continue
}
db . indexStore . UnindexRecord ( collectionName , partition , indexedFields , oldM , toUpdatePos [ i ] )
db . indexStore . IndexRecord ( collectionName , partition , indexedFields , newM , toUpdatePos [ i ] )
}
return updated , true , nil
}
// tryIndexDelete выполняет точечное удаление через индекс. Возвращает (deletedRecords, used, err).
func ( db * LineDb ) tryIndexDelete ( adapter * JSONLFile , filter any , collectionName string , indexedFields [ ] string , options LineDbAdapterOptions ) ( [ ] any , bool , error ) {
func ( db * LineDb ) tryIndexDelete ( adapter * JSONLFile , filter any , collectionName string , partition string , indexedFields [ ] string , options LineDbAdapterOptions ) ( [ ] any , bool , error ) {
if partition == "" {
partition = DefaultPartition
}
filterMap , ok := filter . ( map [ string ] any )
if ! ok || len ( filterMap ) == 0 {
return nil , false , nil
@@ -1071,7 +1271,7 @@ func (db *LineDb) tryIndexDelete(adapter *JSONLFile, filter any, collectionName
}
var indexes [ ] int
for _ , p := range posList {
if p . Partition == DefaultP artition {
if p . Partition == p artition {
indexes = append ( indexes , p . LineIndex )
}
}
@@ -1101,7 +1301,7 @@ func (db *LineDb) tryIndexDelete(adapter *JSONLFile, filter any, collectionName
}
for i , rec := range toDel {
if m , ok := rec . ( map [ string ] any ) ; ok {
db . indexStore . UnindexRecord ( collectionName , DefaultP artition, indexedFields , m , toDelPos [ i ] )
db . indexStore . UnindexRecord ( collectionName , p artition, indexedFields , m , toDelPos [ i ] )
}
}
return toDel , true , nil
@@ -1123,7 +1323,7 @@ func (db *LineDb) getCollectionOptions(collectionName string) *JSONLFileOptions
}
// isValueEmpty проверяет, считается ли значение "пустым" (nil или "" для string) — для обратной совместимости
func ( db * LineDb ) i sValueEmpty( v any ) bool {
func ( db * LineDb ) I sValueEmpty( v any ) bool {
if v == nil {
return true
}
@@ -1316,17 +1516,28 @@ func (db *LineDb) isCollectionPartitioned(collectionName string) bool {
return exists
}
func ( db * LineDb ) getPartitionFiles ( collectionName string ) ( [ ] string , error ) {
baseName := db . getBaseCollectionName ( collectionName )
var files [ ] string
fo r name , filename := range db . collections {
// partitionStorageAdaptersOrdered — для логического имени партиционированной коллекции baseName:
// сначала адаптер основного файла (baseName.jsonl), затем все baseName_* в лексикографическом порядке.
// Пустой ключ партиции пишет в основной файл; эти записи учитываются здесь первым адаптером.
func ( db * LineDb ) partitionStorageAdaptersOrdered ( baseName string ) [ ] * JSONLFile {
va r out [ ] * JSONLFile
if a := db . adapters [ baseName ] ; a != nil {
out = append ( out , a )
}
var partNames [ ] string
for name := range db . adapters {
if name == baseName {
continue
}
if strings . HasPrefix ( name , baseName + "_" ) {
fil es = append ( fil es, file name)
partNam es = append ( partNam es, name )
}
}
return files , nil
sort . Strings ( partNames )
for _ , name := range partNames {
out = append ( out , db . adapters [ name ] )
}
return out
}
func ( db * LineDb ) getPartitionAdapter ( data any , collectionName string ) ( * JSONLFile , error ) {
@@ -1336,13 +1547,27 @@ func (db *LineDb) getPartitionAdapter(data any, collectionName string) (*JSONLFi
}
partitionID := partitionFn ( data )
if strings . TrimSpace ( partitionID ) == "" {
adapter , ok := db . adapters [ collectionName ]
if ! ok {
return nil , fmt . Errorf ( "collection %s not found" , collectionName )
}
return adapter , nil
}
partitionName := fmt . Sprintf ( "%s_%s" , collectionName , partitionID )
adapter , exists := db . adapters [ partitionName ]
if ! exists {
// Создаем новый адаптер для партиции
filename := filepath . Join ( db . initOptions . DBFolder , partitionName + ".jsonl" )
adapter = NewJSONLFile ( filename , "" , JSONLFileOptions { CollectionName : partitionName } )
// Партиции должны наследовать опции базовой коллекции (allocSize/encode/crypto/marshal/etc. )
opts := JSONLFileOptions { CollectionName : partitionName }
if baseOpts := db . getCollectionOptions ( collectionName ) ; baseOpts != nil {
opts = * baseOpts
opts . CollectionName = partitionName
}
adapter = NewJSONLFile ( filename , opts . EncryptKeyForLineDb , opts )
if err := adapter . Init ( false , LineDbAdapterOptions { } ) ; err != nil {
return nil , fmt . Errorf ( "failed to init partition adapter: %w" , err )
@@ -1416,7 +1641,7 @@ func (db *LineDb) valuesMatch(a, b any, strictCompare bool) bool {
// Сравнение строк
if aStr , ok := a . ( string ) ; ok {
if bStr , ok := b . ( string ) ; ok {
return strings . EqualFold ( aStr , bStr )
return matchStringByPattern ( aStr , bStr , strictCompare )
}
}
@@ -1506,24 +1731,6 @@ func (db *LineDb) compareIDs(a, b any) bool {
return a == b
}
// extractIDsFromRecords извлекает id из списка записей (map[string]any).
func extractIDsFromRecords ( records [ ] any ) [ ] any {
if len ( records ) == 0 {
return nil
}
ids := make ( [ ] any , 0 , len ( records ) )
for _ , rec := range records {
m , ok := rec . ( map [ string ] any )
if ! ok {
continue
}
if id , exists := m [ "id" ] ; exists && id != nil {
ids = append ( ids , id )
}
}
return ids
}
func ( db * LineDb ) generateCacheKey ( filter any , collectionName string ) string {
// Упрощенная реализация генерации ключа кэша
return fmt . Sprintf ( "%s:%v" , collectionName , filter )
@@ -1589,106 +1796,95 @@ func (db *LineDb) normalizeFilter(filter any) (any, error) {
}
func ( db * LineDb ) updatePartitioned ( data any , collectionName string , filter any , options LineDbAdapterOptions ) ( [ ] any , error ) {
partitionFiles , err := db . getPartitionFiles ( collectionName )
dataMap , err := db . toMap ( data )
if err != nil {
return nil , err
return nil , fmt . Errorf ( "invalid update data format: %w" , err )
}
opts := db . getCollectionOptions ( collectionName )
baseName := db . getBaseCollectionName ( collectionName )
var allResults [ ] any
for _ , filename := range partitionFiles {
var adapter * JSONLFile
var partitionName string
for name , adapterFile := range db . collections {
if adapterFile = = filename {
adapter = db . adapters [ name ]
partitionName = name
break
for _ , adapter := range db . partitionStorageAdaptersOrdered ( baseName ) {
partitionName := adapter . GetCollectionName ( )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
upd , used , terr := db . tryIndexUpdate ( adapter , filter , dataMap , collectionName , partitionName , opts . IndexedFields , options )
if terr ! = nil {
return nil , terr
}
if used {
allResults = append ( allResults , upd ... )
continue
}
}
if adapt er != nil {
results , err := adapter . Update ( data , filter , options )
if err != nil {
return nil , err
}
allResults = append ( allResults , results ... )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
records , lineIdx , readErr := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOption s{ InTransaction : true } )
if readErr == nil {
_ = db . indexStore . Rebuild ( collectionName , partitionName , opts . IndexedFields , records , lineIdx )
}
results , err := adapter . Update ( data , filter , options )
if er r != nil {
return nil , err
}
allResults = append ( allResults , results ... )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
records , lineIdx , readErr := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { InTransaction : true } )
if readErr == nil {
_ = db . indexStore . Rebuild ( collectionName , partitionName , opt s. IndexedFields , records , lineIdx )
}
}
}
if opts != nil && opts . RebuildIndexesOnFileChange && db . indexStore != nil && len ( opts . IndexedFields ) > 0 {
db . rebuildAllIndexesForLogicalCollection ( collectionName )
}
db . clearCacheForCollection ( collectionName )
return allResults , nil
}
func ( db * LineDb ) deletePartitioned ( data any , collectionName string , options LineDbAdapterOptions ) ( [ ] any , error ) {
partitionFiles , err := db . getPartitionFiles ( collectionName )
if err != nil {
return nil , err
}
opts := db . getCollectionOptions ( collectionName )
baseName := db . getBaseCollectionName ( collectionName )
var allResults [ ] any
for _ , filename := range partitionFiles {
var adapter * JSONLFile
var partitionName string
for name , adapterFile := range db . collections {
if adapterFile = = filename {
adapter = db . adapters [ name ]
partitionName = name
break
for _ , adapter := range db . partitionStorageAdaptersOrdered ( baseName ) {
partitionName := adapter . GetCollectionName ( )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
del , used , derr := db . tryIndexDelete ( adapter , data , collectionName , partitionName , opts . IndexedFields , options )
if derr ! = nil {
return nil , derr
}
if used {
allResults = append ( allResults , del ... )
continue
}
}
if adapt er != nil {
results , err := adapter . Delete ( data , options )
if err != nil {
return nil , err
}
allResults = append ( allResults , results ... )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
records , lineIdx , readErr := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOption s{ InTransaction : true } )
if readErr == nil {
_ = db . indexStore . Rebuild ( collectionName , partitionName , opts . IndexedFields , records , lineIdx )
}
results , err := adapter . Delete ( data , options )
if er r != nil {
return nil , err
}
allResults = append ( allResults , results ... )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! opts . RebuildIndexesOnFileChange {
records , lineIdx , readErr := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { InTransaction : true } )
if readErr == nil {
_ = db . indexStore . Rebuild ( collectionName , partitionName , opt s. IndexedFields , records , lineIdx )
}
}
}
if opts != nil && opts . RebuildIndexesOnFileChange && db . indexStore != nil && len ( opts . IndexedFields ) > 0 {
db . rebuildAllIndexesForLogicalCollection ( collectionName )
}
db . clearCacheForCollection ( collectionName )
return allResults , nil
}
func ( db * LineDb ) readByFilterPartitioned ( filter any , collectionName string , options LineDbAdapterOptions ) ( [ ] any , error ) {
// Получаем все партиции
partitionFiles , err := db . getPartitionFiles ( collectionName )
if err != nil {
return nil , err
}
baseName := db . getBaseCollectionName ( collectionName )
var allResults [ ] any
for _ , filename := range partitionFiles {
// Находим адаптер по имени файла
var adapt er * JSONLFile
for name , adapterFile := range db . collections {
if adapterFile == filename {
adapter = db . adapters [ name ]
break
}
}
if adapter != nil {
results , err := adapter . ReadByFilter ( filter , options )
if err != nil {
return nil , err
}
allResults = append ( allResults , results ... )
for _ , adapter := range db . partitionStorageAdaptersOrdered ( baseName ) {
results , err := adapter . ReadByFilter ( filter , options )
if err != nil {
return nil , err
}
allResults = append ( allResults , results ... )
}
return allResults , nil
}