package linedb import ( "encoding/json" "fmt" "sync" ) // MemcachedClient — минимальный интерфейс для работы с Memcached. // Реализуйте его поверх вашего клиента (например github.com/bradfitz/gomemcache/memcache). type MemcachedClient interface { Get(key string) ([]byte, error) Set(key string, value []byte, expireSeconds int) error Delete(key string) error } // MemcachedIndexStore — реализация IndexStore с хранением индексов в Memcached. type MemcachedIndexStore struct { client MemcachedClient prefix string expireSec int mu sync.Mutex } // MemcachedIndexStoreOptions — опции для MemcachedIndexStore. type MemcachedIndexStoreOptions struct { Client MemcachedClient KeyPrefix string // префикс ключей (по умолчанию "linedb:idx:") ExpireSeconds int // TTL записей (0 = без истечения) } // NewMemcachedIndexStore создаёт IndexStore с бэкендом Memcached. func NewMemcachedIndexStore(opts MemcachedIndexStoreOptions) (*MemcachedIndexStore, error) { if opts.Client == nil { return nil, fmt.Errorf("MemcachedClient is required") } prefix := opts.KeyPrefix if prefix == "" { prefix = "linedb:idx:" } return &MemcachedIndexStore{ client: opts.Client, prefix: prefix, expireSec: opts.ExpireSeconds, }, nil } func (s *MemcachedIndexStore) memKey(collection, field, value string) string { // Memcached ограничивает длину ключа (обычно 250 байт) k := s.prefix + collection + ":" + field + ":" + value if len(k) > 250 { // Хэшируем длинные значения — упрощённо берём последние 200 символов if len(value) > 200 { k = s.prefix + collection + ":" + field + ":" + value[len(value)-200:] } } return k } // IndexRecord добавляет запись в индекс. func (s *MemcachedIndexStore) IndexRecord(collection, partition string, fields []string, record map[string]any, lineIndex int) { if len(fields) == 0 || record == nil { return } if partition == "" { partition = DefaultPartition } pos := IndexPosition{Partition: partition, LineIndex: lineIndex} for _, field := range fields { val := getFieldValue(record, field) key := s.memKey(collection, field, val) var list []IndexPosition if data, err := s.client.Get(key); err == nil && len(data) > 0 { _ = json.Unmarshal(data, &list) } list = append(list, pos) if data, err := json.Marshal(list); err == nil { _ = s.client.Set(key, data, s.expireSec) } } } // UnindexRecord удаляет запись из индекса. func (s *MemcachedIndexStore) UnindexRecord(collection, partition string, fields []string, record map[string]any, lineIndex int) { if len(fields) == 0 || record == nil { return } if partition == "" { partition = DefaultPartition } for _, field := range fields { val := getFieldValue(record, field) key := s.memKey(collection, field, val) data, err := s.client.Get(key) if err != nil || len(data) == 0 { continue } var list []IndexPosition if json.Unmarshal(data, &list) != nil { continue } var newList []IndexPosition for _, p := range list { if p.Partition != partition || p.LineIndex != lineIndex { newList = append(newList, p) } } if len(newList) == 0 { _ = s.client.Delete(key) } else if data2, err := json.Marshal(newList); err == nil { _ = s.client.Set(key, data2, s.expireSec) } } } // Lookup ищет позиции по полю и значению. func (s *MemcachedIndexStore) Lookup(collection, field, value string) ([]IndexPosition, error) { key := s.memKey(collection, field, value) data, err := s.client.Get(key) if err != nil { return nil, nil } if len(data) == 0 { return nil, nil } var positions []IndexPosition if json.Unmarshal(data, &positions) != nil { return nil, nil } out := make([]IndexPosition, len(positions)) copy(out, positions) return out, nil } // Rebuild перестраивает вклад партиции в индекс. func (s *MemcachedIndexStore) Rebuild(collection, partition string, fields []string, records []any, lineIndexes []int) error { if partition == "" { partition = DefaultPartition } s.mu.Lock() defer s.mu.Unlock() // Memcached не поддерживает удаление по паттерну — добавляем новые ключи. // Для корректной замены партиции нужно слить с существующими: получить старые списки, // удалить позиции с Partition==partition, добавить новые. Упрощённо: строим value -> []IndexPosition // только для этой партиции и перезаписываем. Но тогда старые позиции других партиций // для того же value будут потеряны. Поэтому нужен merge: Get, фильтровать по partition, добавлять новые. // Для merge нужен список всех values — его нет. Реалистично: Rebuild вызывается для одной партиции, // и мы должны merge. Без перечисления ключей в Memcached merge невозможен. // Вариант: храним в одном ключе все позиции. При Rebuild(partition) нам нужны ВСЕ ключи // collection:field:value. Их мы не знаем. Прагматичное решение: Rebuild для Memcached // делает полную перезапись для этого partition — мы не можем удалить старые, просто добавляем. // Это приведёт к дубликатам. Правильный путь: хранить составной ключ вида collection:field:value, // и value — единственный. При Rebuild(partition) мы перезаписываем только те ключи, которые // встречаются в records. Для остальных value старые позиции этой partition останутся. // Значит нужен merge: для каждого value в records делаем Get, убираем старые Position с этой partition, добавляем новые. byFieldValue := make(map[string]map[string][]IndexPosition) for i, rec := range records { lineIdx := i if lineIndexes != nil && i < len(lineIndexes) { lineIdx = lineIndexes[i] } recMap, ok := rec.(map[string]any) if !ok { continue } for _, field := range fields { val := getFieldValue(recMap, field) if byFieldValue[field] == nil { byFieldValue[field] = make(map[string][]IndexPosition) } byFieldValue[field][val] = append(byFieldValue[field][val], IndexPosition{Partition: partition, LineIndex: lineIdx}) } } for field, valMap := range byFieldValue { for val, newPositions := range valMap { key := s.memKey(collection, field, val) var list []IndexPosition if data, err := s.client.Get(key); err == nil && len(data) > 0 { _ = json.Unmarshal(data, &list) } // Удаляем старые позиции этой партиции filtered := make([]IndexPosition, 0, len(list)) for _, p := range list { if p.Partition != partition { filtered = append(filtered, p) } } list = append(filtered, newPositions...) if data, err := json.Marshal(list); err == nil { if err := s.client.Set(key, data, s.expireSec); err != nil { return fmt.Errorf("memcached set %s: %w", key, err) } } } } return nil } // Clear очищает индекс коллекции. Memcached не поддерживает delete by pattern, // поэтому метод является заглушкой — старые ключи истекут по TTL. func (s *MemcachedIndexStore) Clear(collection string) error { _ = collection // Опционально: сохранить в отдельном ключе список всех ключей индекса // и удалять их по одному. Упрощённо — ничего не делаем. return nil }