206 lines
8.1 KiB
Go
206 lines
8.1 KiB
Go
package linedb
|
||
|
||
import (
|
||
"encoding/json"
|
||
"fmt"
|
||
"sync"
|
||
)
|
||
|
||
// MemcachedClient — минимальный интерфейс для работы с Memcached.
|
||
// Реализуйте его поверх вашего клиента (например github.com/bradfitz/gomemcache/memcache).
|
||
type MemcachedClient interface {
|
||
Get(key string) ([]byte, error)
|
||
Set(key string, value []byte, expireSeconds int) error
|
||
Delete(key string) error
|
||
}
|
||
|
||
// MemcachedIndexStore — реализация IndexStore с хранением индексов в Memcached.
|
||
type MemcachedIndexStore struct {
|
||
client MemcachedClient
|
||
prefix string
|
||
expireSec int
|
||
mu sync.Mutex
|
||
}
|
||
|
||
// MemcachedIndexStoreOptions — опции для MemcachedIndexStore.
|
||
type MemcachedIndexStoreOptions struct {
|
||
Client MemcachedClient
|
||
KeyPrefix string // префикс ключей (по умолчанию "linedb:idx:")
|
||
ExpireSeconds int // TTL записей (0 = без истечения)
|
||
}
|
||
|
||
// NewMemcachedIndexStore создаёт IndexStore с бэкендом Memcached.
|
||
func NewMemcachedIndexStore(opts MemcachedIndexStoreOptions) (*MemcachedIndexStore, error) {
|
||
if opts.Client == nil {
|
||
return nil, fmt.Errorf("MemcachedClient is required")
|
||
}
|
||
prefix := opts.KeyPrefix
|
||
if prefix == "" {
|
||
prefix = "linedb:idx:"
|
||
}
|
||
return &MemcachedIndexStore{
|
||
client: opts.Client,
|
||
prefix: prefix,
|
||
expireSec: opts.ExpireSeconds,
|
||
}, nil
|
||
}
|
||
|
||
func (s *MemcachedIndexStore) memKey(collection, field, value string) string {
|
||
// Memcached ограничивает длину ключа (обычно 250 байт)
|
||
k := s.prefix + collection + ":" + field + ":" + value
|
||
if len(k) > 250 {
|
||
// Хэшируем длинные значения — упрощённо берём последние 200 символов
|
||
if len(value) > 200 {
|
||
k = s.prefix + collection + ":" + field + ":" + value[len(value)-200:]
|
||
}
|
||
}
|
||
return k
|
||
}
|
||
|
||
// IndexRecord добавляет запись в индекс.
|
||
func (s *MemcachedIndexStore) IndexRecord(collection, partition string, fields []string, record map[string]any, lineIndex int) {
|
||
if len(fields) == 0 || record == nil {
|
||
return
|
||
}
|
||
if partition == "" {
|
||
partition = DefaultPartition
|
||
}
|
||
pos := IndexPosition{Partition: partition, LineIndex: lineIndex}
|
||
for _, field := range fields {
|
||
val := getFieldValue(record, field)
|
||
key := s.memKey(collection, field, val)
|
||
var list []IndexPosition
|
||
if data, err := s.client.Get(key); err == nil && len(data) > 0 {
|
||
_ = json.Unmarshal(data, &list)
|
||
}
|
||
list = append(list, pos)
|
||
if data, err := json.Marshal(list); err == nil {
|
||
_ = s.client.Set(key, data, s.expireSec)
|
||
}
|
||
}
|
||
}
|
||
|
||
// UnindexRecord удаляет запись из индекса.
|
||
func (s *MemcachedIndexStore) UnindexRecord(collection, partition string, fields []string, record map[string]any, lineIndex int) {
|
||
if len(fields) == 0 || record == nil {
|
||
return
|
||
}
|
||
if partition == "" {
|
||
partition = DefaultPartition
|
||
}
|
||
for _, field := range fields {
|
||
val := getFieldValue(record, field)
|
||
key := s.memKey(collection, field, val)
|
||
data, err := s.client.Get(key)
|
||
if err != nil || len(data) == 0 {
|
||
continue
|
||
}
|
||
var list []IndexPosition
|
||
if json.Unmarshal(data, &list) != nil {
|
||
continue
|
||
}
|
||
var newList []IndexPosition
|
||
for _, p := range list {
|
||
if p.Partition != partition || p.LineIndex != lineIndex {
|
||
newList = append(newList, p)
|
||
}
|
||
}
|
||
if len(newList) == 0 {
|
||
_ = s.client.Delete(key)
|
||
} else if data2, err := json.Marshal(newList); err == nil {
|
||
_ = s.client.Set(key, data2, s.expireSec)
|
||
}
|
||
}
|
||
}
|
||
|
||
// Lookup ищет позиции по полю и значению.
|
||
func (s *MemcachedIndexStore) Lookup(collection, field, value string) ([]IndexPosition, error) {
|
||
key := s.memKey(collection, field, value)
|
||
data, err := s.client.Get(key)
|
||
if err != nil {
|
||
return nil, nil
|
||
}
|
||
if len(data) == 0 {
|
||
return nil, nil
|
||
}
|
||
var positions []IndexPosition
|
||
if json.Unmarshal(data, &positions) != nil {
|
||
return nil, nil
|
||
}
|
||
out := make([]IndexPosition, len(positions))
|
||
copy(out, positions)
|
||
return out, nil
|
||
}
|
||
|
||
// Rebuild перестраивает вклад партиции в индекс.
|
||
func (s *MemcachedIndexStore) Rebuild(collection, partition string, fields []string, records []any) error {
|
||
if partition == "" {
|
||
partition = DefaultPartition
|
||
}
|
||
s.mu.Lock()
|
||
defer s.mu.Unlock()
|
||
|
||
// Memcached не поддерживает удаление по паттерну — добавляем новые ключи.
|
||
// Для корректной замены партиции нужно слить с существующими: получить старые списки,
|
||
// удалить позиции с Partition==partition, добавить новые. Упрощённо: строим value -> []IndexPosition
|
||
// только для этой партиции и перезаписываем. Но тогда старые позиции других партиций
|
||
// для того же value будут потеряны. Поэтому нужен merge: Get, фильтровать по partition, добавлять новые.
|
||
// Для merge нужен список всех values — его нет. Реалистично: Rebuild вызывается для одной партиции,
|
||
// и мы должны merge. Без перечисления ключей в Memcached merge невозможен.
|
||
// Вариант: храним в одном ключе все позиции. При Rebuild(partition) нам нужны ВСЕ ключи
|
||
// collection:field:value. Их мы не знаем. Прагматичное решение: Rebuild для Memcached
|
||
// делает полную перезапись для этого partition — мы не можем удалить старые, просто добавляем.
|
||
// Это приведёт к дубликатам. Правильный путь: хранить составной ключ вида collection:field:value,
|
||
// и value — единственный. При Rebuild(partition) мы перезаписываем только те ключи, которые
|
||
// встречаются в records. Для остальных value старые позиции этой partition останутся.
|
||
// Значит нужен merge: для каждого value в records делаем Get, убираем старые Position с этой partition, добавляем новые.
|
||
byFieldValue := make(map[string]map[string][]IndexPosition)
|
||
for idx, rec := range records {
|
||
recMap, ok := rec.(map[string]any)
|
||
if !ok {
|
||
continue
|
||
}
|
||
for _, field := range fields {
|
||
val := getFieldValue(recMap, field)
|
||
if byFieldValue[field] == nil {
|
||
byFieldValue[field] = make(map[string][]IndexPosition)
|
||
}
|
||
byFieldValue[field][val] = append(byFieldValue[field][val], IndexPosition{Partition: partition, LineIndex: idx})
|
||
}
|
||
}
|
||
|
||
for field, valMap := range byFieldValue {
|
||
for val, newPositions := range valMap {
|
||
key := s.memKey(collection, field, val)
|
||
var list []IndexPosition
|
||
if data, err := s.client.Get(key); err == nil && len(data) > 0 {
|
||
_ = json.Unmarshal(data, &list)
|
||
}
|
||
// Удаляем старые позиции этой партиции
|
||
filtered := make([]IndexPosition, 0, len(list))
|
||
for _, p := range list {
|
||
if p.Partition != partition {
|
||
filtered = append(filtered, p)
|
||
}
|
||
}
|
||
list = append(filtered, newPositions...)
|
||
if data, err := json.Marshal(list); err == nil {
|
||
if err := s.client.Set(key, data, s.expireSec); err != nil {
|
||
return fmt.Errorf("memcached set %s: %w", key, err)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// Clear очищает индекс коллекции. Memcached не поддерживает delete by pattern,
|
||
// поэтому метод является заглушкой — старые ключи истекут по TTL.
|
||
func (s *MemcachedIndexStore) Clear(collection string) error {
|
||
_ = collection
|
||
// Опционально: сохранить в отдельном ключе список всех ключей индекса
|
||
// и удалять их по одному. Упрощённо — ничего не делаем.
|
||
return nil
|
||
}
|
||
|