finish index feature

This commit is contained in:
2026-04-07 11:49:42 +06:00
parent 8ba956d8c5
commit 15db6e81db
37 changed files with 1047 additions and 170 deletions

View File

@@ -58,18 +58,22 @@ func (s *MemcachedIndexStore) memKey(collection, field, value string) string {
}
// IndexRecord добавляет запись в индекс.
func (s *MemcachedIndexStore) IndexRecord(collection string, fields []string, record map[string]any, lineIndex int) {
func (s *MemcachedIndexStore) IndexRecord(collection, partition string, fields []string, record map[string]any, lineIndex int) {
if len(fields) == 0 || record == nil {
return
}
if partition == "" {
partition = DefaultPartition
}
pos := IndexPosition{Partition: partition, LineIndex: lineIndex}
for _, field := range fields {
val := getFieldValue(record, field)
key := s.memKey(collection, field, val)
var list []int
var list []IndexPosition
if data, err := s.client.Get(key); err == nil && len(data) > 0 {
_ = json.Unmarshal(data, &list)
}
list = append(list, lineIndex)
list = append(list, pos)
if data, err := json.Marshal(list); err == nil {
_ = s.client.Set(key, data, s.expireSec)
}
@@ -77,10 +81,13 @@ func (s *MemcachedIndexStore) IndexRecord(collection string, fields []string, re
}
// UnindexRecord удаляет запись из индекса.
func (s *MemcachedIndexStore) UnindexRecord(collection string, fields []string, record map[string]any, lineIndex int) {
func (s *MemcachedIndexStore) UnindexRecord(collection, partition string, fields []string, record map[string]any, lineIndex int) {
if len(fields) == 0 || record == nil {
return
}
if partition == "" {
partition = DefaultPartition
}
for _, field := range fields {
val := getFieldValue(record, field)
key := s.memKey(collection, field, val)
@@ -88,14 +95,14 @@ func (s *MemcachedIndexStore) UnindexRecord(collection string, fields []string,
if err != nil || len(data) == 0 {
continue
}
var list []int
var list []IndexPosition
if json.Unmarshal(data, &list) != nil {
continue
}
var newList []int
for _, i := range list {
if i != lineIndex {
newList = append(newList, i)
var newList []IndexPosition
for _, p := range list {
if p.Partition != partition || p.LineIndex != lineIndex {
newList = append(newList, p)
}
}
if len(newList) == 0 {
@@ -106,8 +113,8 @@ func (s *MemcachedIndexStore) UnindexRecord(collection string, fields []string,
}
}
// Lookup ищет индексы строк по полю и значению.
func (s *MemcachedIndexStore) Lookup(collection, field, value string) ([]int, error) {
// Lookup ищет позиции по полю и значению.
func (s *MemcachedIndexStore) Lookup(collection, field, value string) ([]IndexPosition, error) {
key := s.memKey(collection, field, value)
data, err := s.client.Get(key)
if err != nil {
@@ -116,27 +123,38 @@ func (s *MemcachedIndexStore) Lookup(collection, field, value string) ([]int, er
if len(data) == 0 {
return nil, nil
}
var indexes []int
if json.Unmarshal(data, &indexes) != nil {
var positions []IndexPosition
if json.Unmarshal(data, &positions) != nil {
return nil, nil
}
// Возвращаем копию, чтобы вызывающий код не модифицировал внутренний срез
out := make([]int, len(indexes))
copy(out, indexes)
out := make([]IndexPosition, len(positions))
copy(out, positions)
return out, nil
}
// Rebuild перестраивает индекс коллекции (удаляет старые ключи по префиксу и записывает новые).
func (s *MemcachedIndexStore) Rebuild(collection string, fields []string, records []any) error {
// Rebuild перестраивает вклад партиции в индекс.
func (s *MemcachedIndexStore) Rebuild(collection, partition string, fields []string, records []any) error {
if partition == "" {
partition = DefaultPartition
}
s.mu.Lock()
defer s.mu.Unlock()
// Memcached не поддерживает перечисление ключей по шаблону.
// Стратегия: перезаписываем все ключи для текущих записей.
// Старые ключи с другими значениями останутся до TTL.
// Строим value -> []lineIndex для каждого поля
byFieldValue := make(map[string]map[string][]int)
// Memcached не поддерживает удаление по паттерну — добавляем новые ключи.
// Для корректной замены партиции нужно слить с существующими: получить старые списки,
// удалить позиции с Partition==partition, добавить новые. Упрощённо: строим value -> []IndexPosition
// только для этой партиции и перезаписываем. Но тогда старые позиции других партиций
// для того же value будут потеряны. Поэтому нужен merge: Get, фильтровать по partition, добавлять новые.
// Для merge нужен список всех values — его нет. Реалистично: Rebuild вызывается для одной партиции,
// и мы должны merge. Без перечисления ключей в Memcached merge невозможен.
// Вариант: храним в одном ключе все позиции. При Rebuild(partition) нам нужны ВСЕ ключи
// collection:field:value. Их мы не знаем. Прагматичное решение: Rebuild для Memcached
// делает полную перезапись для этого partition — мы не можем удалить старые, просто добавляем.
// Это приведёт к дубликатам. Правильный путь: хранить составной ключ вида collection:field:value,
// и value — единственный. При Rebuild(partition) мы перезаписываем только те ключи, которые
// встречаются в records. Для остальных value старые позиции этой partition останутся.
// Значит нужен merge: для каждого value в records делаем Get, убираем старые Position с этой partition, добавляем новые.
byFieldValue := make(map[string]map[string][]IndexPosition)
for idx, rec := range records {
recMap, ok := rec.(map[string]any)
if !ok {
@@ -145,21 +163,31 @@ func (s *MemcachedIndexStore) Rebuild(collection string, fields []string, record
for _, field := range fields {
val := getFieldValue(recMap, field)
if byFieldValue[field] == nil {
byFieldValue[field] = make(map[string][]int)
byFieldValue[field] = make(map[string][]IndexPosition)
}
byFieldValue[field][val] = append(byFieldValue[field][val], idx)
byFieldValue[field][val] = append(byFieldValue[field][val], IndexPosition{Partition: partition, LineIndex: idx})
}
}
for field, valMap := range byFieldValue {
for val, list := range valMap {
for val, newPositions := range valMap {
key := s.memKey(collection, field, val)
data, err := json.Marshal(list)
if err != nil {
continue
var list []IndexPosition
if data, err := s.client.Get(key); err == nil && len(data) > 0 {
_ = json.Unmarshal(data, &list)
}
if err := s.client.Set(key, data, s.expireSec); err != nil {
return fmt.Errorf("memcached set %s: %w", key, err)
// Удаляем старые позиции этой партиции
filtered := make([]IndexPosition, 0, len(list))
for _, p := range list {
if p.Partition != partition {
filtered = append(filtered, p)
}
}
list = append(filtered, newPositions...)
if data, err := json.Marshal(list); err == nil {
if err := s.client.Set(key, data, s.expireSec); err != nil {
return fmt.Errorf("memcached set %s: %w", key, err)
}
}
}
}