Files
elowdb-go/pkg/linedb/line_db.go

1979 lines
59 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package linedb
import (
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
goccyjson "github.com/goccy/go-json"
)
// LineDb представляет основную базу данных
// Соответствует TypeScript классу LineDb
type LineDb struct {
adapters map[string]*JSONLFile
collections map[string]string
partitionFunctions map[string]func(any) string
mutex sync.RWMutex
cacheSize int
cacheExternal *RecordCache
nextIDFn func(any, string) (any, error)
lastIDManager *LastIDManager
cacheTTL time.Duration
constructorOptions *LineDbOptions
initOptions *LineDbInitOptions
indexStore IndexStore
indexRebuildDone chan struct{} // закрывается при Close, останавливает горутину IndexRebuildTimer
}
// NewLineDb создает новый экземпляр LineDb
func NewLineDb(options *LineDbOptions, adapters ...*JSONLFile) *LineDb {
if options == nil {
options = &LineDbOptions{}
}
db := &LineDb{
adapters: make(map[string]*JSONLFile),
collections: make(map[string]string),
partitionFunctions: make(map[string]func(any) string),
cacheSize: options.CacheSize,
cacheTTL: options.CacheTTL,
lastIDManager: GetLastIDManagerInstance(),
constructorOptions: options,
}
// Инициализируем кэш если нужно
if db.cacheSize > 0 && db.cacheTTL > 0 {
db.cacheExternal = NewRecordCache(db.cacheSize, db.cacheTTL)
}
// Добавляем готовые адаптеры
for _, adapter := range adapters {
collectionName := adapter.GetCollectionName()
db.adapters[collectionName] = adapter
db.collections[collectionName] = adapter.GetFilename()
}
db.indexStore = options.IndexStore
return db
}
// Init инициализирует базу данных
func (db *LineDb) Init(force bool, initOptions *LineDbInitOptions) error {
db.mutex.Lock()
defer db.mutex.Unlock()
if initOptions == nil {
return fmt.Errorf("no init options provided")
}
// Устанавливаем опции
db.initOptions = initOptions
db.cacheSize = initOptions.CacheSize
db.cacheTTL = initOptions.CacheTTL
// Инициализируем кэш если нужно
if db.cacheSize > 0 && db.cacheTTL > 0 {
db.cacheExternal = NewRecordCache(db.cacheSize, db.cacheTTL)
}
// Создаем папку базы данных
dbFolder := initOptions.DBFolder
if dbFolder == "" {
dbFolder = "linedb"
}
if err := os.MkdirAll(dbFolder, 0755); err != nil {
return fmt.Errorf("failed to create database folder: %w", err)
}
// Сохраняем функции партиционирования
for _, partition := range initOptions.Partitions {
if partition.PartIDFn != nil {
db.partitionFunctions[partition.CollectionName] = partition.PartIDFn
}
}
// Создаем адаптеры для коллекций
for i, adapterOptions := range initOptions.Collections {
collectionName := adapterOptions.CollectionName
if collectionName == "" {
collectionName = fmt.Sprintf("collection_%d", i+1)
}
// Создаем путь к файлу
filename := filepath.Join(dbFolder, collectionName+".jsonl")
// Создаем адаптер
adapter := NewJSONLFile(filename, adapterOptions.EncryptKeyForLineDb, adapterOptions)
// Инициализируем адаптер
if err := adapter.Init(force, LineDbAdapterOptions{}); err != nil {
return fmt.Errorf("failed to init adapter for collection %s: %w", collectionName, err)
}
// Добавляем в карту адаптеров
db.adapters[collectionName] = adapter
db.collections[collectionName] = filename
}
// Индексы: создаём in-memory по умолчанию, если не задан IndexStore и есть IndexedFields
if db.indexStore == nil {
for _, opt := range initOptions.Collections {
if len(opt.IndexedFields) > 0 {
db.indexStore = NewInMemoryIndexStore()
break
}
}
}
if db.indexStore != nil {
for _, opt := range initOptions.Collections {
if len(opt.IndexedFields) == 0 {
continue
}
if db.isCollectionPartitioned(opt.CollectionName) {
// Основной файл + все base_* (пустой partition id → основной файл)
baseName := opt.CollectionName
for _, adapter := range db.partitionStorageAdaptersOrdered(baseName) {
name := adapter.GetCollectionName()
records, lineIdx, err := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{})
if err != nil {
continue
}
_ = db.indexStore.Rebuild(opt.CollectionName, name, opt.IndexedFields, records, lineIdx)
}
} else {
adapter := db.adapters[opt.CollectionName]
if adapter != nil {
records, lineIdx, err := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{})
if err != nil {
return fmt.Errorf("failed to read collection %s for index rebuild: %w", opt.CollectionName, err)
}
if err := db.indexStore.Rebuild(opt.CollectionName, DefaultPartition, opt.IndexedFields, records, lineIdx); err != nil {
return fmt.Errorf("failed to rebuild index for %s: %w", opt.CollectionName, err)
}
}
}
}
}
// Периодический ребилд индексов (отдельная горутина; при тике — полная блокировка db.mutex)
if db.indexStore != nil && initOptions.IndexRebuildTimer > 0 {
db.indexRebuildDone = make(chan struct{})
interval := time.Duration(initOptions.IndexRebuildTimer) * time.Second
go db.indexRebuildTimerLoop(interval)
}
if err := db.seedLastIDsFromData(dbFolder); err != nil {
return err
}
return nil
}
// seedLastIDsFromData выставляет LastIDManager по максимальному id в существующих данных
// (после рестарта процесса счётчик не начинается с 1).
func (db *LineDb) seedLastIDsFromData(dbFolder string) error {
for i, opt := range db.initOptions.Collections {
collectionName := opt.CollectionName
if collectionName == "" {
collectionName = fmt.Sprintf("collection_%d", i+1)
}
if db.isCollectionPartitioned(collectionName) {
if err := db.seedLastIDPartitioned(dbFolder, collectionName); err != nil {
return err
}
continue
}
adapter := db.adapters[collectionName]
if adapter == nil {
continue
}
records, err := adapter.Read(LineDbAdapterOptions{})
if err != nil {
return fmt.Errorf("seed last id: read %s: %w", collectionName, err)
}
db.lastIDManager.SetLastID(collectionName, db.GetMaxID(records))
}
return nil
}
// seedLastIDPartitioned — максимальный id по базовому файлу и всем events_*.jsonl на диске.
// NextID для партиций вызывается с логическим именем (например events), ключ тот же.
func (db *LineDb) seedLastIDPartitioned(dbFolder, baseName string) error {
maxID := 0
if a := db.adapters[baseName]; a != nil {
recs, err := a.Read(LineDbAdapterOptions{})
if err != nil {
return fmt.Errorf("seed last id: read base %s: %w", baseName, err)
}
if m := db.GetMaxID(recs); m > maxID {
maxID = m
}
}
pattern := filepath.Join(dbFolder, baseName+"_*.jsonl")
matches, err := filepath.Glob(pattern)
if err != nil {
return fmt.Errorf("seed last id: glob %s: %w", pattern, err)
}
for _, path := range matches {
base := filepath.Base(path)
partName := strings.TrimSuffix(base, ".jsonl")
var adapter *JSONLFile
if existing, ok := db.adapters[partName]; ok {
adapter = existing
} else {
adapter = NewJSONLFile(path, "", JSONLFileOptions{CollectionName: partName})
if err := adapter.Init(false, LineDbAdapterOptions{}); err != nil {
continue
}
db.adapters[partName] = adapter
db.collections[partName] = path
}
recs, err := adapter.Read(LineDbAdapterOptions{})
if err != nil {
continue
}
if m := db.GetMaxID(recs); m > maxID {
maxID = m
}
}
db.lastIDManager.SetLastID(baseName, maxID)
return nil
}
// Read читает все записи из коллекции.
// Для партиционированной логической коллекции (имя без суффикса _*) читает основной файл
// и все файлы партиций base_* подряд. Запрос по имени одной партиции (base_part) читает только её файл.
func (db *LineDb) Read(collectionName string, options LineDbAdapterOptions) ([]any, error) {
db.mutex.RLock()
defer db.mutex.RUnlock()
if collectionName == "" {
collectionName = db.getFirstCollection()
}
baseName := db.getBaseCollectionName(collectionName)
if db.isCollectionPartitioned(baseName) && collectionName == baseName {
adapters := db.partitionStorageAdaptersOrdered(baseName)
if len(adapters) == 0 {
return nil, fmt.Errorf("collection %s not found", collectionName)
}
var all []any
for _, a := range adapters {
recs, err := a.Read(options)
if err != nil {
return nil, err
}
all = append(all, recs...)
}
return all, nil
}
adapter, exists := db.adapters[collectionName]
if !exists {
return nil, fmt.Errorf("collection %s not found", collectionName)
}
return adapter.Read(options)
}
// Insert вставляет новые записи в коллекцию
func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterOptions) error {
// Блокируем только если не в транзакции
if !options.InTransaction {
db.mutex.Lock()
defer db.mutex.Unlock()
}
if collectionName == "" {
collectionName = db.getFirstCollection()
}
// Проверяем debug tag
if options.DebugTag == "error" {
return fmt.Errorf("test error")
}
// Обрабатываем данные
dataArray := db.normalizeDataArray(data)
resultDataArray := make([]any, 0, len(dataArray))
for _, item := range dataArray {
itemMap, err := db.toMap(item)
if err != nil {
return fmt.Errorf("invalid data format: %w", err)
}
// Генерируем ID если отсутствует
if itemMap["id"] == nil || db.isInvalidID(itemMap["id"]) {
newID, err := db.NextID(item, collectionName)
if err != nil {
return fmt.Errorf("failed to generate ID: %w", err)
}
// Проверяем уникальность ID
done := false
count := 0
for !done && count < 10000 {
// Проверяем, что ID не существует в результатах
exists := false
for _, resultItem := range resultDataArray {
if resultMap, ok := resultItem.(map[string]any); ok {
if resultMap["id"] == newID {
exists = true
break
}
}
}
if !exists {
done = true
} else {
newID, err = db.NextID(item, collectionName)
if err != nil {
return fmt.Errorf("failed to generate unique ID: %w", err)
}
}
count++
}
if count >= 10000 {
return fmt.Errorf("can not generate new id for 10 000 iterations")
}
itemMap["id"] = newID
} else {
// Проверяем существование записи если не пропускаем проверку
if !options.SkipCheckExistingForWrite {
filter := map[string]any{"id": itemMap["id"]}
for key, partitionAdapter := range db.adapters {
if strings.Contains(key, collectionName) {
exists, err := partitionAdapter.ReadByFilter(filter, LineDbAdapterOptions{InTransaction: true})
if err != nil {
return fmt.Errorf("failed to check existing record: %w", err)
}
if len(exists) > 0 {
return fmt.Errorf("record with id %v already exists in collection %s", itemMap["id"], collectionName)
}
}
}
}
}
// Проверяем обязательные поля (required)
if err := db.checkRequiredFieldsInsert(itemMap, collectionName); err != nil {
return err
}
// Проверяем уникальность полей
if err := db.checkUniqueFieldsInsert(itemMap, collectionName, resultDataArray, options); err != nil {
return err
}
resultDataArray = append(resultDataArray, itemMap)
}
// Записываем данные с флагом транзакции
writeOptions := LineDbAdapterOptions{InTransaction: true, InternalCall: true}
if db.indexStore != nil {
opts := db.getCollectionOptions(collectionName)
if opts != nil && len(opts.IndexedFields) > 0 {
// Точечное IndexRecord в Write (LineCount → стартовая строка для новых записей)
writeOptions.DoIndexing = true
}
}
if err := db.Write(resultDataArray, collectionName, writeOptions); err != nil {
return fmt.Errorf("failed to write data: %w", err)
}
db.clearCacheForCollection(collectionName)
return nil
}
// Write записывает данные в коллекцию
func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOptions) error {
// Блокируем только если не в транзакции
if !options.InTransaction {
db.mutex.Lock()
defer db.mutex.Unlock()
}
if collectionName == "" {
collectionName = db.getFirstCollection()
}
// Проверяем партиционирование
if db.isCollectionPartitioned(collectionName) {
dataArray := db.normalizeDataArray(data)
opts := db.getCollectionOptions(collectionName)
for _, item := range dataArray {
adapter, err := db.getPartitionAdapter(item, collectionName)
if err != nil {
return fmt.Errorf("failed to get partition adapter: %w", err)
}
partitionName := adapter.GetCollectionName()
var startLine int
if options.DoIndexing && db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
if c, err := adapter.LineCount(); err == nil {
startLine = c
}
}
if err := adapter.Write(item, options); err != nil {
return fmt.Errorf("failed to write to partition: %w", err)
}
if options.DoIndexing && db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
if m, err := db.toMap(item); err == nil {
db.indexStore.IndexRecord(collectionName, partitionName, opts.IndexedFields, m, startLine)
}
}
}
return nil
}
// Обычная запись
adapter, exists := db.adapters[collectionName]
if !exists {
return fmt.Errorf("collection %s not found", collectionName)
}
var startLine int
if options.DoIndexing && db.indexStore != nil {
if c, err := adapter.LineCount(); err == nil {
startLine = c
}
}
if err := adapter.Write(data, options); err != nil {
return err
}
// Точечное индексирование при DoIndexing
if options.DoIndexing && db.indexStore != nil {
opts := db.getCollectionOptions(collectionName)
if opts != nil && len(opts.IndexedFields) > 0 {
dataArray := db.normalizeDataArray(data)
for i, record := range dataArray {
if m, err := db.toMap(record); err == nil {
db.indexStore.IndexRecord(collectionName, DefaultPartition, opts.IndexedFields, m, startLine+i)
}
}
}
}
return nil
}
// Update обновляет записи в коллекции
func (db *LineDb) Update(data any, collectionName string, filter any, options LineDbAdapterOptions) ([]any, error) {
// Блокируем только если не в транзакции
if !options.InTransaction {
db.mutex.Lock()
defer db.mutex.Unlock()
}
if collectionName == "" {
collectionName = db.getFirstCollection()
}
// Конвертируем data в map (struct или map)
dataMap, err := db.toMap(data)
if err != nil {
return nil, fmt.Errorf("invalid update data format: %w", err)
}
// Нормализуем фильтр (строка/struct -> map)
normFilter, err := db.normalizeFilter(filter)
if err != nil {
return nil, err
}
filter = normFilter
// Проверяем конфликт ID
if filterMap, ok := filter.(map[string]any); ok {
if dataMap["id"] != nil && filterMap["id"] != nil {
if !db.compareIDs(dataMap["id"], filterMap["id"]) {
return nil, fmt.Errorf("you can not update record id with filter by another id. Use delete and insert instead")
}
}
}
// Проверяем обязательные поля (required)
if err := db.checkRequiredFieldsUpdate(dataMap, collectionName); err != nil {
return nil, err
}
// Проверяем уникальность полей
if err := db.checkUniqueFieldsUpdate(dataMap, filter, collectionName, options); err != nil {
return nil, err
}
// Проверяем партиционирование
if db.isCollectionPartitioned(collectionName) {
return db.updatePartitioned(dataMap, collectionName, filter, options)
}
// Обычное обновление
adapter, exists := db.adapters[collectionName]
if !exists {
return nil, fmt.Errorf("collection %s not found", collectionName)
}
// Пробуем точечный Update через индекс (без полного чтения файла)
opts := db.getCollectionOptions(collectionName)
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 && !db.isCollectionPartitioned(collectionName) {
result, used, err := db.tryIndexUpdate(adapter, filter, dataMap, collectionName, DefaultPartition, opts.IndexedFields, options)
if err != nil {
return nil, err
}
if used {
db.clearCacheForCollection(collectionName)
return result, nil
}
}
result, err := adapter.Update(dataMap, filter, options)
if err != nil {
return nil, err
}
// Перестраиваем индекс после Update
if db.indexStore != nil {
opts := db.getCollectionOptions(collectionName)
if opts != nil && len(opts.IndexedFields) > 0 {
allRecords, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{})
if readErr == nil {
_ = db.indexStore.Rebuild(collectionName, DefaultPartition, opts.IndexedFields, allRecords, lineIdx)
}
}
}
db.clearCacheForCollection(collectionName)
return result, nil
}
// Delete удаляет записи из коллекции
func (db *LineDb) Delete(data any, collectionName string, options LineDbAdapterOptions) ([]any, error) {
// Блокируем только если не в транзакции
if !options.InTransaction {
db.mutex.Lock()
defer db.mutex.Unlock()
}
if collectionName == "" {
collectionName = db.getFirstCollection()
}
// Нормализуем фильтр удаления (строка/struct -> map)
normFilter, err := db.normalizeFilter(data)
if err != nil {
return nil, err
}
data = normFilter
// Проверяем партиционирование
if db.isCollectionPartitioned(collectionName) {
return db.deletePartitioned(data, collectionName, options)
}
// Обычное удаление
adapter, exists := db.adapters[collectionName]
if !exists {
return nil, fmt.Errorf("collection %s not found", collectionName)
}
opts := db.getCollectionOptions(collectionName)
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
result, used, err := db.tryIndexDelete(adapter, data, collectionName, DefaultPartition, opts.IndexedFields, options)
if err != nil {
return nil, err
}
if used {
db.clearCacheForCollection(collectionName)
return result, nil
}
}
result, err := adapter.Delete(data, options)
if err != nil {
return nil, err
}
// Перестраиваем индекс после Delete
if db.indexStore != nil {
opts := db.getCollectionOptions(collectionName)
if opts != nil && len(opts.IndexedFields) > 0 {
allRecords, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{})
if readErr == nil {
_ = db.indexStore.Rebuild(collectionName, DefaultPartition, opts.IndexedFields, allRecords, lineIdx)
}
}
}
db.clearCacheForCollection(collectionName)
return result, nil
}
// Select выполняет выборку с поддержкой цепочки
func (db *LineDb) Select(filter any, collectionName string, options LineDbAdapterOptions) (any, error) {
result, err := db.ReadByFilter(filter, collectionName, options)
if err != nil {
return nil, err
}
if options.ReturnChain {
return NewCollectionChain(result), nil
}
return result, nil
}
// ReadByFilter читает записи по фильтру
func (db *LineDb) ReadByFilter(filter any, collectionName string, options LineDbAdapterOptions) ([]any, error) {
if !options.InTransaction {
db.mutex.RLock()
defer db.mutex.RUnlock()
}
// Нормализуем фильтр (строка/struct -> map)
normFilter, err := db.normalizeFilter(filter)
if err != nil {
return nil, err
}
filter = normFilter
if collectionName == "" {
collectionName = db.getFirstCollection()
}
// Проверяем кэш
if db.cacheExternal != nil && !options.InTransaction {
if cached, exists := db.cacheExternal.Get(db.generateCacheKey(filter, collectionName)); exists {
if cachedArray, ok := cached.([]any); ok {
return cachedArray, nil
}
}
}
// Проверяем партиционирование
if db.isCollectionPartitioned(collectionName) {
if db.indexStore != nil {
hit, result, err := db.tryIndexLookupPartitioned(filter, collectionName, options)
if err != nil || (options.FailOnFailureIndexRead && !hit) {
return nil, fmt.Errorf("index read failed: %w", err)
}
if hit && err == nil {
if db.cacheExternal != nil && !options.InTransaction {
db.cacheExternal.Set(db.generateCacheKey(filter, collectionName), result)
}
return result, nil
}
}
// fallback to regular read
result, err := db.readByFilterPartitioned(filter, collectionName, options)
if err != nil {
return nil, err
}
// Обновляем кэш
if db.cacheExternal != nil && !options.InTransaction {
db.cacheExternal.Set(db.generateCacheKey(filter, collectionName), result)
}
return result, nil
}
adapter, exists := db.adapters[collectionName]
if !exists {
return nil, fmt.Errorf("collection %s not found", collectionName)
}
// Используем индекс, если фильтр — одно поле из IndexedFields
if db.indexStore != nil {
hit, result, err := db.tryIndexLookup(adapter, filter, collectionName, DefaultPartition, options)
if err != nil || (options.FailOnFailureIndexRead && !hit) {
return nil, fmt.Errorf("index read failed: %w", err)
}
if hit && err == nil {
if db.cacheExternal != nil && !options.InTransaction {
db.cacheExternal.Set(db.generateCacheKey(filter, collectionName), result)
}
return result, nil
}
}
// fallback to regular read
result, err := adapter.ReadByFilter(filter, options)
if err != nil {
return nil, err
}
// Обновляем кэш
if db.cacheExternal != nil && !options.InTransaction {
db.cacheExternal.Set(db.generateCacheKey(filter, collectionName), result)
}
return result, nil
}
// NextID генерирует следующий ID
func (db *LineDb) NextID(data any, collectionName string) (any, error) {
if db.nextIDFn != nil {
return db.nextIDFn(data, collectionName)
}
// Используем LastIDManager по умолчанию
lastID := db.lastIDManager.GetLastID(collectionName)
newID := lastID + 1
db.lastIDManager.SetLastID(collectionName, newID)
return newID, nil
}
// LastSequenceID возвращает последний последовательный ID
func (db *LineDb) LastSequenceID(collectionName string) int {
if collectionName == "" {
collectionName = db.getFirstCollection()
}
return db.lastIDManager.GetLastID(collectionName)
}
// ClearCache очищает кэш
func (db *LineDb) ClearCache(collectionName string, options LineDbAdapterOptions) error {
// Блокируем только если не в транзакции
if !options.InTransaction {
db.mutex.Lock()
defer db.mutex.Unlock()
}
if db.cacheExternal != nil {
if collectionName == "" {
db.cacheExternal.Clear()
} else {
db.cacheExternal.ClearCollection(collectionName)
}
}
return nil
}
// clearCacheForCollection сбрасывает кэш запросов по логической коллекции (включая ключи партиций base_*).
func (db *LineDb) clearCacheForCollection(collectionName string) {
if db.cacheExternal == nil || collectionName == "" {
return
}
db.cacheExternal.ClearCollection(collectionName)
}
// Close закрывает базу данных
func (db *LineDb) Close() {
db.mutex.Lock()
defer db.mutex.Unlock()
// Останавливаем горутину периодического ребилда индексов (IndexRebuildTimer)
if db.indexRebuildDone != nil {
close(db.indexRebuildDone)
db.indexRebuildDone = nil
}
// Закрываем все адаптеры
for _, adapter := range db.adapters {
adapter.Destroy()
}
// Останавливаем и очищаем кэш
if db.cacheExternal != nil {
db.cacheExternal.Stop()
db.cacheExternal.Clear()
}
// Очищаем карты
db.adapters = make(map[string]*JSONLFile)
db.collections = make(map[string]string)
db.partitionFunctions = make(map[string]func(any) string)
}
// indexRebuildTimerLoop выполняет полный ребилд всех индексов с заданным интервалом (IndexRebuildTimer).
// Работает в отдельной горутине; при каждом тике берёт db.mutex.Lock() на время ребилда.
// Перед ребилдом коллекции: если в файле есть пустые слоты (след точечного удаления), вызывается
// CompactFile — перезапись без «дыр», затем индексирование по сжатому файлу.
// Останавливается при закрытии db.indexRebuildDone.
func (db *LineDb) indexRebuildTimerLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-db.indexRebuildDone:
return
case <-ticker.C:
db.mutex.Lock()
if db.indexStore != nil && db.initOptions != nil {
for _, opt := range db.initOptions.Collections {
if len(opt.IndexedFields) == 0 {
continue
}
if db.isCollectionPartitioned(opt.CollectionName) {
baseName := opt.CollectionName
for _, adapter := range db.partitionStorageAdaptersOrdered(baseName) {
name := adapter.GetCollectionName()
dirty, err := adapter.HasBlankSlots(LineDbAdapterOptions{})
if err != nil {
continue
}
if dirty {
_ = adapter.CompactFile(LineDbAdapterOptions{})
}
records, lineIdx, err := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{})
if err != nil {
continue
}
_ = db.indexStore.Rebuild(opt.CollectionName, name, opt.IndexedFields, records, lineIdx)
}
} else {
adapter := db.adapters[opt.CollectionName]
if adapter != nil {
dirty, err := adapter.HasBlankSlots(LineDbAdapterOptions{})
if err == nil && dirty {
_ = adapter.CompactFile(LineDbAdapterOptions{})
}
records, lineIdx, err := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{})
if err == nil {
_ = db.indexStore.Rebuild(opt.CollectionName, DefaultPartition, opt.IndexedFields, records, lineIdx)
}
}
}
}
}
db.mutex.Unlock()
}
}
}
// Вспомогательные методы
func (db *LineDb) getFirstCollection() string {
for name := range db.adapters {
return name
}
return ""
}
func (db *LineDb) getBaseCollectionName(collectionName string) string {
if idx := strings.Index(collectionName, "_"); idx != -1 {
return collectionName[:idx]
}
return collectionName
}
// tryIndexLookup пытается выполнить поиск через индекс (для одной партиции/коллекции).
func (db *LineDb) tryIndexLookup(adapter *JSONLFile, filter any, collectionName, partition string, options LineDbAdapterOptions) (bool, []any, error) {
filterMap, ok := filter.(map[string]any)
if !ok || len(filterMap) == 0 {
return false, nil, nil
}
opts := db.getCollectionOptions(collectionName)
if opts == nil || len(opts.IndexedFields) == 0 {
return false, nil, nil
}
var field string
var value any
for _, idxField := range opts.IndexedFields {
if v, exists := filterMap[idxField]; exists {
field = idxField
value = v
break
}
}
if field == "" {
return false, nil, nil
}
valStr := valueToIndexKey(value)
positions, err := db.indexStore.Lookup(collectionName, field, valStr)
if err != nil || len(positions) == 0 {
return false, nil, err
}
// Фильтруем только позиции нужной партиции
var lineIndexes []int
for _, p := range positions {
if p.Partition == partition {
lineIndexes = append(lineIndexes, p.LineIndex)
}
}
if len(lineIndexes) == 0 {
return false, nil, nil
}
records, err := adapter.ReadByLineIndexes(lineIndexes, options)
if err != nil {
return false, nil, err
}
// Если фильтр — одно поле, уже отфильтровано индексом
if len(filterMap) == 1 {
return true, records, nil
}
// Иначе накладываем полный фильтр по остальным полям в памяти
var filtered []any
for _, rec := range records {
if db.matchesFilter(rec, filter, options.StrictCompare) {
filtered = append(filtered, rec)
}
}
return true, filtered, nil
}
// tryIndexLookupPartitioned — поиск через индекс для партиционированной коллекции.
func (db *LineDb) tryIndexLookupPartitioned(filter any, collectionName string, options LineDbAdapterOptions) (bool, []any, error) {
opts := db.getCollectionOptions(collectionName)
if opts == nil || len(opts.IndexedFields) == 0 {
return false, nil, nil
}
filterMap, ok := filter.(map[string]any)
if !ok || len(filterMap) == 0 {
return false, nil, nil
}
var field string
var value any
for _, idxField := range opts.IndexedFields {
if v, exists := filterMap[idxField]; exists {
field = idxField
value = v
break
}
}
if field == "" {
return false, nil, nil
}
valStr := valueToIndexKey(value)
positions, err := db.indexStore.Lookup(collectionName, field, valStr)
if err != nil || len(positions) == 0 {
return false, nil, err
}
// Группируем по партиции
byPartition := make(map[string][]int)
for _, p := range positions {
byPartition[p.Partition] = append(byPartition[p.Partition], p.LineIndex)
}
var allRecords []any
for partitionName, lineIndexes := range byPartition {
adapter := db.adapters[partitionName]
if adapter == nil {
continue
}
recs, err := adapter.ReadByLineIndexes(lineIndexes, options)
if err != nil {
return false, nil, err
}
allRecords = append(allRecords, recs...)
}
if len(allRecords) == 0 {
return false, nil, nil
}
if len(filterMap) == 1 {
return true, allRecords, nil
}
var filtered []any
for _, rec := range allRecords {
if db.matchesFilter(rec, filter, options.StrictCompare) {
filtered = append(filtered, rec)
}
}
return true, filtered, nil
}
// indexedFieldKeysEqual возвращает true, если ключи индекса по всем indexedFields совпадают
// (та же семантика, что getFieldValue/valueToIndexKey в индексе).
func indexedFieldKeysEqual(oldRec, newRec map[string]any, indexedFields []string) bool {
for _, f := range indexedFields {
if getFieldValue(oldRec, f) != getFieldValue(newRec, f) {
return false
}
}
return true
}
// tryIndexUpdate выполняет точечное обновление через индекс. Возвращает (result, used, err).
// partition — имя партиции для IndexStore (для непартиционированных коллекций: DefaultPartition).
func (db *LineDb) tryIndexUpdate(adapter *JSONLFile, filter any, dataMap map[string]any, collectionName string, partition string, indexedFields []string, options LineDbAdapterOptions) ([]any, bool, error) {
if partition == "" {
partition = DefaultPartition
}
filterMap, ok := filter.(map[string]any)
if !ok || len(filterMap) == 0 {
return nil, false, nil
}
var field string
var value any
for _, idxField := range indexedFields {
if v, exists := filterMap[idxField]; exists {
field = idxField
value = v
break
}
}
if field == "" {
return nil, false, nil
}
valStr := valueToIndexKey(value)
posList, err := db.indexStore.Lookup(collectionName, field, valStr)
if err != nil || len(posList) == 0 {
return nil, false, err
}
var indexes []int
for _, p := range posList {
if p.Partition == partition {
indexes = append(indexes, p.LineIndex)
}
}
if len(indexes) == 0 {
return nil, false, nil
}
opt := options
opt.InTransaction = true
records, positions, err := adapter.ReadByLineIndexesWithPositions(indexes, opt)
if err != nil {
return nil, false, err
}
var toUpdate []any
var toUpdatePos []int
for i, rec := range records {
if !db.matchesFilter(rec, filter, options.StrictCompare) {
continue
}
toUpdate = append(toUpdate, rec)
toUpdatePos = append(toUpdatePos, positions[i])
}
if len(toUpdate) == 0 {
return []any{}, true, nil
}
var updated []any
for _, rec := range toUpdate {
m, ok := rec.(map[string]any)
if !ok {
continue
}
upd := make(map[string]any)
for k, v := range m {
upd[k] = v
}
for k, v := range dataMap {
upd[k] = v
}
updated = append(updated, upd)
}
if err := adapter.WriteAtLineIndexes(updated, toUpdatePos, opt); err != nil {
return nil, false, err
}
for i := range toUpdate {
oldM, ok1 := toUpdate[i].(map[string]any)
newM, ok2 := updated[i].(map[string]any)
if !ok1 || !ok2 {
continue
}
if indexedFieldKeysEqual(oldM, newM, indexedFields) {
continue
}
db.indexStore.UnindexRecord(collectionName, partition, indexedFields, oldM, toUpdatePos[i])
db.indexStore.IndexRecord(collectionName, partition, indexedFields, newM, toUpdatePos[i])
}
return updated, true, nil
}
// tryIndexDelete выполняет точечное удаление через индекс. Возвращает (deletedRecords, used, err).
func (db *LineDb) tryIndexDelete(adapter *JSONLFile, filter any, collectionName string, partition string, indexedFields []string, options LineDbAdapterOptions) ([]any, bool, error) {
if partition == "" {
partition = DefaultPartition
}
filterMap, ok := filter.(map[string]any)
if !ok || len(filterMap) == 0 {
return nil, false, nil
}
var field string
var value any
for _, idxField := range indexedFields {
if v, exists := filterMap[idxField]; exists {
field = idxField
value = v
break
}
}
if field == "" {
return nil, false, nil
}
valStr := valueToIndexKey(value)
posList, err := db.indexStore.Lookup(collectionName, field, valStr)
if err != nil || len(posList) == 0 {
return nil, false, err
}
var indexes []int
for _, p := range posList {
if p.Partition == partition {
indexes = append(indexes, p.LineIndex)
}
}
if len(indexes) == 0 {
return nil, false, nil
}
opt := options
opt.InTransaction = true
records, positions, err := adapter.ReadByLineIndexesWithPositions(indexes, opt)
if err != nil {
return nil, false, err
}
var toDel []any
var toDelPos []int
for i, rec := range records {
if !db.matchesFilter(rec, filter, options.StrictCompare) {
continue
}
toDel = append(toDel, rec)
toDelPos = append(toDelPos, positions[i])
}
if len(toDel) == 0 {
return []any{}, true, nil
}
if err := adapter.BlankLinesAtPositions(toDelPos, opt); err != nil {
return nil, false, err
}
for i, rec := range toDel {
if m, ok := rec.(map[string]any); ok {
db.indexStore.UnindexRecord(collectionName, partition, indexedFields, m, toDelPos[i])
}
}
return toDel, true, nil
}
// getCollectionOptions возвращает опции коллекции (для партиционированных — опции базовой коллекции)
func (db *LineDb) getCollectionOptions(collectionName string) *JSONLFileOptions {
if db.initOptions == nil {
return nil
}
baseName := db.getBaseCollectionName(collectionName)
for i := range db.initOptions.Collections {
opts := &db.initOptions.Collections[i]
if opts.CollectionName == collectionName || opts.CollectionName == baseName {
return opts
}
}
return nil
}
// isValueEmpty проверяет, считается ли значение "пустым" (nil или "" для string) — для обратной совместимости
func (db *LineDb) isValueEmpty(v any) bool {
if v == nil {
return true
}
if s, ok := v.(string); ok && s == "" {
return true
}
return false
}
// isEmptyByMode проверяет, пусто ли значение по заданному режиму
func (db *LineDb) isEmptyByMode(m map[string]any, key string, mode EmptyValueMode) bool {
if mode == 0 {
mode = DefaultEmptyModeForUnique
}
v, exists := m[key]
if mode&EmptyModeAbsentKey != 0 && !exists {
return true
}
if mode&EmptyModeNil != 0 && v == nil {
return true
}
if mode&EmptyModeZeroValue != 0 {
switch val := v.(type) {
case string:
return val == ""
case int:
return val == 0
case int64:
return val == 0
case float64:
return val == 0
}
}
return false
}
// getUniqueFieldConstraints возвращает поля с unique и их EmptyMode (из FieldConstraints или UniqueFields)
func (db *LineDb) getUniqueFieldConstraints(collectionName string) []FieldConstraint {
opts := db.getCollectionOptions(collectionName)
if opts == nil {
return nil
}
if len(opts.FieldConstraints) > 0 {
var out []FieldConstraint
for _, fc := range opts.FieldConstraints {
if fc.Unique {
mode := fc.EmptyMode
if mode == 0 {
mode = DefaultEmptyModeForUnique
}
out = append(out, FieldConstraint{Name: fc.Name, Unique: true, EmptyMode: mode})
}
}
return out
}
// Legacy UniqueFields
var out []FieldConstraint
for _, name := range opts.UniqueFields {
out = append(out, FieldConstraint{Name: name, Unique: true, EmptyMode: DefaultEmptyModeForUnique})
}
return out
}
// getRequiredFieldConstraints возвращает required поля и их EmptyMode
func (db *LineDb) getRequiredFieldConstraints(collectionName string) []FieldConstraint {
opts := db.getCollectionOptions(collectionName)
if opts == nil {
return nil
}
var out []FieldConstraint
for _, fc := range opts.FieldConstraints {
if fc.Required {
mode := fc.EmptyMode
if mode == 0 {
mode = DefaultEmptyModeForRequired
}
out = append(out, FieldConstraint{Name: fc.Name, Required: true, EmptyMode: mode})
}
}
return out
}
// checkRequiredFieldsInsert проверяет обязательные поля при вставке
func (db *LineDb) checkRequiredFieldsInsert(itemMap map[string]any, collectionName string) error {
required := db.getRequiredFieldConstraints(collectionName)
for _, fc := range required {
if db.isEmptyByMode(itemMap, fc.Name, fc.EmptyMode) {
return fmt.Errorf("required field %q is empty in collection %q", fc.Name, collectionName)
}
}
return nil
}
// checkRequiredFieldsUpdate проверяет, что при Update не устанавливаются пустые значения в required полях
func (db *LineDb) checkRequiredFieldsUpdate(data map[string]any, collectionName string) error {
required := db.getRequiredFieldConstraints(collectionName)
for _, fc := range required {
if _, inData := data[fc.Name]; !inData {
continue // поле не обновляется — не проверяем
}
if db.isEmptyByMode(data, fc.Name, fc.EmptyMode) {
return fmt.Errorf("required field %q cannot be set to empty in collection %q", fc.Name, collectionName)
}
}
return nil
}
// checkUniqueFieldsInsert проверяет уникальность полей при вставке
func (db *LineDb) checkUniqueFieldsInsert(itemMap map[string]any, collectionName string, resultDataArray []any, options LineDbAdapterOptions) error {
if options.SkipCheckExistingForWrite {
return nil
}
uniqueFields := db.getUniqueFieldConstraints(collectionName)
if len(uniqueFields) == 0 {
return nil
}
for _, fc := range uniqueFields {
fieldName := fc.Name
if db.isEmptyByMode(itemMap, fieldName, fc.EmptyMode) {
continue
}
value := itemMap[fieldName]
// Проверяем в batch (уже добавляемые записи)
for _, resultItem := range resultDataArray {
if resultMap, ok := resultItem.(map[string]any); ok {
if db.valuesMatch(resultMap[fieldName], value, true) {
return fmt.Errorf("unique constraint violation: field %q value %v already exists in collection %q",
fieldName, value, collectionName)
}
}
}
// Проверяем в БД (при Insert записи ещё нет, поэтому любое совпадение — конфликт)
filter := map[string]any{fieldName: value}
existing, err := db.ReadByFilter(filter, collectionName, LineDbAdapterOptions{InTransaction: true})
if err != nil {
return fmt.Errorf("failed to check unique field %q: %w", fieldName, err)
}
if len(existing) > 0 {
return fmt.Errorf("unique constraint violation: field %q value %v already exists in collection %q",
fieldName, value, collectionName)
}
}
return nil
}
// checkUniqueFieldsUpdate проверяет уникальность полей при обновлении
func (db *LineDb) checkUniqueFieldsUpdate(data map[string]any, filter any, collectionName string, options LineDbAdapterOptions) error {
if options.SkipCheckExistingForWrite {
return nil
}
uniqueFields := db.getUniqueFieldConstraints(collectionName)
if len(uniqueFields) == 0 {
return nil
}
recordsToUpdate, err := db.ReadByFilter(filter, collectionName, LineDbAdapterOptions{InTransaction: true})
if err != nil {
return fmt.Errorf("failed to read records for update: %w", err)
}
updatingIDs := make(map[any]bool)
for _, rec := range recordsToUpdate {
if m, ok := rec.(map[string]any); ok && m["id"] != nil {
updatingIDs[m["id"]] = true
}
}
for _, fc := range uniqueFields {
fieldName := fc.Name
value, inData := data[fieldName]
if !inData || db.isEmptyByMode(data, fieldName, fc.EmptyMode) {
continue
}
existing, err := db.ReadByFilter(map[string]any{fieldName: value}, collectionName, LineDbAdapterOptions{InTransaction: true})
if err != nil {
return fmt.Errorf("failed to check unique field %q: %w", fieldName, err)
}
for _, rec := range existing {
if recMap, ok := rec.(map[string]any); ok {
if updatingIDs[recMap["id"]] {
continue
}
return fmt.Errorf("unique constraint violation: field %q value %v already exists in collection %q",
fieldName, value, collectionName)
}
}
}
return nil
}
func (db *LineDb) isCollectionPartitioned(collectionName string) bool {
_, exists := db.partitionFunctions[collectionName]
return exists
}
// partitionStorageAdaptersOrdered — для логического имени партиционированной коллекции baseName:
// сначала адаптер основного файла (baseName.jsonl), затем все baseName_* в лексикографическом порядке.
// Пустой ключ партиции пишет в основной файл; эти записи учитываются здесь первым адаптером.
func (db *LineDb) partitionStorageAdaptersOrdered(baseName string) []*JSONLFile {
var out []*JSONLFile
if a := db.adapters[baseName]; a != nil {
out = append(out, a)
}
var partNames []string
for name := range db.adapters {
if name == baseName {
continue
}
if strings.HasPrefix(name, baseName+"_") {
partNames = append(partNames, name)
}
}
sort.Strings(partNames)
for _, name := range partNames {
out = append(out, db.adapters[name])
}
return out
}
func (db *LineDb) getPartitionAdapter(data any, collectionName string) (*JSONLFile, error) {
partitionFn, exists := db.partitionFunctions[collectionName]
if !exists {
return nil, fmt.Errorf("partition function not found for collection %s", collectionName)
}
partitionID := partitionFn(data)
if strings.TrimSpace(partitionID) == "" {
adapter, ok := db.adapters[collectionName]
if !ok {
return nil, fmt.Errorf("collection %s not found", collectionName)
}
return adapter, nil
}
partitionName := fmt.Sprintf("%s_%s", collectionName, partitionID)
adapter, exists := db.adapters[partitionName]
if !exists {
// Создаем новый адаптер для партиции
filename := filepath.Join(db.initOptions.DBFolder, partitionName+".jsonl")
adapter = NewJSONLFile(filename, "", JSONLFileOptions{CollectionName: partitionName})
if err := adapter.Init(false, LineDbAdapterOptions{}); err != nil {
return nil, fmt.Errorf("failed to init partition adapter: %w", err)
}
db.adapters[partitionName] = adapter
db.collections[partitionName] = filename
}
return adapter, nil
}
func (db *LineDb) GetMaxID(records []any) int {
maxID := 0
for _, record := range records {
if recordMap, ok := record.(map[string]any); ok {
if id, ok := recordMap["id"]; ok {
n := idToIntForMax(id)
if n > maxID {
maxID = n
}
}
}
}
return maxID
}
func idToIntForMax(id any) int {
switch v := id.(type) {
case int:
return v
case int64:
return int(v)
case float64:
return int(v)
case float32:
return int(v)
default:
return 0
}
}
func (db *LineDb) matchesFilter(record any, filter any, strictCompare bool) bool {
if recordMap, ok := record.(map[string]any); ok {
if filterMap, ok := filter.(map[string]any); ok {
for key, filterValue := range filterMap {
if recordValue, exists := recordMap[key]; exists {
if !db.valuesMatch(recordValue, filterValue, strictCompare) {
return false
}
} else if strictCompare {
return false
}
}
return true
}
}
return false
}
func (db *LineDb) valuesMatch(a, b any, strictCompare bool) bool {
if strictCompare {
return a == b
}
// Нестрогое сравнение
if a == b {
return true
}
// Сравнение строк
if aStr, ok := a.(string); ok {
if bStr, ok := b.(string); ok {
return strings.EqualFold(aStr, bStr)
}
}
// Сравнение чисел
if aNum, ok := db.toNumber(a); ok {
if bNum, ok := db.toNumber(b); ok {
return aNum == bNum
}
}
return false
}
func (db *LineDb) toNumber(value any) (float64, bool) {
switch v := value.(type) {
case int:
return float64(v), true
case int64:
return float64(v), true
case float64:
return v, true
case string:
if f, err := strconv.ParseFloat(v, 64); err == nil {
return f, true
}
}
return 0, false
}
// toMap конвертирует struct или map в map[string]any (для Insert/Update)
func (db *LineDb) toMap(v any) (map[string]any, error) {
if m, ok := v.(map[string]any); ok {
return m, nil
}
data, err := goccyjson.Marshal(v)
if err != nil {
return nil, err
}
var result map[string]any
return result, goccyjson.Unmarshal(data, &result)
}
func (db *LineDb) normalizeDataArray(data any) []any {
switch v := data.(type) {
case []any:
return v
case any:
return []any{v}
default:
return []any{data}
}
}
func (db *LineDb) isInvalidID(id any) bool {
if id == nil {
return true
}
switch v := id.(type) {
case int:
return v <= -1
case int8:
return v <= -1
case int16:
return v <= -1
case int32:
return v <= -1
case int64:
return v <= -1
case uint, uint8, uint16, uint32, uint64:
return false
case float32:
if v != v { // NaN
return true
}
return float64(v) <= -1
case float64:
if v != v { // NaN
return true
}
return v <= -1
default:
return false
}
}
func (db *LineDb) compareIDs(a, b any) bool {
return a == b
}
func (db *LineDb) generateCacheKey(filter any, collectionName string) string {
// Упрощенная реализация генерации ключа кэша
return fmt.Sprintf("%s:%v", collectionName, filter)
}
// normalizeFilter приводит произвольный filter к более удобной форме:
// - string вида "field:value, field2:value2" -> map[string]any{"field": "value", "field2": "value2"}
// - простая string без ":" -> map[string]any{"id": value}
// - struct -> map[string]any (через toMap)
// - map[string]any и func(any) bool не меняются.
func (db *LineDb) normalizeFilter(filter any) (any, error) {
switch f := filter.(type) {
case nil:
return nil, nil
case map[string]any:
return f, nil
case func(any) bool:
return f, nil
case string:
s := strings.TrimSpace(f)
if s == "" {
return nil, nil
}
// pattern: field:value, field2:value2
if strings.Contains(s, ":") {
result := make(map[string]any)
parts := strings.Split(s, ",")
for _, p := range parts {
p = strings.TrimSpace(p)
if p == "" {
continue
}
kv := strings.SplitN(p, ":", 2)
if len(kv) != 2 {
continue
}
key := strings.TrimSpace(kv[0])
val := strings.TrimSpace(kv[1])
if key == "" {
continue
}
result[key] = val
}
if len(result) == 0 {
return map[string]any{"id": s}, nil
}
return result, nil
}
// простая строка — считаем фильтром по id
return map[string]any{"id": s}, nil
default:
if m, ok := f.(map[string]any); ok {
return m, nil
}
// Пытаемся трактовать как struct и конвертировать в map
m, err := db.toMap(f)
if err != nil {
// если не получилось — возвращаем как есть
return filter, nil
}
return m, nil
}
}
func (db *LineDb) updatePartitioned(data any, collectionName string, filter any, options LineDbAdapterOptions) ([]any, error) {
dataMap, err := db.toMap(data)
if err != nil {
return nil, fmt.Errorf("invalid update data format: %w", err)
}
opts := db.getCollectionOptions(collectionName)
baseName := db.getBaseCollectionName(collectionName)
var allResults []any
for _, adapter := range db.partitionStorageAdaptersOrdered(baseName) {
partitionName := adapter.GetCollectionName()
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
upd, used, terr := db.tryIndexUpdate(adapter, filter, dataMap, collectionName, partitionName, opts.IndexedFields, options)
if terr != nil {
return nil, terr
}
if used {
allResults = append(allResults, upd...)
continue
}
}
results, err := adapter.Update(data, filter, options)
if err != nil {
return nil, err
}
allResults = append(allResults, results...)
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
records, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{InTransaction: true})
if readErr == nil {
_ = db.indexStore.Rebuild(collectionName, partitionName, opts.IndexedFields, records, lineIdx)
}
}
}
db.clearCacheForCollection(collectionName)
return allResults, nil
}
func (db *LineDb) deletePartitioned(data any, collectionName string, options LineDbAdapterOptions) ([]any, error) {
opts := db.getCollectionOptions(collectionName)
baseName := db.getBaseCollectionName(collectionName)
var allResults []any
for _, adapter := range db.partitionStorageAdaptersOrdered(baseName) {
partitionName := adapter.GetCollectionName()
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
del, used, derr := db.tryIndexDelete(adapter, data, collectionName, partitionName, opts.IndexedFields, options)
if derr != nil {
return nil, derr
}
if used {
allResults = append(allResults, del...)
continue
}
}
results, err := adapter.Delete(data, options)
if err != nil {
return nil, err
}
allResults = append(allResults, results...)
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
records, lineIdx, readErr := adapter.ReadWithPhysicalLineIndexes(LineDbAdapterOptions{InTransaction: true})
if readErr == nil {
_ = db.indexStore.Rebuild(collectionName, partitionName, opts.IndexedFields, records, lineIdx)
}
}
}
db.clearCacheForCollection(collectionName)
return allResults, nil
}
func (db *LineDb) readByFilterPartitioned(filter any, collectionName string, options LineDbAdapterOptions) ([]any, error) {
baseName := db.getBaseCollectionName(collectionName)
var allResults []any
for _, adapter := range db.partitionStorageAdaptersOrdered(baseName) {
results, err := adapter.ReadByFilter(filter, options)
if err != nil {
return nil, err
}
allResults = append(allResults, results...)
}
return allResults, nil
}
// Добавляем недостающие методы
// SelectWithPagination выполняет выборку с пагинацией
func (db *LineDb) SelectWithPagination(filter any, page, limit int, collectionName string, options LineDbAdapterOptions) (*PaginatedResult, error) {
if page < 1 {
page = 1
}
if limit < 1 {
limit = 20
}
// Получаем все данные
allData, err := db.ReadByFilter(filter, collectionName, options)
if err != nil {
return nil, err
}
total := len(allData)
pages := (total + limit - 1) / limit // Округление вверх
// Вычисляем индексы для пагинации
start := (page - 1) * limit
end := start + limit
if end > total {
end = total
}
var data []any
if start < total {
data = allData[start:end]
}
return &PaginatedResult{
Data: data,
Total: total,
Limit: limit,
Pages: pages,
Page: page,
}, nil
}
// Join выполняет операцию JOIN между коллекциями
func (db *LineDb) Join(leftCollection, rightCollection any, options JoinOptions) (*CollectionChain, error) {
// Получаем данные левой коллекции
var leftData []any
switch v := leftCollection.(type) {
case string:
data, err := db.Read(v, LineDbAdapterOptions{InTransaction: options.InTransaction})
if err != nil {
return nil, err
}
leftData = data
case []any:
leftData = v
default:
return nil, fmt.Errorf("invalid left collection type")
}
// Получаем данные правой коллекции
var rightData []any
switch v := rightCollection.(type) {
case string:
data, err := db.Read(v, LineDbAdapterOptions{InTransaction: options.InTransaction})
if err != nil {
return nil, err
}
rightData = data
case []any:
rightData = v
default:
return nil, fmt.Errorf("invalid right collection type")
}
// Применяем фильтры
if options.LeftFilter != nil {
leftData = db.applyFilter(leftData, options.LeftFilter, options.StrictCompare)
}
if options.RightFilter != nil {
rightData = db.applyFilter(rightData, options.RightFilter, options.StrictCompare)
}
// Выполняем JOIN
var result []any
switch options.Type {
case JoinTypeInner:
result = db.innerJoin(leftData, rightData, options)
case JoinTypeLeft:
result = db.leftJoin(leftData, rightData, options)
case JoinTypeRight:
result = db.rightJoin(leftData, rightData, options)
case JoinTypeFull:
result = db.fullJoin(leftData, rightData, options)
default:
return nil, fmt.Errorf("unsupported join type: %s", options.Type)
}
return NewCollectionChain(result), nil
}
func (db *LineDb) applyFilter(data []any, filter map[string]any, strictCompare bool) []any {
var result []any
for _, item := range data {
if db.matchesFilter(item, filter, strictCompare) {
result = append(result, item)
}
}
return result
}
func (db *LineDb) innerJoin(leftData, rightData []any, options JoinOptions) []any {
var result []any
for _, left := range leftData {
for _, right := range rightData {
if db.matchJoinFields(left, right, options.LeftFields, options.RightFields, options.StrictCompare) {
result = append(result, JoinResult{Left: left, Right: right})
if options.OnlyOneFromRight {
break
}
}
}
}
return result
}
func (db *LineDb) leftJoin(leftData, rightData []any, options JoinOptions) []any {
var result []any
for _, left := range leftData {
matched := false
for _, right := range rightData {
if db.matchJoinFields(left, right, options.LeftFields, options.RightFields, options.StrictCompare) {
result = append(result, JoinResult{Left: left, Right: right})
matched = true
if options.OnlyOneFromRight {
break
}
}
}
if !matched {
result = append(result, JoinResult{Left: left, Right: nil})
}
}
return result
}
func (db *LineDb) rightJoin(leftData, rightData []any, options JoinOptions) []any {
var result []any
for _, right := range rightData {
matched := false
for _, left := range leftData {
if db.matchJoinFields(left, right, options.LeftFields, options.RightFields, options.StrictCompare) {
result = append(result, JoinResult{Left: left, Right: right})
matched = true
if options.OnlyOneFromRight {
break
}
}
}
if !matched {
result = append(result, JoinResult{Left: nil, Right: right})
}
}
return result
}
func (db *LineDb) fullJoin(leftData, rightData []any, options JoinOptions) []any {
// Объединяем LEFT и RIGHT JOIN
leftResult := db.leftJoin(leftData, rightData, options)
rightResult := db.rightJoin(leftData, rightData, options)
// Удаляем дубликаты
seen := make(map[string]bool)
var result []any
for _, item := range append(leftResult, rightResult...) {
key := db.generateJoinKey(item)
if !seen[key] {
seen[key] = true
result = append(result, item)
}
}
return result
}
func (db *LineDb) matchJoinFields(left, right any, leftFields, rightFields []string, strictCompare bool) bool {
if len(leftFields) != len(rightFields) {
return false
}
leftMap, leftOk := left.(map[string]any)
rightMap, rightOk := right.(map[string]any)
if !leftOk || !rightOk {
return false
}
for i, leftField := range leftFields {
rightField := rightFields[i]
leftValue := leftMap[leftField]
rightValue := rightMap[rightField]
if !db.valuesMatch(leftValue, rightValue, strictCompare) {
return false
}
}
return true
}
func (db *LineDb) generateJoinKey(item any) string {
// Упрощенная реализация генерации ключа для JOIN
if joinResult, ok := item.(JoinResult); ok {
return fmt.Sprintf("%v:%v", joinResult.Left, joinResult.Right)
}
return fmt.Sprintf("%v", item)
}
// Getter методы для совместимости с TypeScript версией
func (db *LineDb) GetActualCacheSize() int {
if db.cacheExternal != nil {
return db.cacheExternal.Size()
}
return 0
}
func (db *LineDb) GetLimitCacheSize() int {
return db.cacheSize
}
func (db *LineDb) GetCacheMap() map[string]*CacheEntry {
if db.cacheExternal != nil {
return db.cacheExternal.GetFlatCacheMap()
}
return make(map[string]*CacheEntry)
}
// GetCacheForTest возвращает сырую карту кэша для тестов: ключ — ключ кэша, значение — Data записи.
// Доступ только при accessKey == "give_me_cache", иначе возвращается пустая мапа.
func (db *LineDb) GetCacheForTest(accessKey string) map[string]any {
const testAccessKey = "give_me_cache"
if accessKey != testAccessKey {
return map[string]any{}
}
if db.cacheExternal == nil {
return map[string]any{}
}
flat := db.cacheExternal.GetFlatCacheMap()
out := make(map[string]any, len(flat))
for k, entry := range flat {
out[k] = entry.Data
}
return out
}
// GetIndexSnapshotForTest возвращает снимок индекса для тестов (только InMemoryIndexStore).
// Доступ только при accessKey == "give_me_cache". Ключ — "collection:field", значение — map[value][]IndexPosition.
func (db *LineDb) GetIndexSnapshotForTest(accessKey string) map[string]any {
const testAccessKey = "give_me_cache"
if accessKey != testAccessKey || db.indexStore == nil {
return map[string]any{}
}
if mem, ok := db.indexStore.(*InMemoryIndexStore); ok {
return mem.GetSnapshotForTest(testAccessKey)
}
return map[string]any{}
}
func (db *LineDb) GetFirstCollection() string {
return db.getFirstCollection()
}