@@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
@@ -137,12 +138,10 @@ func (db *LineDb) Init(force bool, initOptions *LineDbInitOptions) error {
continue
}
if db . isCollectionPartitioned ( opt . CollectionName ) {
// Партиции: обходим все существующие партиции (могут быть созданы ранее )
// Основной файл + все base_* (пустой partition id → основной файл )
baseName := opt . CollectionName
for name , adapter := range db . adapters {
if ! strings . HasPrefix ( name , baseName + "_" ) {
continue
}
for _ , adapter := range db . partitionStorageAdaptersOrdered ( baseName ) {
name := adapter . GetCollectionName ( )
records , lineIdx , err := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { } )
if err != nil {
continue
@@ -249,7 +248,9 @@ func (db *LineDb) seedLastIDPartitioned(dbFolder, baseName string) error {
return nil
}
// Read читает все записи из коллекции
// Read читает все записи из коллекции.
// Для партиционированной логической коллекции (имя без суффикса _*) читает основной файл
// и все файлы партиций base_* подряд. Запрос по имени одной партиции (base_part) читает только её файл.
func ( db * LineDb ) Read ( collectionName string , options LineDbAdapterOptions ) ( [ ] any , error ) {
db . mutex . RLock ( )
defer db . mutex . RUnlock ( )
@@ -258,6 +259,23 @@ func (db *LineDb) Read(collectionName string, options LineDbAdapterOptions) ([]a
collectionName = db . getFirstCollection ( )
}
baseName := db . getBaseCollectionName ( collectionName )
if db . isCollectionPartitioned ( baseName ) && collectionName == baseName {
adapters := db . partitionStorageAdaptersOrdered ( baseName )
if len ( adapters ) == 0 {
return nil , fmt . Errorf ( "collection %s not found" , collectionName )
}
var all [ ] any
for _ , a := range adapters {
recs , err := a . Read ( options )
if err != nil {
return nil , err
}
all = append ( all , recs ... )
}
return all , nil
}
adapter , exists := db . adapters [ collectionName ]
if ! exists {
return nil , fmt . Errorf ( "collection %s not found" , collectionName )
@@ -365,34 +383,16 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO
writeOptions := LineDbAdapterOptions { InTransaction : true , InternalCall : true }
if db . indexStore != nil {
opts := db . getCollectionOptions ( collectionName )
if opts != nil && len ( opts . IndexedFields ) > 0 && db . isCollectionPartitioned ( collectionName ) {
writeOptions . DoIndexing = true // индекс строится точечно при Write в партиции
if opts != nil && len ( opts . IndexedFields ) > 0 {
// Точечное IndexRecord в Write (LineCount → стартовая строка для новых записей)
writeOptions . DoIndexing = true
}
}
if err := db . Write ( resultDataArray , collectionName , writeOptions ) ; err != nil {
return fmt . Errorf ( "failed to write data: %w" , err )
}
// Обновляем кэш
if db . cacheExternal != nil {
for _ , item := range resultDataArray {
db . cacheExternal . UpdateCacheAfterInsert ( item , collectionName )
}
}
// Обновляем индекс (полная пересборка по коллекции) — только для непартиционированных
if db . indexStore != nil {
opts := db . getCollectionOptions ( collectionName )
if opts != nil && len ( opts . IndexedFields ) > 0 && ! db . isCollectionPartitioned ( collectionName ) {
adapter , exists := db . adapters [ collectionName ]
if exists {
allRecords , lineIdx , readErr := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { } )
if readErr == nil {
_ = db . indexStore . Rebuild ( collectionName , DefaultPartition , opts . IndexedFields , allRecords , lineIdx )
}
}
}
}
db . clearCacheForCollection ( collectionName )
return nil
}
@@ -520,15 +520,12 @@ func (db *LineDb) Update(data any, collectionName string, filter any, options Li
// Пробуем точечный Update через индекс (без полного чтения файла)
opts := db . getCollectionOptions ( collectionName )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 && ! db . isCollectionPartitioned ( collectionName ) {
result , used , err := db . tryIndexUpdate ( adapter , filter , dataMap , collectionName , opts . IndexedFields , options )
result , used , err := db . tryIndexUpdate ( adapter , filter , dataMap , collectionName , DefaultPartition , opts . IndexedFields , options )
if err != nil {
return nil , err
}
if used {
if db . cacheExternal != nil {
ids := extractIDsFromRecords ( result )
db . cacheExternal . ClearEntriesContainingIDs ( ids )
}
db . clearCacheForCollection ( collectionName )
return result , nil
}
}
@@ -547,10 +544,7 @@ func (db *LineDb) Update(data any, collectionName string, filter any, options Li
}
}
}
if db . cacheExternal != nil {
ids := extractIDsFromRecords ( result )
db . cacheExternal . ClearEntriesContainingIDs ( ids )
}
db . clearCacheForCollection ( collectionName )
return result , nil
}
@@ -586,15 +580,12 @@ func (db *LineDb) Delete(data any, collectionName string, options LineDbAdapterO
opts := db . getCollectionOptions ( collectionName )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
result , used , err := db . tryIndexDelete ( adapter , data , collectionName , opts . IndexedFields , options )
result , used , err := db . tryIndexDelete ( adapter , data , collectionName , DefaultPartition , opts . IndexedFields , options )
if err != nil {
return nil , err
}
if used {
if db . cacheExternal != nil {
ids := extractIDsFromRecords ( result )
db . cacheExternal . ClearEntriesContainingIDs ( ids )
}
db . clearCacheForCollection ( collectionName )
return result , nil
}
}
@@ -613,10 +604,7 @@ func (db *LineDb) Delete(data any, collectionName string, options LineDbAdapterO
}
}
}
if db . cacheExternal != nil {
ids := extractIDsFromRecords ( result )
db . cacheExternal . ClearEntriesContainingIDs ( ids )
}
db . clearCacheForCollection ( collectionName )
return result , nil
}
@@ -753,15 +741,21 @@ func (db *LineDb) ClearCache(collectionName string, options LineDbAdapterOptions
if collectionName == "" {
db . cacheExternal . Clear ( )
} else {
// Очищаем только записи для конкретной коллекции
// Это упрощенная реализация
db . cacheExternal . Clear ( )
db . cacheExternal . ClearCollection ( collectionName )
}
}
return nil
}
// clearCacheForCollection сбрасывает кэш запросов по логической коллекции (включая ключи партиций base_*).
func ( db * LineDb ) clearCacheForCollection ( collectionName string ) {
if db . cacheExternal == nil || collectionName == "" {
return
}
db . cacheExternal . ClearCollection ( collectionName )
}
// Close закрывает базу данных
func ( db * LineDb ) Close ( ) {
db . mutex . Lock ( )
@@ -792,6 +786,8 @@ func (db *LineDb) Close() {
// indexRebuildTimerLoop выполняет полный ребилд всех индексов с заданным интервалом (IndexRebuildTimer).
// Работает в отдельной горутине; при каждом тике берёт db.mutex.Lock() на время ребилда.
// Перед ребилдом коллекции: если в файле есть пустые слоты (след точечного удаления), вызывается
// CompactFile — перезапись без «дыр», затем индексирование по сжатому файлу.
// Останавливается при закрытии db.indexRebuildDone.
func ( db * LineDb ) indexRebuildTimerLoop ( interval time . Duration ) {
ticker := time . NewTicker ( interval )
@@ -809,10 +805,15 @@ func (db *LineDb) indexRebuildTimerLoop(interval time.Duration) {
}
if db . isCollectionPartitioned ( opt . CollectionName ) {
baseName := opt . CollectionName
for name , adapter := range db . adapters {
if ! strings . HasPrefix ( name , baseName + "_" ) {
for _ , adapter := range db . partitionStorageAdaptersOrdered ( baseName ) {
name := adapter . GetCollectionName ( )
dirty , err := adapter . HasBlankSlots ( LineDbAdapterOptions { } )
if err != nil {
continue
}
if dirty {
_ = adapter . CompactFile ( LineDbAdapterOptions { } )
}
records , lineIdx , err := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { } )
if err != nil {
continue
@@ -822,6 +823,10 @@ func (db *LineDb) indexRebuildTimerLoop(interval time.Duration) {
} else {
adapter := db . adapters [ opt . CollectionName ]
if adapter != nil {
dirty , err := adapter . HasBlankSlots ( LineDbAdapterOptions { } )
if err == nil && dirty {
_ = adapter . CompactFile ( LineDbAdapterOptions { } )
}
records , lineIdx , err := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { } )
if err == nil {
_ = db . indexStore . Rebuild ( opt . CollectionName , DefaultPartition , opt . IndexedFields , records , lineIdx )
@@ -965,8 +970,23 @@ func (db *LineDb) tryIndexLookupPartitioned(filter any, collectionName string, o
return true , filtered , nil
}
// indexedFieldKeysEqual возвращает true, если ключи индекса по всем indexedFields совпадают
// (та же семантика, что getFieldValue/valueToIndexKey в индексе).
func indexedFieldKeysEqual ( oldRec , newRec map [ string ] any , indexedFields [ ] string ) bool {
for _ , f := range indexedFields {
if getFieldValue ( oldRec , f ) != getFieldValue ( newRec , f ) {
return false
}
}
return true
}
// tryIndexUpdate выполняет точечное обновление через индекс. Возвращает (result, used, err).
func ( db * LineDb ) tryIndexUpdate ( adapter * JSONLFile , filter any , dataMap map [ string ] any , collectionName string , indexedFields [ ] string , options LineDbAdapterOptions ) ( [ ] any , bool , error ) {
// partition — имя партиции для IndexStore (для непартиционированных коллекций: DefaultPartition).
func ( db * LineDb ) tryIndexUpdate ( adapter * JSONLFile , filter any , dataMap map [ string ] any , collectionName string , partition string , indexedFields [ ] string , options LineDbAdapterOptions ) ( [ ] any , bool , error ) {
if partition == "" {
partition = DefaultPartition
}
filterMap , ok := filter . ( map [ string ] any )
if ! ok || len ( filterMap ) == 0 {
return nil , false , nil
@@ -990,7 +1010,7 @@ func (db *LineDb) tryIndexUpdate(adapter *JSONLFile, filter any, dataMap map[str
}
var indexes [ ] int
for _ , p := range posList {
if p . Partition == DefaultP artition {
if p . Partition == p artition {
indexes = append ( indexes , p . LineIndex )
}
}
@@ -1033,21 +1053,26 @@ func (db *LineDb) tryIndexUpdate(adapter *JSONLFile, filter any, dataMap map[str
if err := adapter . WriteAtLineIndexes ( updated , toUpdatePos , opt ) ; err != nil {
return nil , false , err
}
for i , rec := range toUpdate {
if m , ok := rec . ( map [ string ] any ) ; ok {
db . indexStore . UnindexRecord ( collectionName , DefaultPartition , indexedFields , m , toUpdatePos [ i ] )
for i := range toUpdate {
oldM , ok1 := toUpdate [ i ] . ( map [ string ] any )
newM , ok2 := updated [ i ] . ( map [ string ] any )
if ! ok1 || ! ok2 {
continue
}
}
for i , rec := range updated {
if m , ok := rec . ( map [ string ] any ) ; ok {
db . indexStore . IndexRecord ( collectionName , DefaultPartition , indexedFields , m , toUpdatePos [ i ] )
if indexedFieldKeysEqual ( oldM , newM , indexedFields ) {
continue
}
db . indexStore . UnindexRecord ( collectionName , partition , indexedFields , oldM , toUpdatePos [ i ] )
db . indexStore . IndexRecord ( collectionName , partition , indexedFields , newM , toUpdatePos [ i ] )
}
return updated , true , nil
}
// tryIndexDelete выполняет точечное удаление через индекс. Возвращает (deletedRecords, used, err).
func ( db * LineDb ) tryIndexDelete ( adapter * JSONLFile , filter any , collectionName string , indexedFields [ ] string , options LineDbAdapterOptions ) ( [ ] any , bool , error ) {
func ( db * LineDb ) tryIndexDelete ( adapter * JSONLFile , filter any , collectionName string , partition string , indexedFields [ ] string , options LineDbAdapterOptions ) ( [ ] any , bool , error ) {
if partition == "" {
partition = DefaultPartition
}
filterMap , ok := filter . ( map [ string ] any )
if ! ok || len ( filterMap ) == 0 {
return nil , false , nil
@@ -1071,7 +1096,7 @@ func (db *LineDb) tryIndexDelete(adapter *JSONLFile, filter any, collectionName
}
var indexes [ ] int
for _ , p := range posList {
if p . Partition == DefaultP artition {
if p . Partition == p artition {
indexes = append ( indexes , p . LineIndex )
}
}
@@ -1101,7 +1126,7 @@ func (db *LineDb) tryIndexDelete(adapter *JSONLFile, filter any, collectionName
}
for i , rec := range toDel {
if m , ok := rec . ( map [ string ] any ) ; ok {
db . indexStore . UnindexRecord ( collectionName , DefaultP artition, indexedFields , m , toDelPos [ i ] )
db . indexStore . UnindexRecord ( collectionName , p artition, indexedFields , m , toDelPos [ i ] )
}
}
return toDel , true , nil
@@ -1316,17 +1341,28 @@ func (db *LineDb) isCollectionPartitioned(collectionName string) bool {
return exists
}
func ( db * LineDb ) getPartitionFiles ( collectionName string ) ( [ ] string , error ) {
baseName := db . getBaseCollectionName ( collectionName )
var files [ ] string
fo r name , filename := range db . collections {
// partitionStorageAdaptersOrdered — для логического имени партиционированной коллекции baseName:
// сначала адаптер основного файла (baseName.jsonl), затем все baseName_* в лексикографическом порядке.
// Пустой ключ партиции пишет в основной файл; эти записи учитываются здесь первым адаптером.
func ( db * LineDb ) partitionStorageAdaptersOrdered ( baseName string ) [ ] * JSONLFile {
va r out [ ] * JSONLFile
if a := db . adapters [ baseName ] ; a != nil {
out = append ( out , a )
}
var partNames [ ] string
for name := range db . adapters {
if name == baseName {
continue
}
if strings . HasPrefix ( name , baseName + "_" ) {
fil es = append ( fil es, file name)
partNam es = append ( partNam es, name )
}
}
return files , nil
sort . Strings ( partNames )
for _ , name := range partNames {
out = append ( out , db . adapters [ name ] )
}
return out
}
func ( db * LineDb ) getPartitionAdapter ( data any , collectionName string ) ( * JSONLFile , error ) {
@@ -1336,6 +1372,14 @@ func (db *LineDb) getPartitionAdapter(data any, collectionName string) (*JSONLFi
}
partitionID := partitionFn ( data )
if strings . TrimSpace ( partitionID ) == "" {
adapter , ok := db . adapters [ collectionName ]
if ! ok {
return nil , fmt . Errorf ( "collection %s not found" , collectionName )
}
return adapter , nil
}
partitionName := fmt . Sprintf ( "%s_%s" , collectionName , partitionID )
adapter , exists := db . adapters [ partitionName ]
@@ -1506,24 +1550,6 @@ func (db *LineDb) compareIDs(a, b any) bool {
return a == b
}
// extractIDsFromRecords извлекает id из списка записей (map[string]any).
func extractIDsFromRecords ( records [ ] any ) [ ] any {
if len ( records ) == 0 {
return nil
}
ids := make ( [ ] any , 0 , len ( records ) )
for _ , rec := range records {
m , ok := rec . ( map [ string ] any )
if ! ok {
continue
}
if id , exists := m [ "id" ] ; exists && id != nil {
ids = append ( ids , id )
}
}
return ids
}
func ( db * LineDb ) generateCacheKey ( filter any , collectionName string ) string {
// Упрощенная реализация генерации ключа кэша
return fmt . Sprintf ( "%s:%v" , collectionName , filter )
@@ -1589,106 +1615,87 @@ func (db *LineDb) normalizeFilter(filter any) (any, error) {
}
func ( db * LineDb ) updatePartitioned ( data any , collectionName string , filter any , options LineDbAdapterOptions ) ( [ ] any , error ) {
partitionFiles , err := db . getPartitionFiles ( collectionName )
dataMap , err := db . toMap ( data )
if err != nil {
return nil , err
return nil , fmt . Errorf ( "invalid update data format: %w" , err )
}
opts := db . getCollectionOptions ( collectionName )
baseName := db . getBaseCollectionName ( collectionName )
var allResults [ ] any
for _ , filename := range partitionFiles {
var adapter * JSONLFile
var partitionName string
for name , adapterFile := range db . collections {
if adapterFile = = filename {
adapter = db . adapters [ name ]
partitionName = name
break
for _ , adapter := range db . partitionStorageAdaptersOrdered ( baseName ) {
partitionName := adapter . GetCollectionName ( )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
upd , used , terr := db . tryIndexUpdate ( adapter , filter , dataMap , collectionName , partitionName , opts . IndexedFields , options )
if terr ! = nil {
return nil , terr
}
if used {
allResults = append ( allResults , upd ... )
continue
}
}
if adapt er != nil {
results , err := adapter . Update ( data , filter , options )
if err != nil {
return nil , err
}
allResults = append ( allResults , results ... )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
records , lineIdx , readErr := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOption s{ InTransaction : true } )
if readErr == nil {
_ = db . indexStore . Rebuild ( collectionName , partitionName , opts . IndexedFields , records , lineIdx )
}
results , err := adapter . Update ( data , filter , options )
if er r != nil {
return nil , err
}
allResults = append ( allResults , results ... )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
records , lineIdx , readErr := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { InTransaction : true } )
if readErr == nil {
_ = db . indexStore . Rebuild ( collectionName , partitionName , opt s. IndexedFields , records , lineIdx )
}
}
}
db . clearCacheForCollection ( collectionName )
return allResults , nil
}
func ( db * LineDb ) deletePartitioned ( data any , collectionName string , options LineDbAdapterOptions ) ( [ ] any , error ) {
partitionFiles , err := db . getPartitionFiles ( collectionName )
if err != nil {
return nil , err
}
opts := db . getCollectionOptions ( collectionName )
baseName := db . getBaseCollectionName ( collectionName )
var allResults [ ] any
for _ , filename := range partitionFiles {
var adapter * JSONLFile
var partitionName string
for name , adapterFile := range db . collections {
if adapterFile = = filename {
adapter = db . adapters [ name ]
partitionName = name
break
for _ , adapter := range db . partitionStorageAdaptersOrdered ( baseName ) {
partitionName := adapter . GetCollectionName ( )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
del , used , derr := db . tryIndexDelete ( adapter , data , collectionName , partitionName , opts . IndexedFields , options )
if derr ! = nil {
return nil , derr
}
if used {
allResults = append ( allResults , del ... )
continue
}
}
if adapt er != nil {
results , err := adapter . Delete ( data , options )
if err != nil {
return nil , err
}
allResults = append ( allResults , results ... )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
records , lineIdx , readErr := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOption s{ InTransaction : true } )
if readErr == nil {
_ = db . indexStore . Rebuild ( collectionName , partitionName , opts . IndexedFields , records , lineIdx )
}
results , err := adapter . Delete ( data , options )
if er r != nil {
return nil , err
}
allResults = append ( allResults , results ... )
if db . indexStore != nil && opts != nil && len ( opts . IndexedFields ) > 0 {
records , lineIdx , readErr := adapter . ReadWithPhysicalLineIndexes ( LineDbAdapterOptions { InTransaction : true } )
if readErr == nil {
_ = db . indexStore . Rebuild ( collectionName , partitionName , opt s. IndexedFields , records , lineIdx )
}
}
}
db . clearCacheForCollection ( collectionName )
return allResults , nil
}
func ( db * LineDb ) readByFilterPartitioned ( filter any , collectionName string , options LineDbAdapterOptions ) ( [ ] any , error ) {
// Получаем все партиции
partitionFiles , err := db . getPartitionFiles ( collectionName )
if err != nil {
return nil , err
}
baseName := db . getBaseCollectionName ( collectionName )
var allResults [ ] any
for _ , filename := range partitionFiles {
// Находим адаптер по имени файла
var adapt er * JSONLFile
for name , adapterFile := range db . collections {
if adapterFile == filename {
adapter = db . adapters [ name ]
break
}
}
if adapter != nil {
results , err := adapter . ReadByFilter ( filter , options )
if err != nil {
return nil , err
}
allResults = append ( allResults , results ... )
for _ , adapter := range db . partitionStorageAdaptersOrdered ( baseName ) {
results , err := adapter . ReadByFilter ( filter , options )
if err != nil {
return nil , err
}
allResults = append ( allResults , results ... )
}
return allResults , nil
}