before refactor index store to complex file-line pattern

This commit is contained in:
2026-03-12 16:13:44 +06:00
parent 491ccbea89
commit 8ba956d8c5
21 changed files with 7804 additions and 57 deletions

View File

@@ -23,10 +23,11 @@ type LineDb struct {
cacheExternal *RecordCache
nextIDFn func(any, string) (any, error)
lastIDManager *LastIDManager
// inTransaction bool
cacheTTL time.Duration
constructorOptions *LineDbOptions
initOptions *LineDbInitOptions
indexStore IndexStore
reindexDone chan struct{} // закрывается при Close, останавливает горутину ReindexByTimer
}
// NewLineDb создает новый экземпляр LineDb
@@ -57,6 +58,8 @@ func NewLineDb(options *LineDbOptions, adapters ...*JSONLFile) *LineDb {
db.collections[collectionName] = adapter.GetFilename()
}
db.indexStore = options.IndexStore
return db
}
@@ -119,6 +122,41 @@ func (db *LineDb) Init(force bool, initOptions *LineDbInitOptions) error {
db.collections[collectionName] = filename
}
// Индексы: создаём in-memory по умолчанию, если не задан IndexStore и есть IndexedFields
if db.indexStore == nil {
for _, opt := range initOptions.Collections {
if len(opt.IndexedFields) > 0 {
db.indexStore = NewInMemoryIndexStore()
break
}
}
}
if db.indexStore != nil {
for _, opt := range initOptions.Collections {
if len(opt.IndexedFields) == 0 || db.isCollectionPartitioned(opt.CollectionName) {
continue
}
adapter := db.adapters[opt.CollectionName]
if adapter == nil {
continue
}
records, err := adapter.Read(LineDbAdapterOptions{})
if err != nil {
return fmt.Errorf("failed to read collection %s for index rebuild: %w", opt.CollectionName, err)
}
if err := db.indexStore.Rebuild(opt.CollectionName, opt.IndexedFields, records); err != nil {
return fmt.Errorf("failed to rebuild index for %s: %w", opt.CollectionName, err)
}
}
}
// Периодический ребилд индексов
if initOptions.ReindexByTimer > 0 && db.indexStore != nil {
db.reindexDone = make(chan struct{})
interval := time.Duration(initOptions.ReindexByTimer) * time.Millisecond
go db.reindexByTimerLoop(interval)
}
return nil
}
@@ -247,6 +285,20 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO
}
}
// Обновляем индекс (полная пересборка по коллекции)
if db.indexStore != nil {
opts := db.getCollectionOptions(collectionName)
if opts != nil && len(opts.IndexedFields) > 0 && !db.isCollectionPartitioned(collectionName) {
adapter, exists := db.adapters[collectionName]
if exists {
allRecords, readErr := adapter.Read(LineDbAdapterOptions{})
if readErr == nil {
_ = db.indexStore.Rebuild(collectionName, opts.IndexedFields, allRecords)
}
}
}
}
return nil
}
@@ -284,7 +336,30 @@ func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOp
return fmt.Errorf("collection %s not found", collectionName)
}
return adapter.Write(data, options)
var startLine int
if options.DoIndexing && db.indexStore != nil {
if c, err := adapter.LineCount(); err == nil {
startLine = c
}
}
if err := adapter.Write(data, options); err != nil {
return err
}
// Точечное индексирование при DoIndexing
if options.DoIndexing && db.indexStore != nil {
opts := db.getCollectionOptions(collectionName)
if opts != nil && len(opts.IndexedFields) > 0 {
dataArray := db.normalizeDataArray(data)
for i, record := range dataArray {
if m, err := db.toMap(record); err == nil {
db.indexStore.IndexRecord(collectionName, opts.IndexedFields, m, startLine+i)
}
}
}
}
return nil
}
// Update обновляет записи в коллекции
@@ -294,17 +369,20 @@ func (db *LineDb) Update(data any, collectionName string, filter any, options Li
db.mutex.Lock()
defer db.mutex.Unlock()
}
if collectionName == "" {
collectionName = db.getFirstCollection()
}
// Конвертируем data в map (struct или map)
dataMap, err := db.toMap(data)
if err != nil {
return nil, fmt.Errorf("invalid update data format: %w", err)
}
// Нормализуем фильтр (строка/struct -> map)
normFilter, err := db.normalizeFilter(filter)
if err != nil {
return nil, err
}
filter = normFilter
// Проверяем конфликт ID
if filterMap, ok := filter.(map[string]any); ok {
if dataMap["id"] != nil && filterMap["id"] != nil {
@@ -321,7 +399,6 @@ func (db *LineDb) Update(data any, collectionName string, filter any, options Li
if err := db.checkUniqueFieldsUpdate(dataMap, filter, collectionName, options); err != nil {
return nil, err
}
// Проверяем партиционирование
if db.isCollectionPartitioned(collectionName) {
return db.updatePartitioned(dataMap, collectionName, filter, options)
@@ -333,7 +410,41 @@ func (db *LineDb) Update(data any, collectionName string, filter any, options Li
return nil, fmt.Errorf("collection %s not found", collectionName)
}
return adapter.Update(dataMap, filter, options)
// Пробуем точечный Update через индекс (без полного чтения файла)
opts := db.getCollectionOptions(collectionName)
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
result, used, err := db.tryIndexUpdate(adapter, filter, dataMap, collectionName, opts.IndexedFields, options)
if err != nil {
return nil, err
}
if used {
if db.cacheExternal != nil {
ids := extractIDsFromRecords(result)
db.cacheExternal.ClearEntriesContainingIDs(ids)
}
return result, nil
}
}
result, err := adapter.Update(dataMap, filter, options)
if err != nil {
return nil, err
}
// Перестраиваем индекс после Update
if db.indexStore != nil {
opts := db.getCollectionOptions(collectionName)
if opts != nil && len(opts.IndexedFields) > 0 {
allRecords, readErr := adapter.Read(LineDbAdapterOptions{})
if readErr == nil {
_ = db.indexStore.Rebuild(collectionName, opts.IndexedFields, allRecords)
}
}
}
if db.cacheExternal != nil {
ids := extractIDsFromRecords(result)
db.cacheExternal.ClearEntriesContainingIDs(ids)
}
return result, nil
}
// Delete удаляет записи из коллекции
@@ -348,6 +459,13 @@ func (db *LineDb) Delete(data any, collectionName string, options LineDbAdapterO
collectionName = db.getFirstCollection()
}
// Нормализуем фильтр удаления (строка/struct -> map)
normFilter, err := db.normalizeFilter(data)
if err != nil {
return nil, err
}
data = normFilter
// Проверяем партиционирование
if db.isCollectionPartitioned(collectionName) {
return db.deletePartitioned(data, collectionName, options)
@@ -359,7 +477,40 @@ func (db *LineDb) Delete(data any, collectionName string, options LineDbAdapterO
return nil, fmt.Errorf("collection %s not found", collectionName)
}
return adapter.Delete(data, options)
opts := db.getCollectionOptions(collectionName)
if db.indexStore != nil && opts != nil && len(opts.IndexedFields) > 0 {
result, used, err := db.tryIndexDelete(adapter, data, collectionName, opts.IndexedFields, options)
if err != nil {
return nil, err
}
if used {
if db.cacheExternal != nil {
ids := extractIDsFromRecords(result)
db.cacheExternal.ClearEntriesContainingIDs(ids)
}
return result, nil
}
}
result, err := adapter.Delete(data, options)
if err != nil {
return nil, err
}
// Перестраиваем индекс после Delete
if db.indexStore != nil {
opts := db.getCollectionOptions(collectionName)
if opts != nil && len(opts.IndexedFields) > 0 {
allRecords, readErr := adapter.Read(LineDbAdapterOptions{})
if readErr == nil {
_ = db.indexStore.Rebuild(collectionName, opts.IndexedFields, allRecords)
}
}
}
if db.cacheExternal != nil {
ids := extractIDsFromRecords(result)
db.cacheExternal.ClearEntriesContainingIDs(ids)
}
return result, nil
}
// Select выполняет выборку с поддержкой цепочки
@@ -382,6 +533,13 @@ func (db *LineDb) ReadByFilter(filter any, collectionName string, options LineDb
defer db.mutex.RUnlock()
}
// Нормализуем фильтр (строка/struct -> map)
normFilter, err := db.normalizeFilter(filter)
if err != nil {
return nil, err
}
filter = normFilter
if collectionName == "" {
collectionName = db.getFirstCollection()
}
@@ -400,12 +558,21 @@ func (db *LineDb) ReadByFilter(filter any, collectionName string, options LineDb
return db.readByFilterPartitioned(filter, collectionName, options)
}
// Обычная фильтрация
adapter, exists := db.adapters[collectionName]
if !exists {
return nil, fmt.Errorf("collection %s not found", collectionName)
}
// Используем индекс, если фильтр — одно поле из IndexedFields
if db.indexStore != nil {
if hit, result, err := db.tryIndexLookup(adapter, filter, collectionName, options); hit && err == nil {
if db.cacheExternal != nil && !options.InTransaction {
db.cacheExternal.Set(db.generateCacheKey(filter, collectionName), result)
}
return result, nil
}
}
result, err := adapter.ReadByFilter(filter, options)
if err != nil {
return nil, err
@@ -466,6 +633,12 @@ func (db *LineDb) Close() {
db.mutex.Lock()
defer db.mutex.Unlock()
// Останавливаем горутину ReindexByTimer
if db.reindexDone != nil {
close(db.reindexDone)
db.reindexDone = nil
}
// Закрываем все адаптеры
for _, adapter := range db.adapters {
adapter.Destroy()
@@ -483,6 +656,38 @@ func (db *LineDb) Close() {
db.partitionFunctions = make(map[string]func(any) string)
}
// reindexByTimerLoop выполняет полный ребилд всех индексов с заданным интервалом.
// Работает в отдельной горутине. Останавливается при закрытии db.reindexDone.
func (db *LineDb) reindexByTimerLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-db.reindexDone:
return
case <-ticker.C:
db.mutex.Lock()
if db.indexStore != nil && db.initOptions != nil {
for _, opt := range db.initOptions.Collections {
if len(opt.IndexedFields) == 0 || db.isCollectionPartitioned(opt.CollectionName) {
continue
}
adapter := db.adapters[opt.CollectionName]
if adapter == nil {
continue
}
records, err := adapter.Read(LineDbAdapterOptions{})
if err != nil {
continue
}
_ = db.indexStore.Rebuild(opt.CollectionName, opt.IndexedFields, records)
}
}
db.mutex.Unlock()
}
}
}
// Вспомогательные методы
func (db *LineDb) getFirstCollection() string {
@@ -499,6 +704,179 @@ func (db *LineDb) getBaseCollectionName(collectionName string) string {
return collectionName
}
// tryIndexLookup пытается выполнить поиск через индекс.
// Если фильтр — map с несколькими полями, выбирается первое поле из IndexedFields,
// по нему читаются строки через индекс, затем в памяти накладывается полный фильтр.
// Возвращает (true, result) если индекс использован, (false, nil) иначе.
func (db *LineDb) tryIndexLookup(adapter *JSONLFile, filter any, collectionName string, options LineDbAdapterOptions) (bool, []any, error) {
filterMap, ok := filter.(map[string]any)
if !ok || len(filterMap) == 0 {
return false, nil, nil
}
opts := adapter.GetOptions()
if len(opts.IndexedFields) == 0 {
return false, nil, nil
}
// Находим первое индексируемое поле, которое есть в фильтре
var field string
var value any
for _, idxField := range opts.IndexedFields {
if v, exists := filterMap[idxField]; exists {
field = idxField
value = v
break
}
}
if field == "" {
return false, nil, nil
}
valStr := valueToIndexKey(value)
indexes, err := db.indexStore.Lookup(collectionName, field, valStr)
if err != nil || indexes == nil || len(indexes) == 0 {
return false, nil, err
}
records, err := adapter.ReadByLineIndexes(indexes, options)
if err != nil {
return false, nil, err
}
// Если фильтр — одно поле, уже отфильтровано индексом
if len(filterMap) == 1 {
return true, records, nil
}
// Иначе накладываем полный фильтр по остальным полям в памяти
var filtered []any
for _, rec := range records {
if db.matchesFilter(rec, filter, options.StrictCompare) {
filtered = append(filtered, rec)
}
}
return true, filtered, nil
}
// tryIndexUpdate выполняет точечное обновление через индекс. Возвращает (result, used, err).
func (db *LineDb) tryIndexUpdate(adapter *JSONLFile, filter any, dataMap map[string]any, collectionName string, indexedFields []string, options LineDbAdapterOptions) ([]any, bool, error) {
filterMap, ok := filter.(map[string]any)
if !ok || len(filterMap) == 0 {
return nil, false, nil
}
var field string
var value any
for _, idxField := range indexedFields {
if v, exists := filterMap[idxField]; exists {
field = idxField
value = v
break
}
}
if field == "" {
return nil, false, nil
}
valStr := valueToIndexKey(value)
indexes, err := db.indexStore.Lookup(collectionName, field, valStr)
if err != nil || indexes == nil || len(indexes) == 0 {
return nil, false, err
}
opt := options
opt.InTransaction = true
records, positions, err := adapter.ReadByLineIndexesWithPositions(indexes, opt)
if err != nil {
return nil, false, err
}
var toUpdate []any
var toUpdatePos []int
for i, rec := range records {
if !db.matchesFilter(rec, filter, options.StrictCompare) {
continue
}
toUpdate = append(toUpdate, rec)
toUpdatePos = append(toUpdatePos, positions[i])
}
if len(toUpdate) == 0 {
return []any{}, true, nil
}
var updated []any
for _, rec := range toUpdate {
m, ok := rec.(map[string]any)
if !ok {
continue
}
upd := make(map[string]any)
for k, v := range m {
upd[k] = v
}
for k, v := range dataMap {
upd[k] = v
}
updated = append(updated, upd)
}
if err := adapter.WriteAtLineIndexes(updated, toUpdatePos, opt); err != nil {
return nil, false, err
}
for i, rec := range toUpdate {
if m, ok := rec.(map[string]any); ok {
db.indexStore.UnindexRecord(collectionName, indexedFields, m, toUpdatePos[i])
}
}
for i, rec := range updated {
if m, ok := rec.(map[string]any); ok {
db.indexStore.IndexRecord(collectionName, indexedFields, m, toUpdatePos[i])
}
}
return updated, true, nil
}
// tryIndexDelete выполняет точечное удаление через индекс. Возвращает (deletedRecords, used, err).
func (db *LineDb) tryIndexDelete(adapter *JSONLFile, filter any, collectionName string, indexedFields []string, options LineDbAdapterOptions) ([]any, bool, error) {
filterMap, ok := filter.(map[string]any)
if !ok || len(filterMap) == 0 {
return nil, false, nil
}
var field string
var value any
for _, idxField := range indexedFields {
if v, exists := filterMap[idxField]; exists {
field = idxField
value = v
break
}
}
if field == "" {
return nil, false, nil
}
valStr := valueToIndexKey(value)
indexes, err := db.indexStore.Lookup(collectionName, field, valStr)
if err != nil || indexes == nil || len(indexes) == 0 {
return nil, false, err
}
opt := options
opt.InTransaction = true
records, positions, err := adapter.ReadByLineIndexesWithPositions(indexes, opt)
if err != nil {
return nil, false, err
}
var toDel []any
var toDelPos []int
for i, rec := range records {
if !db.matchesFilter(rec, filter, options.StrictCompare) {
continue
}
toDel = append(toDel, rec)
toDelPos = append(toDelPos, positions[i])
}
if len(toDel) == 0 {
return []any{}, true, nil
}
if err := adapter.BlankLinesAtPositions(toDelPos, opt); err != nil {
return nil, false, err
}
for i, rec := range toDel {
if m, ok := rec.(map[string]any); ok {
db.indexStore.UnindexRecord(collectionName, indexedFields, m, toDelPos[i])
}
}
return toDel, true, nil
}
// getCollectionOptions возвращает опции коллекции (для партиционированных — опции базовой коллекции)
func (db *LineDb) getCollectionOptions(collectionName string) *JSONLFileOptions {
if db.initOptions == nil {
@@ -860,11 +1238,88 @@ func (db *LineDb) compareIDs(a, b any) bool {
return a == b
}
// extractIDsFromRecords извлекает id из списка записей (map[string]any).
func extractIDsFromRecords(records []any) []any {
if len(records) == 0 {
return nil
}
ids := make([]any, 0, len(records))
for _, rec := range records {
m, ok := rec.(map[string]any)
if !ok {
continue
}
if id, exists := m["id"]; exists && id != nil {
ids = append(ids, id)
}
}
return ids
}
func (db *LineDb) generateCacheKey(filter any, collectionName string) string {
// Упрощенная реализация генерации ключа кэша
return fmt.Sprintf("%s:%v", collectionName, filter)
}
// normalizeFilter приводит произвольный filter к более удобной форме:
// - string вида "field:value, field2:value2" -> map[string]any{"field": "value", "field2": "value2"}
// - простая string без ":" -> map[string]any{"id": value}
// - struct -> map[string]any (через toMap)
// - map[string]any и func(any) bool не меняются.
func (db *LineDb) normalizeFilter(filter any) (any, error) {
switch f := filter.(type) {
case nil:
return nil, nil
case map[string]any:
return f, nil
case func(any) bool:
return f, nil
case string:
s := strings.TrimSpace(f)
if s == "" {
return nil, nil
}
// pattern: field:value, field2:value2
if strings.Contains(s, ":") {
result := make(map[string]any)
parts := strings.Split(s, ",")
for _, p := range parts {
p = strings.TrimSpace(p)
if p == "" {
continue
}
kv := strings.SplitN(p, ":", 2)
if len(kv) != 2 {
continue
}
key := strings.TrimSpace(kv[0])
val := strings.TrimSpace(kv[1])
if key == "" {
continue
}
result[key] = val
}
if len(result) == 0 {
return map[string]any{"id": s}, nil
}
return result, nil
}
// простая строка — считаем фильтром по id
return map[string]any{"id": s}, nil
default:
if m, ok := f.(map[string]any); ok {
return m, nil
}
// Пытаемся трактовать как struct и конвертировать в map
m, err := db.toMap(f)
if err != nil {
// если не получилось — возвращаем как есть
return filter, nil
}
return m, nil
}
}
func (db *LineDb) updatePartitioned(data any, collectionName string, filter any, options LineDbAdapterOptions) ([]any, error) {
// Получаем все партиции
partitionFiles, err := db.getPartitionFiles(collectionName)
@@ -1198,6 +1653,37 @@ func (db *LineDb) GetCacheMap() map[string]*CacheEntry {
return make(map[string]*CacheEntry)
}
// GetCacheForTest возвращает сырую карту кэша для тестов: ключ — ключ кэша, значение — Data записи.
// Доступ только при accessKey == "give_me_cache", иначе возвращается пустая мапа.
func (db *LineDb) GetCacheForTest(accessKey string) map[string]any {
const testAccessKey = "give_me_cache"
if accessKey != testAccessKey {
return map[string]any{}
}
if db.cacheExternal == nil {
return map[string]any{}
}
flat := db.cacheExternal.GetFlatCacheMap()
out := make(map[string]any, len(flat))
for k, entry := range flat {
out[k] = entry.Data
}
return out
}
// GetIndexSnapshotForTest возвращает снимок индекса для тестов (только InMemoryIndexStore).
// Доступ только при accessKey == "give_me_cache". Ключ — "collection:field", значение — map[value][]int (номера строк).
func (db *LineDb) GetIndexSnapshotForTest(accessKey string) map[string]any {
const testAccessKey = "give_me_cache"
if accessKey != testAccessKey || db.indexStore == nil {
return map[string]any{}
}
if mem, ok := db.indexStore.(*InMemoryIndexStore); ok {
return mem.GetSnapshotForTest(testAccessKey)
}
return map[string]any{}
}
func (db *LineDb) GetFirstCollection() string {
return db.getFirstCollection()
}