11 Commits

29 changed files with 927 additions and 258 deletions

View File

@@ -25,7 +25,7 @@ package main
import (
"log"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go"
)
func main() {

View File

@@ -5,7 +5,7 @@ import (
"os"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
func main() {

View File

@@ -5,7 +5,7 @@ import (
"log"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
type User struct {

View File

@@ -6,7 +6,7 @@ import (
"testing"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
func TestCustomJSONSerialization(t *testing.T) {

View File

@@ -10,7 +10,7 @@ import (
"strings"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
// User представляет пользователя

View File

@@ -7,7 +7,7 @@ import (
"os"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
// Item — структура для вставки (LineDB поддерживает struct и map)

View File

@@ -5,7 +5,7 @@ import (
"log"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
// testDeleteOneRecordByID тестирует удаление одной записи по ID

View File

@@ -7,7 +7,7 @@ import (
"os"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
func main() {

View File

@@ -5,7 +5,7 @@ import (
"log"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
// testSingleInsert тестирует вставку одной записи

View File

@@ -11,7 +11,7 @@ import (
"strings"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
// User представляет пользователя

View File

@@ -7,7 +7,7 @@ import (
"path/filepath"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
// Пример работы с партициями + индексируемой коллекцией.

View File

@@ -20,7 +20,7 @@ import (
"strings"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
const (

View File

@@ -10,7 +10,7 @@ import (
"strings"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
func main() {

View File

@@ -9,7 +9,7 @@ import (
"os"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
func main() {

2
go.mod
View File

@@ -1,4 +1,4 @@
module linedb
module direct-dev.ru/gitea/GiteaAdmin/elowdb-go
go 1.21

View File

@@ -1,6 +1,7 @@
package linedb
import (
"strings"
"sync"
"time"
)
@@ -94,6 +95,37 @@ func (c *RecordCache) Clear() {
c.cache = make(map[string]*CacheEntry)
}
// ClearCollection удаляет записи кэша, относящиеся к коллекции collectionName:
// ключи ReadByFilter ("collection:filter" и "collection_part:filter") и SetByRecord ("id:collection", "id:collection_part").
func (c *RecordCache) ClearCollection(collectionName string) {
if collectionName == "" {
c.Clear()
return
}
c.mutex.Lock()
defer c.mutex.Unlock()
for key := range c.cache {
if cacheKeyBelongsToCollection(key, collectionName) {
delete(c.cache, key)
}
}
}
func cacheKeyBelongsToCollection(key, collectionName string) bool {
if strings.HasPrefix(key, collectionName+":") {
return true
}
if strings.HasPrefix(key, collectionName+"_") {
return true
}
idx := strings.LastIndex(key, ":")
if idx < 0 {
return false
}
suf := key[idx+1:]
return suf == collectionName || strings.HasPrefix(suf, collectionName+"_")
}
// ClearEntriesContainingIDs удаляет из кэша только те записи, в данных которых
// встречается хотя бы один из переданных id. Если ids пуст — ничего не делает.
func (c *RecordCache) ClearEntriesContainingIDs(ids []any) {

View File

@@ -33,8 +33,9 @@ type IndexStore interface {
UnindexRecord(collection, partition string, fields []string, record map[string]any, lineIndex int)
// Rebuild перестраивает вклад в индекс для одной партиции (или всей коллекции, если одна партиция).
// partition — имя партиции; records — записи, позиция в срезе = lineIndex.
Rebuild(collection, partition string, fields []string, records []any) error
// lineIndexes[i] — 0-based номер физической строки в файле для records[i] (после пустых/удалённых строк).
// Если lineIndexes == nil или len != len(records), используются плотные индексы 0..len-1 (устаревшее поведение).
Rebuild(collection, partition string, fields []string, records []any, lineIndexes []int) error
// Clear очищает индекс коллекции (все партиции).
Clear(collection string) error
@@ -164,7 +165,7 @@ func (s *InMemoryIndexStore) Lookup(collection, field, value string) ([]IndexPos
}
// Rebuild перестраивает вклад партиции в индекс.
func (s *InMemoryIndexStore) Rebuild(collection, partition string, fields []string, records []any) error {
func (s *InMemoryIndexStore) Rebuild(collection, partition string, fields []string, records []any, lineIndexes []int) error {
if partition == "" {
partition = DefaultPartition
}
@@ -194,7 +195,11 @@ func (s *InMemoryIndexStore) Rebuild(collection, partition string, fields []stri
}
// Добавляем новые позиции
for idx, rec := range records {
for i, rec := range records {
lineIdx := i
if lineIndexes != nil && i < len(lineIndexes) {
lineIdx = lineIndexes[i]
}
recMap, ok := rec.(map[string]any)
if !ok {
continue
@@ -205,7 +210,7 @@ func (s *InMemoryIndexStore) Rebuild(collection, partition string, fields []stri
if s.index[key] == nil {
s.index[key] = make(map[string][]IndexPosition)
}
s.index[key][val] = append(s.index[key][val], IndexPosition{Partition: partition, LineIndex: idx})
s.index[key][val] = append(s.index[key][val], IndexPosition{Partition: partition, LineIndex: lineIdx})
}
}
return nil

View File

@@ -133,7 +133,7 @@ func (s *MemcachedIndexStore) Lookup(collection, field, value string) ([]IndexPo
}
// Rebuild перестраивает вклад партиции в индекс.
func (s *MemcachedIndexStore) Rebuild(collection, partition string, fields []string, records []any) error {
func (s *MemcachedIndexStore) Rebuild(collection, partition string, fields []string, records []any, lineIndexes []int) error {
if partition == "" {
partition = DefaultPartition
}
@@ -155,7 +155,11 @@ func (s *MemcachedIndexStore) Rebuild(collection, partition string, fields []str
// встречаются в records. Для остальных value старые позиции этой partition останутся.
// Значит нужен merge: для каждого value в records делаем Get, убираем старые Position с этой partition, добавляем новые.
byFieldValue := make(map[string]map[string][]IndexPosition)
for idx, rec := range records {
for i, rec := range records {
lineIdx := i
if lineIndexes != nil && i < len(lineIndexes) {
lineIdx = lineIndexes[i]
}
recMap, ok := rec.(map[string]any)
if !ok {
continue
@@ -165,7 +169,7 @@ func (s *MemcachedIndexStore) Rebuild(collection, partition string, fields []str
if byFieldValue[field] == nil {
byFieldValue[field] = make(map[string][]IndexPosition)
}
byFieldValue[field][val] = append(byFieldValue[field][val], IndexPosition{Partition: partition, LineIndex: idx})
byFieldValue[field][val] = append(byFieldValue[field][val], IndexPosition{Partition: partition, LineIndex: lineIdx})
}
}

View File

@@ -402,6 +402,133 @@ func (j *JSONLFile) Read(options LineDbAdapterOptions) ([]any, error) {
return records, scanner.Err()
}
// ReadWithPhysicalLineIndexes как Read, но для каждой записи возвращает 0-based индекс строки (слота),
// в том же смысле, что ReadByLineIndexes/WriteAtLineIndexes: смещение в файле = lineIndex * allocSize.
// Пустые слоты (пробелы после удаления и т.п.) пропускаются; индекс — номер слота, а не порядковый номер записи.
func (j *JSONLFile) ReadWithPhysicalLineIndexes(options LineDbAdapterOptions) ([]any, []int, error) {
if !options.InTransaction {
j.mutex.RLock()
defer j.mutex.RUnlock()
}
if !j.initialized {
return nil, nil, fmt.Errorf("file not initialized")
}
if j.allocSize <= 0 {
return nil, nil, fmt.Errorf("invalid allocSize")
}
file, err := os.Open(j.filename)
if err != nil {
return nil, nil, fmt.Errorf("failed to open file: %w", err)
}
defer file.Close()
info, err := file.Stat()
if err != nil {
return nil, nil, fmt.Errorf("stat file: %w", err)
}
nSlots := int(info.Size()) / j.allocSize
buf := make([]byte, j.allocSize)
var records []any
var lineIndexes []int
for slot := 0; slot < nSlots; slot++ {
n, err := io.ReadFull(file, buf)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
if err != nil {
return nil, nil, fmt.Errorf("read slot %d: %w", slot, err)
}
if n != j.allocSize {
break
}
line := string(buf[:n])
line = strings.TrimRight(line, "\n")
line = strings.TrimRight(line, " ")
if strings.TrimSpace(line) == "" {
continue
}
if j.cypherKey != "" && !j.options.Encode {
decoded, err := base64.StdEncoding.DecodeString(line)
if err != nil {
if j.options.SkipInvalidLines {
continue
}
return nil, nil, fmt.Errorf("failed to decode base64: %w", err)
}
line = string(decoded)
}
var record any
if err := j.jsonUnmarshal([]byte(line), &record); err != nil {
if j.options.SkipInvalidLines {
continue
}
return nil, nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
}
records = append(records, record)
lineIndexes = append(lineIndexes, slot)
}
return records, lineIndexes, nil
}
// HasBlankSlots возвращает true, если в файле есть хотя бы один пустой слот
// (пробелы + перевод строки — типичный след точечного удаления BlankLinesAtPositions).
func (j *JSONLFile) HasBlankSlots(options LineDbAdapterOptions) (bool, error) {
if !options.InTransaction {
j.mutex.RLock()
defer j.mutex.RUnlock()
}
if !j.initialized {
return false, fmt.Errorf("file not initialized")
}
if j.allocSize <= 0 {
return false, fmt.Errorf("invalid allocSize")
}
file, err := os.Open(j.filename)
if err != nil {
return false, fmt.Errorf("failed to open file: %w", err)
}
defer file.Close()
info, err := file.Stat()
if err != nil {
return false, fmt.Errorf("stat file: %w", err)
}
nSlots := int(info.Size()) / j.allocSize
buf := make([]byte, j.allocSize)
for slot := 0; slot < nSlots; slot++ {
n, err := io.ReadFull(file, buf)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
if err != nil {
return false, fmt.Errorf("read slot %d: %w", slot, err)
}
if n != j.allocSize {
break
}
line := string(buf[:n])
line = strings.TrimRight(line, "\n")
line = strings.TrimRight(line, " ")
if strings.TrimSpace(line) == "" {
return true, nil
}
}
return false, nil
}
// ReadByLineIndexes читает записи по номерам строк (0-based) с использованием random access.
// Ожидается, что файл нормализован и каждая строка имеет длину allocSize байт (включая \n),
// как это обеспечивает Init/normalizeExistingFile/rewriteFile.
@@ -662,6 +789,86 @@ func (j *JSONLFile) BlankLinesAtPositions(lineIndexes []int, options LineDbAdapt
return nil
}
// CompactFile перезаписывает файл, копируя подряд только непустые слоты (сырые байты без JSON).
// Пустой слот: после обрезки \n и пробелов строка пустая — как после BlankLinesAtPositions.
// Один буфер на весь проход; без раскодирования и без удержания всех записей в памяти.
func (j *JSONLFile) CompactFile(options LineDbAdapterOptions) error {
if !options.InTransaction {
j.mutex.Lock()
defer j.mutex.Unlock()
}
if !j.initialized {
return fmt.Errorf("file not initialized")
}
if j.allocSize <= 0 {
return fmt.Errorf("invalid allocSize")
}
info, err := os.Stat(j.filename)
if err != nil {
return fmt.Errorf("compact stat: %w", err)
}
nSlots := int(info.Size()) / j.allocSize
tempFile := j.filename + ".tmp"
dst, err := os.Create(tempFile)
if err != nil {
return fmt.Errorf("compact create temp: %w", err)
}
src, err := os.Open(j.filename)
if err != nil {
dst.Close()
_ = os.Remove(tempFile)
return fmt.Errorf("compact open source: %w", err)
}
buf := make([]byte, j.allocSize)
compactErr := func() error {
defer src.Close()
for slot := 0; slot < nSlots; slot++ {
n, err := io.ReadFull(src, buf)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
if err != nil {
return fmt.Errorf("compact read slot %d: %w", slot, err)
}
if n != j.allocSize {
break
}
slot := buf[:n]
t := bytes.TrimRight(bytes.TrimRight(slot, "\n"), " ")
if len(bytes.TrimSpace(t)) == 0 {
continue
}
if _, err := dst.Write(slot); err != nil {
return fmt.Errorf("compact write: %w", err)
}
}
return nil
}()
if err := dst.Close(); err != nil {
_ = os.Remove(tempFile)
if compactErr != nil {
return compactErr
}
return fmt.Errorf("compact close temp: %w", err)
}
if compactErr != nil {
_ = os.Remove(tempFile)
return compactErr
}
if err := os.Rename(tempFile, j.filename); err != nil {
_ = os.Remove(tempFile)
return fmt.Errorf("compact rename: %w", err)
}
return nil
}
// LineCount возвращает число строк в файле (fileSize / allocSize).
// Используется для точечного индексирования после Write.
func (j *JSONLFile) LineCount() (int, error) {
@@ -851,6 +1058,11 @@ func (j *JSONLFile) GetEncryptKey() string {
return j.cypherKey
}
// Mutex возвращает RWMutex адаптера файла (тот же замок, что у Read/Write/Update/Delete при options.InTransaction == false).
func (j *JSONLFile) Mutex() *sync.RWMutex {
return &j.mutex
}
// matchesFilter проверяет соответствие записи фильтру
func (j *JSONLFile) matchesFilter(record any, filter any, strictCompare bool) bool {
if filter == nil {
@@ -896,14 +1108,18 @@ func (j *JSONLFile) valuesMatch(a, b any, strictCompare bool) bool {
return a == b
}
// Для строк - нечувствительное к регистру сравнение
if a == b {
return true
}
// Для строк - нечувствительное к регистру сравнение (равенство, не подстрока)
if aStr, ok := a.(string); ok {
if bStr, ok := b.(string); ok {
return strings.Contains(strings.ToLower(aStr), strings.ToLower(bStr))
return matchStringByPattern(aStr, bStr, strictCompare)
}
}
return a == b
return false
}
// rewriteFile перезаписывает файл новыми данными

View File

@@ -4,7 +4,9 @@ import (
"sync"
)
// LastIDManager управляет последними ID для коллекций
// LastIDManager управляет последними ID для коллекций.
// Ключ — полное имя коллекции (как в NextID/Insert), без обрезки по «_»:
// иначе user_data и events_A ломались бы на первый сегмент.
type LastIDManager struct {
lastIDs map[string]int
mutex sync.RWMutex
@@ -23,53 +25,32 @@ func GetLastIDManagerInstance() *LastIDManager {
return lastIDManagerInstance
}
// GetLastID получает последний ID для коллекции
func (l *LastIDManager) GetLastID(filename string) int {
// GetLastID получает последний ID для коллекции (ключ — полное имя коллекции).
func (l *LastIDManager) GetLastID(collectionKey string) int {
l.mutex.RLock()
defer l.mutex.RUnlock()
baseFileName := l.getBaseFileName(filename)
return l.lastIDs[baseFileName]
return l.lastIDs[collectionKey]
}
// SetLastID устанавливает последний ID для коллекции
func (l *LastIDManager) SetLastID(filename string, id int) {
// SetLastID устанавливает последний ID для коллекции, если id больше текущего.
func (l *LastIDManager) SetLastID(collectionKey string, id int) {
l.mutex.Lock()
defer l.mutex.Unlock()
baseFileName := l.getBaseFileName(filename)
currentID := l.lastIDs[baseFileName]
currentID := l.lastIDs[collectionKey]
if currentID < id {
l.lastIDs[baseFileName] = id
l.lastIDs[collectionKey] = id
}
}
// IncrementLastID увеличивает последний ID для коллекции
func (l *LastIDManager) IncrementLastID(filename string) int {
func (l *LastIDManager) IncrementLastID(collectionKey string) int {
l.mutex.Lock()
defer l.mutex.Unlock()
baseFileName := l.getBaseFileName(filename)
currentID := l.lastIDs[baseFileName]
currentID := l.lastIDs[collectionKey]
newID := currentID + 1
l.lastIDs[baseFileName] = newID
l.lastIDs[collectionKey] = newID
return newID
}
// getBaseFileName извлекает базовое имя файла
func (l *LastIDManager) getBaseFileName(filename string) string {
if idx := l.findPartitionSeparator(filename); idx != -1 {
return filename[:idx]
}
return filename
}
// findPartitionSeparator находит разделитель партиции
func (l *LastIDManager) findPartitionSeparator(filename string) int {
for i, char := range filename {
if char == '_' {
return i
}
}
return -1
}

File diff suppressed because it is too large Load Diff

118
pkg/linedb/string_match.go Normal file
View File

@@ -0,0 +1,118 @@
package linedb
import "strings"
type stringMatchMode int
const (
stringMatchEqual stringMatchMode = iota
stringMatchContains
stringMatchHasPrefix
stringMatchHasSuffix
)
func matchStringByPattern(value string, pattern string, strictCompare bool) bool {
mode, lit := parsePercentPattern(pattern)
if !strictCompare {
// For substring checks we use ToLower (close enough to EqualFold intent).
value = strings.ToLower(value)
lit = strings.ToLower(lit)
}
switch mode {
case stringMatchContains:
return strings.Contains(value, lit)
case stringMatchHasPrefix:
return strings.HasPrefix(value, lit)
case stringMatchHasSuffix:
return strings.HasSuffix(value, lit)
default:
if strictCompare {
return value == lit
}
return strings.EqualFold(value, lit)
}
}
// parsePercentPattern interprets unescaped % at the start/end of pattern:
// - %foo% => contains "foo"
// - foo% => hasPrefix "foo"
// - %foo => hasSuffix "foo"
// - foo => equal "foo"
// Escaping: \% means literal %, \\ means literal \ (only affects escape processing).
func parsePercentPattern(pattern string) (stringMatchMode, string) {
if pattern == "" {
return stringMatchEqual, ""
}
leading := pattern[0] == '%'
trailing := len(pattern) > 0 && pattern[len(pattern)-1] == '%' && !isEscapedAt(pattern, len(pattern)-1)
start := 0
end := len(pattern)
if leading {
start = 1
}
if trailing && end > start {
end--
}
lit := unescapePercents(pattern[start:end])
switch {
case leading && trailing:
return stringMatchContains, lit
case trailing:
return stringMatchHasPrefix, lit
case leading:
return stringMatchHasSuffix, lit
default:
return stringMatchEqual, unescapePercents(pattern)
}
}
func isEscapedAt(s string, idx int) bool {
// idx is escaped if preceded by an odd number of backslashes.
if idx <= 0 || idx >= len(s) {
return false
}
n := 0
for i := idx - 1; i >= 0 && s[i] == '\\'; i-- {
n++
}
return n%2 == 1
}
func unescapePercents(s string) string {
if s == "" {
return ""
}
var b strings.Builder
b.Grow(len(s))
esc := false
for i := 0; i < len(s); i++ {
ch := s[i]
if esc {
// Only unescape % and \; keep backslash for other chars.
if ch == '%' || ch == '\\' {
b.WriteByte(ch)
} else {
b.WriteByte('\\')
b.WriteByte(ch)
}
esc = false
continue
}
if ch == '\\' {
esc = true
continue
}
b.WriteByte(ch)
}
if esc {
// dangling backslash
b.WriteByte('\\')
}
return b.String()
}

View File

@@ -105,6 +105,10 @@ type JSONLFileOptions struct {
SkipInvalidLines bool `json:"skipInvalidLines,omitempty"`
DecryptKey string `json:"decryptKey,omitempty"`
ConvertStringIdToNumber bool `json:"convertStringIdToNumber,omitempty"`
// RebuildIndexesOnFileChange — после каждой мутации файла коллекции (Insert/Update/Delete через LineDb)
// полная пересборка всех индексов по этой логической коллекции; частичное индексирование (IndexRecord),
// tryIndexUpdate/tryIndexDelete и периодический ребилд по таймеру для этой коллекции отключаются.
RebuildIndexesOnFileChange bool `json:"rebuildIndexesOnFileChange,omitempty"`
// Функции сериализации и десериализации JSON
JSONMarshal func(any) ([]byte, error) `json:"-"`
JSONUnmarshal func([]byte, any) error `json:"-"`

View File

@@ -5,7 +5,7 @@ import (
"testing"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
// mockMemcached — in-memory реализация MemcachedClient для тестов.
@@ -111,7 +111,7 @@ func TestIndexEncodedCollectionCache(t *testing.T) {
}
initOptions := &linedb.LineDbInitOptions{
CacheSize: 100,
CacheTTL: time.Minute*10,
CacheTTL: time.Minute * 10,
DBFolder: "./data/test-linedb-index-enc",
Collections: []linedb.JSONLFileOptions{
{

View File

@@ -8,7 +8,7 @@ import (
"testing"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
func setupPointUpdateDB(t *testing.T) (*linedb.LineDb, func()) {

View File

@@ -7,7 +7,7 @@ import (
"testing"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
func TestLineDbBasic(t *testing.T) {

View File

@@ -5,7 +5,7 @@ import (
"path/filepath"
"testing"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
const dbDir = "./data/nonpartitioned"

View File

@@ -7,7 +7,7 @@ import (
"strings"
"testing"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
// failingLookupStore — IndexStore, у которого Lookup всегда возвращает ошибку (для проверки FailOnFailureIndexRead).
@@ -27,8 +27,8 @@ func (f *failingLookupStore) UnindexRecord(collection, partition string, fields
f.inner.UnindexRecord(collection, partition, fields, record, lineIndex)
}
func (f *failingLookupStore) Rebuild(collection, partition string, fields []string, records []any) error {
return f.inner.Rebuild(collection, partition, fields, records)
func (f *failingLookupStore) Rebuild(collection, partition string, fields []string, records []any, lineIndexes []int) error {
return f.inner.Rebuild(collection, partition, fields, records, lineIndexes)
}
func (f *failingLookupStore) Clear(collection string) error {

View File

@@ -6,7 +6,7 @@ import (
"testing"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
const dbDir = "./data/partitioned"