Files
elowdb-go/pkg/linedb/index_memcached.go
2026-04-09 15:06:02 +06:00

210 lines
8.2 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package linedb
import (
"encoding/json"
"fmt"
"sync"
)
// MemcachedClient — минимальный интерфейс для работы с Memcached.
// Реализуйте его поверх вашего клиента (например github.com/bradfitz/gomemcache/memcache).
type MemcachedClient interface {
Get(key string) ([]byte, error)
Set(key string, value []byte, expireSeconds int) error
Delete(key string) error
}
// MemcachedIndexStore — реализация IndexStore с хранением индексов в Memcached.
type MemcachedIndexStore struct {
client MemcachedClient
prefix string
expireSec int
mu sync.Mutex
}
// MemcachedIndexStoreOptions — опции для MemcachedIndexStore.
type MemcachedIndexStoreOptions struct {
Client MemcachedClient
KeyPrefix string // префикс ключей (по умолчанию "linedb:idx:")
ExpireSeconds int // TTL записей (0 = без истечения)
}
// NewMemcachedIndexStore создаёт IndexStore с бэкендом Memcached.
func NewMemcachedIndexStore(opts MemcachedIndexStoreOptions) (*MemcachedIndexStore, error) {
if opts.Client == nil {
return nil, fmt.Errorf("MemcachedClient is required")
}
prefix := opts.KeyPrefix
if prefix == "" {
prefix = "linedb:idx:"
}
return &MemcachedIndexStore{
client: opts.Client,
prefix: prefix,
expireSec: opts.ExpireSeconds,
}, nil
}
func (s *MemcachedIndexStore) memKey(collection, field, value string) string {
// Memcached ограничивает длину ключа (обычно 250 байт)
k := s.prefix + collection + ":" + field + ":" + value
if len(k) > 250 {
// Хэшируем длинные значения — упрощённо берём последние 200 символов
if len(value) > 200 {
k = s.prefix + collection + ":" + field + ":" + value[len(value)-200:]
}
}
return k
}
// IndexRecord добавляет запись в индекс.
func (s *MemcachedIndexStore) IndexRecord(collection, partition string, fields []string, record map[string]any, lineIndex int) {
if len(fields) == 0 || record == nil {
return
}
if partition == "" {
partition = DefaultPartition
}
pos := IndexPosition{Partition: partition, LineIndex: lineIndex}
for _, field := range fields {
val := getFieldValue(record, field)
key := s.memKey(collection, field, val)
var list []IndexPosition
if data, err := s.client.Get(key); err == nil && len(data) > 0 {
_ = json.Unmarshal(data, &list)
}
list = append(list, pos)
if data, err := json.Marshal(list); err == nil {
_ = s.client.Set(key, data, s.expireSec)
}
}
}
// UnindexRecord удаляет запись из индекса.
func (s *MemcachedIndexStore) UnindexRecord(collection, partition string, fields []string, record map[string]any, lineIndex int) {
if len(fields) == 0 || record == nil {
return
}
if partition == "" {
partition = DefaultPartition
}
for _, field := range fields {
val := getFieldValue(record, field)
key := s.memKey(collection, field, val)
data, err := s.client.Get(key)
if err != nil || len(data) == 0 {
continue
}
var list []IndexPosition
if json.Unmarshal(data, &list) != nil {
continue
}
var newList []IndexPosition
for _, p := range list {
if p.Partition != partition || p.LineIndex != lineIndex {
newList = append(newList, p)
}
}
if len(newList) == 0 {
_ = s.client.Delete(key)
} else if data2, err := json.Marshal(newList); err == nil {
_ = s.client.Set(key, data2, s.expireSec)
}
}
}
// Lookup ищет позиции по полю и значению.
func (s *MemcachedIndexStore) Lookup(collection, field, value string) ([]IndexPosition, error) {
key := s.memKey(collection, field, value)
data, err := s.client.Get(key)
if err != nil {
return nil, nil
}
if len(data) == 0 {
return nil, nil
}
var positions []IndexPosition
if json.Unmarshal(data, &positions) != nil {
return nil, nil
}
out := make([]IndexPosition, len(positions))
copy(out, positions)
return out, nil
}
// Rebuild перестраивает вклад партиции в индекс.
func (s *MemcachedIndexStore) Rebuild(collection, partition string, fields []string, records []any, lineIndexes []int) error {
if partition == "" {
partition = DefaultPartition
}
s.mu.Lock()
defer s.mu.Unlock()
// Memcached не поддерживает удаление по паттерну — добавляем новые ключи.
// Для корректной замены партиции нужно слить с существующими: получить старые списки,
// удалить позиции с Partition==partition, добавить новые. Упрощённо: строим value -> []IndexPosition
// только для этой партиции и перезаписываем. Но тогда старые позиции других партиций
// для того же value будут потеряны. Поэтому нужен merge: Get, фильтровать по partition, добавлять новые.
// Для merge нужен список всех values — его нет. Реалистично: Rebuild вызывается для одной партиции,
// и мы должны merge. Без перечисления ключей в Memcached merge невозможен.
// Вариант: храним в одном ключе все позиции. При Rebuild(partition) нам нужны ВСЕ ключи
// collection:field:value. Их мы не знаем. Прагматичное решение: Rebuild для Memcached
// делает полную перезапись для этого partition — мы не можем удалить старые, просто добавляем.
// Это приведёт к дубликатам. Правильный путь: хранить составной ключ вида collection:field:value,
// и value — единственный. При Rebuild(partition) мы перезаписываем только те ключи, которые
// встречаются в records. Для остальных value старые позиции этой partition останутся.
// Значит нужен merge: для каждого value в records делаем Get, убираем старые Position с этой partition, добавляем новые.
byFieldValue := make(map[string]map[string][]IndexPosition)
for i, rec := range records {
lineIdx := i
if lineIndexes != nil && i < len(lineIndexes) {
lineIdx = lineIndexes[i]
}
recMap, ok := rec.(map[string]any)
if !ok {
continue
}
for _, field := range fields {
val := getFieldValue(recMap, field)
if byFieldValue[field] == nil {
byFieldValue[field] = make(map[string][]IndexPosition)
}
byFieldValue[field][val] = append(byFieldValue[field][val], IndexPosition{Partition: partition, LineIndex: lineIdx})
}
}
for field, valMap := range byFieldValue {
for val, newPositions := range valMap {
key := s.memKey(collection, field, val)
var list []IndexPosition
if data, err := s.client.Get(key); err == nil && len(data) > 0 {
_ = json.Unmarshal(data, &list)
}
// Удаляем старые позиции этой партиции
filtered := make([]IndexPosition, 0, len(list))
for _, p := range list {
if p.Partition != partition {
filtered = append(filtered, p)
}
}
list = append(filtered, newPositions...)
if data, err := json.Marshal(list); err == nil {
if err := s.client.Set(key, data, s.expireSec); err != nil {
return fmt.Errorf("memcached set %s: %w", key, err)
}
}
}
}
return nil
}
// Clear очищает индекс коллекции. Memcached не поддерживает delete by pattern,
// поэтому метод является заглушкой — старые ключи истекут по TTL.
func (s *MemcachedIndexStore) Clear(collection string) error {
_ = collection
// Опционально: сохранить в отдельном ключе список всех ключей индекса
// и удалять их по одному. Упрощённо — ничего не делаем.
return nil
}