diff --git a/pkg/linedb/line_db.go b/pkg/linedb/line_db.go index b4b8a30..bc52d28 100644 --- a/pkg/linedb/line_db.go +++ b/pkg/linedb/line_db.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "sort" "strconv" "strings" "sync" @@ -137,12 +138,10 @@ func (db *LineDb) Init(force bool, initOptions *LineDbInitOptions) error { continue } if db.isCollectionPartitioned(opt.CollectionName) { - // Партиции: обходим все существующие партиции (могут быть созданы ранее) + // Основной файл + все base_* (пустой partition id → основной файл) baseName := opt.CollectionName - for name, adapter := range db.adapters { - if !strings.HasPrefix(name, baseName+"_") { - continue - } + for _, adapter := range db.partitionStorageAdaptersOrdered(baseName) { + name := adapter.GetCollectionName() records, lineIdx, err := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{}) if err != nil { continue @@ -249,7 +248,9 @@ func (db *LineDb) seedLastIDPartitioned(dbFolder, baseName string) error { return nil } -// Read читает все записи из коллекции +// Read читает все записи из коллекции. +// Для партиционированной логической коллекции (имя без суффикса _*) читает основной файл +// и все файлы партиций base_* подряд. Запрос по имени одной партиции (base_part) читает только её файл. func (db *LineDb) Read(collectionName string, options LineDbAdapterOptions) ([]any, error) { db.mutex.RLock() defer db.mutex.RUnlock() @@ -258,6 +259,23 @@ func (db *LineDb) Read(collectionName string, options LineDbAdapterOptions) ([]a collectionName = db.getFirstCollection() } + baseName := db.getBaseCollectionName(collectionName) + if db.isCollectionPartitioned(baseName) && collectionName == baseName { + adapters := db.partitionStorageAdaptersOrdered(baseName) + if len(adapters) == 0 { + return nil, fmt.Errorf("collection %s not found", collectionName) + } + var all []any + for _, a := range adapters { + recs, err := a.Read(options) + if err != nil { + return nil, err + } + all = append(all, recs...) + } + return all, nil + } + adapter, exists := db.adapters[collectionName] if !exists { return nil, fmt.Errorf("collection %s not found", collectionName) @@ -798,10 +816,8 @@ func (db *LineDb) indexRebuildTimerLoop(interval time.Duration) { } if db.isCollectionPartitioned(opt.CollectionName) { baseName := opt.CollectionName - for name, adapter := range db.adapters { - if !strings.HasPrefix(name, baseName+"_") { - continue - } + for _, adapter := range db.partitionStorageAdaptersOrdered(baseName) { + name := adapter.GetCollectionName() dirty, err := adapter.HasBlankSlots(LineDbAdapterOptions{}) if err != nil { continue @@ -1336,17 +1352,28 @@ func (db *LineDb) isCollectionPartitioned(collectionName string) bool { return exists } -func (db *LineDb) getPartitionFiles(collectionName string) ([]string, error) { - baseName := db.getBaseCollectionName(collectionName) - var files []string - - for name, filename := range db.collections { +// partitionStorageAdaptersOrdered — для логического имени партиционированной коллекции baseName: +// сначала адаптер основного файла (baseName.jsonl), затем все baseName_* в лексикографическом порядке. +// Пустой ключ партиции пишет в основной файл; эти записи учитываются здесь первым адаптером. +func (db *LineDb) partitionStorageAdaptersOrdered(baseName string) []*JSONLFile { + var out []*JSONLFile + if a := db.adapters[baseName]; a != nil { + out = append(out, a) + } + var partNames []string + for name := range db.adapters { + if name == baseName { + continue + } if strings.HasPrefix(name, baseName+"_") { - files = append(files, filename) + partNames = append(partNames, name) } } - - return files, nil + sort.Strings(partNames) + for _, name := range partNames { + out = append(out, db.adapters[name]) + } + return out } func (db *LineDb) getPartitionAdapter(data any, collectionName string) (*JSONLFile, error) { @@ -1356,6 +1383,14 @@ func (db *LineDb) getPartitionAdapter(data any, collectionName string) (*JSONLFi } partitionID := partitionFn(data) + if strings.TrimSpace(partitionID) == "" { + adapter, ok := db.adapters[collectionName] + if !ok { + return nil, fmt.Errorf("collection %s not found", collectionName) + } + return adapter, nil + } + partitionName := fmt.Sprintf("%s_%s", collectionName, partitionID) adapter, exists := db.adapters[partitionName] @@ -1609,49 +1644,35 @@ func (db *LineDb) normalizeFilter(filter any) (any, error) { } func (db *LineDb) updatePartitioned(data any, collectionName string, filter any, options LineDbAdapterOptions) ([]any, error) { - partitionFiles, err := db.getPartitionFiles(collectionName) - if err != nil { - return nil, err - } dataMap, err := db.toMap(data) if err != nil { return nil, fmt.Errorf("invalid update data format: %w", err) } opts := db.getCollectionOptions(collectionName) + baseName := db.getBaseCollectionName(collectionName) var allResults []any - for _, filename := range partitionFiles { - var adapter *JSONLFile - var partitionName string - for name, adapterFile := range db.collections { - if adapterFile == filename { - adapter = db.adapters[name] - partitionName = name - break + for _, adapter := range db.partitionStorageAdaptersOrdered(baseName) { + partitionName := adapter.GetCollectionName() + if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { + upd, used, terr := db.tryIndexUpdate(adapter, filter, dataMap, collectionName, partitionName, opts.IndexedFields, options) + if terr != nil { + return nil, terr + } + if used { + allResults = append(allResults, upd...) + continue } } - - if adapter != nil { - if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { - upd, used, terr := db.tryIndexUpdate(adapter, filter, dataMap, collectionName, partitionName, opts.IndexedFields, options) - if terr != nil { - return nil, terr - } - if used { - allResults = append(allResults, upd...) - continue - } - } - results, err := adapter.Update(data, filter, options) - if err != nil { - return nil, err - } - allResults = append(allResults, results...) - if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { - records, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{InTransaction: true}) - if readErr == nil { - _ = db.indexStore.Rebuild(collectionName, partitionName, opts.IndexedFields, records, lineIdx) - } + results, err := adapter.Update(data, filter, options) + if err != nil { + return nil, err + } + allResults = append(allResults, results...) + if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { + records, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{InTransaction: true}) + if readErr == nil { + _ = db.indexStore.Rebuild(collectionName, partitionName, opts.IndexedFields, records, lineIdx) } } } @@ -1660,45 +1681,31 @@ func (db *LineDb) updatePartitioned(data any, collectionName string, filter any, } func (db *LineDb) deletePartitioned(data any, collectionName string, options LineDbAdapterOptions) ([]any, error) { - partitionFiles, err := db.getPartitionFiles(collectionName) - if err != nil { - return nil, err - } opts := db.getCollectionOptions(collectionName) + baseName := db.getBaseCollectionName(collectionName) var allResults []any - for _, filename := range partitionFiles { - var adapter *JSONLFile - var partitionName string - for name, adapterFile := range db.collections { - if adapterFile == filename { - adapter = db.adapters[name] - partitionName = name - break + for _, adapter := range db.partitionStorageAdaptersOrdered(baseName) { + partitionName := adapter.GetCollectionName() + if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { + del, used, derr := db.tryIndexDelete(adapter, data, collectionName, partitionName, opts.IndexedFields, options) + if derr != nil { + return nil, derr + } + if used { + allResults = append(allResults, del...) + continue } } - - if adapter != nil { - if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { - del, used, derr := db.tryIndexDelete(adapter, data, collectionName, partitionName, opts.IndexedFields, options) - if derr != nil { - return nil, derr - } - if used { - allResults = append(allResults, del...) - continue - } - } - results, err := adapter.Delete(data, options) - if err != nil { - return nil, err - } - allResults = append(allResults, results...) - if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { - records, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{InTransaction: true}) - if readErr == nil { - _ = db.indexStore.Rebuild(collectionName, partitionName, opts.IndexedFields, records, lineIdx) - } + results, err := adapter.Delete(data, options) + if err != nil { + return nil, err + } + allResults = append(allResults, results...) + if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 { + records, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{InTransaction: true}) + if readErr == nil { + _ = db.indexStore.Rebuild(collectionName, partitionName, opts.IndexedFields, records, lineIdx) } } } @@ -1707,32 +1714,15 @@ func (db *LineDb) deletePartitioned(data any, collectionName string, options Lin } func (db *LineDb) readByFilterPartitioned(filter any, collectionName string, options LineDbAdapterOptions) ([]any, error) { - // Получаем все партиции - partitionFiles, err := db.getPartitionFiles(collectionName) - if err != nil { - return nil, err - } - + baseName := db.getBaseCollectionName(collectionName) var allResults []any - for _, filename := range partitionFiles { - // Находим адаптер по имени файла - var adapter *JSONLFile - for name, adapterFile := range db.collections { - if adapterFile == filename { - adapter = db.adapters[name] - break - } - } - - if adapter != nil { - results, err := adapter.ReadByFilter(filter, options) - if err != nil { - return nil, err - } - allResults = append(allResults, results...) + for _, adapter := range db.partitionStorageAdaptersOrdered(baseName) { + results, err := adapter.ReadByFilter(filter, options) + if err != nil { + return nil, err } + allResults = append(allResults, results...) } - return allResults, nil }