Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| dfff1ffd7e | |||
| adee0bc805 | |||
| 5f753c3e93 | |||
| 9861e09246 | |||
| 0a36acda24 | |||
| 40c6935b4f | |||
| 0b1deabbaa | |||
| fafe0d4a66 | |||
| a1bbd9177a | |||
| 7894474f2d | |||
| 17d1a538ca |
@@ -25,7 +25,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type User struct {
|
type User struct {
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestCustomJSONSerialization(t *testing.T) {
|
func TestCustomJSONSerialization(t *testing.T) {
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// User представляет пользователя
|
// User представляет пользователя
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Item — структура для вставки (LineDB поддерживает struct и map)
|
// Item — структура для вставки (LineDB поддерживает struct и map)
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// testDeleteOneRecordByID тестирует удаление одной записи по ID
|
// testDeleteOneRecordByID тестирует удаление одной записи по ID
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// testSingleInsert тестирует вставку одной записи
|
// testSingleInsert тестирует вставку одной записи
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// User представляет пользователя
|
// User представляет пользователя
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Пример работы с партициями + индексируемой коллекцией.
|
// Пример работы с партициями + индексируемой коллекцией.
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -1,4 +1,4 @@
|
|||||||
module linedb
|
module direct-dev.ru/gitea/GiteaAdmin/elowdb-go
|
||||||
|
|
||||||
go 1.21
|
go 1.21
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package linedb
|
package linedb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -94,6 +95,37 @@ func (c *RecordCache) Clear() {
|
|||||||
c.cache = make(map[string]*CacheEntry)
|
c.cache = make(map[string]*CacheEntry)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ClearCollection удаляет записи кэша, относящиеся к коллекции collectionName:
|
||||||
|
// ключи ReadByFilter ("collection:filter" и "collection_part:filter") и SetByRecord ("id:collection", "id:collection_part").
|
||||||
|
func (c *RecordCache) ClearCollection(collectionName string) {
|
||||||
|
if collectionName == "" {
|
||||||
|
c.Clear()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.mutex.Lock()
|
||||||
|
defer c.mutex.Unlock()
|
||||||
|
for key := range c.cache {
|
||||||
|
if cacheKeyBelongsToCollection(key, collectionName) {
|
||||||
|
delete(c.cache, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func cacheKeyBelongsToCollection(key, collectionName string) bool {
|
||||||
|
if strings.HasPrefix(key, collectionName+":") {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if strings.HasPrefix(key, collectionName+"_") {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
idx := strings.LastIndex(key, ":")
|
||||||
|
if idx < 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
suf := key[idx+1:]
|
||||||
|
return suf == collectionName || strings.HasPrefix(suf, collectionName+"_")
|
||||||
|
}
|
||||||
|
|
||||||
// ClearEntriesContainingIDs удаляет из кэша только те записи, в данных которых
|
// ClearEntriesContainingIDs удаляет из кэша только те записи, в данных которых
|
||||||
// встречается хотя бы один из переданных id. Если ids пуст — ничего не делает.
|
// встречается хотя бы один из переданных id. Если ids пуст — ничего не делает.
|
||||||
func (c *RecordCache) ClearEntriesContainingIDs(ids []any) {
|
func (c *RecordCache) ClearEntriesContainingIDs(ids []any) {
|
||||||
|
|||||||
@@ -33,8 +33,9 @@ type IndexStore interface {
|
|||||||
UnindexRecord(collection, partition string, fields []string, record map[string]any, lineIndex int)
|
UnindexRecord(collection, partition string, fields []string, record map[string]any, lineIndex int)
|
||||||
|
|
||||||
// Rebuild перестраивает вклад в индекс для одной партиции (или всей коллекции, если одна партиция).
|
// Rebuild перестраивает вклад в индекс для одной партиции (или всей коллекции, если одна партиция).
|
||||||
// partition — имя партиции; records — записи, позиция в срезе = lineIndex.
|
// lineIndexes[i] — 0-based номер физической строки в файле для records[i] (после пустых/удалённых строк).
|
||||||
Rebuild(collection, partition string, fields []string, records []any) error
|
// Если lineIndexes == nil или len != len(records), используются плотные индексы 0..len-1 (устаревшее поведение).
|
||||||
|
Rebuild(collection, partition string, fields []string, records []any, lineIndexes []int) error
|
||||||
|
|
||||||
// Clear очищает индекс коллекции (все партиции).
|
// Clear очищает индекс коллекции (все партиции).
|
||||||
Clear(collection string) error
|
Clear(collection string) error
|
||||||
@@ -164,7 +165,7 @@ func (s *InMemoryIndexStore) Lookup(collection, field, value string) ([]IndexPos
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Rebuild перестраивает вклад партиции в индекс.
|
// Rebuild перестраивает вклад партиции в индекс.
|
||||||
func (s *InMemoryIndexStore) Rebuild(collection, partition string, fields []string, records []any) error {
|
func (s *InMemoryIndexStore) Rebuild(collection, partition string, fields []string, records []any, lineIndexes []int) error {
|
||||||
if partition == "" {
|
if partition == "" {
|
||||||
partition = DefaultPartition
|
partition = DefaultPartition
|
||||||
}
|
}
|
||||||
@@ -194,7 +195,11 @@ func (s *InMemoryIndexStore) Rebuild(collection, partition string, fields []stri
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Добавляем новые позиции
|
// Добавляем новые позиции
|
||||||
for idx, rec := range records {
|
for i, rec := range records {
|
||||||
|
lineIdx := i
|
||||||
|
if lineIndexes != nil && i < len(lineIndexes) {
|
||||||
|
lineIdx = lineIndexes[i]
|
||||||
|
}
|
||||||
recMap, ok := rec.(map[string]any)
|
recMap, ok := rec.(map[string]any)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
@@ -205,7 +210,7 @@ func (s *InMemoryIndexStore) Rebuild(collection, partition string, fields []stri
|
|||||||
if s.index[key] == nil {
|
if s.index[key] == nil {
|
||||||
s.index[key] = make(map[string][]IndexPosition)
|
s.index[key] = make(map[string][]IndexPosition)
|
||||||
}
|
}
|
||||||
s.index[key][val] = append(s.index[key][val], IndexPosition{Partition: partition, LineIndex: idx})
|
s.index[key][val] = append(s.index[key][val], IndexPosition{Partition: partition, LineIndex: lineIdx})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -133,7 +133,7 @@ func (s *MemcachedIndexStore) Lookup(collection, field, value string) ([]IndexPo
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Rebuild перестраивает вклад партиции в индекс.
|
// Rebuild перестраивает вклад партиции в индекс.
|
||||||
func (s *MemcachedIndexStore) Rebuild(collection, partition string, fields []string, records []any) error {
|
func (s *MemcachedIndexStore) Rebuild(collection, partition string, fields []string, records []any, lineIndexes []int) error {
|
||||||
if partition == "" {
|
if partition == "" {
|
||||||
partition = DefaultPartition
|
partition = DefaultPartition
|
||||||
}
|
}
|
||||||
@@ -155,7 +155,11 @@ func (s *MemcachedIndexStore) Rebuild(collection, partition string, fields []str
|
|||||||
// встречаются в records. Для остальных value старые позиции этой partition останутся.
|
// встречаются в records. Для остальных value старые позиции этой partition останутся.
|
||||||
// Значит нужен merge: для каждого value в records делаем Get, убираем старые Position с этой partition, добавляем новые.
|
// Значит нужен merge: для каждого value в records делаем Get, убираем старые Position с этой partition, добавляем новые.
|
||||||
byFieldValue := make(map[string]map[string][]IndexPosition)
|
byFieldValue := make(map[string]map[string][]IndexPosition)
|
||||||
for idx, rec := range records {
|
for i, rec := range records {
|
||||||
|
lineIdx := i
|
||||||
|
if lineIndexes != nil && i < len(lineIndexes) {
|
||||||
|
lineIdx = lineIndexes[i]
|
||||||
|
}
|
||||||
recMap, ok := rec.(map[string]any)
|
recMap, ok := rec.(map[string]any)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
@@ -165,7 +169,7 @@ func (s *MemcachedIndexStore) Rebuild(collection, partition string, fields []str
|
|||||||
if byFieldValue[field] == nil {
|
if byFieldValue[field] == nil {
|
||||||
byFieldValue[field] = make(map[string][]IndexPosition)
|
byFieldValue[field] = make(map[string][]IndexPosition)
|
||||||
}
|
}
|
||||||
byFieldValue[field][val] = append(byFieldValue[field][val], IndexPosition{Partition: partition, LineIndex: idx})
|
byFieldValue[field][val] = append(byFieldValue[field][val], IndexPosition{Partition: partition, LineIndex: lineIdx})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -402,6 +402,133 @@ func (j *JSONLFile) Read(options LineDbAdapterOptions) ([]any, error) {
|
|||||||
return records, scanner.Err()
|
return records, scanner.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReadWithPhysicalLineIndexes как Read, но для каждой записи возвращает 0-based индекс строки (слота),
|
||||||
|
// в том же смысле, что ReadByLineIndexes/WriteAtLineIndexes: смещение в файле = lineIndex * allocSize.
|
||||||
|
// Пустые слоты (пробелы после удаления и т.п.) пропускаются; индекс — номер слота, а не порядковый номер записи.
|
||||||
|
func (j *JSONLFile) ReadWithPhysicalLineIndexes(options LineDbAdapterOptions) ([]any, []int, error) {
|
||||||
|
if !options.InTransaction {
|
||||||
|
j.mutex.RLock()
|
||||||
|
defer j.mutex.RUnlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
if !j.initialized {
|
||||||
|
return nil, nil, fmt.Errorf("file not initialized")
|
||||||
|
}
|
||||||
|
if j.allocSize <= 0 {
|
||||||
|
return nil, nil, fmt.Errorf("invalid allocSize")
|
||||||
|
}
|
||||||
|
|
||||||
|
file, err := os.Open(j.filename)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to open file: %w", err)
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
info, err := file.Stat()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("stat file: %w", err)
|
||||||
|
}
|
||||||
|
nSlots := int(info.Size()) / j.allocSize
|
||||||
|
|
||||||
|
buf := make([]byte, j.allocSize)
|
||||||
|
var records []any
|
||||||
|
var lineIndexes []int
|
||||||
|
|
||||||
|
for slot := 0; slot < nSlots; slot++ {
|
||||||
|
n, err := io.ReadFull(file, buf)
|
||||||
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("read slot %d: %w", slot, err)
|
||||||
|
}
|
||||||
|
if n != j.allocSize {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
line := string(buf[:n])
|
||||||
|
line = strings.TrimRight(line, "\n")
|
||||||
|
line = strings.TrimRight(line, " ")
|
||||||
|
if strings.TrimSpace(line) == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if j.cypherKey != "" && !j.options.Encode {
|
||||||
|
decoded, err := base64.StdEncoding.DecodeString(line)
|
||||||
|
if err != nil {
|
||||||
|
if j.options.SkipInvalidLines {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return nil, nil, fmt.Errorf("failed to decode base64: %w", err)
|
||||||
|
}
|
||||||
|
line = string(decoded)
|
||||||
|
}
|
||||||
|
|
||||||
|
var record any
|
||||||
|
if err := j.jsonUnmarshal([]byte(line), &record); err != nil {
|
||||||
|
if j.options.SkipInvalidLines {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return nil, nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
records = append(records, record)
|
||||||
|
lineIndexes = append(lineIndexes, slot)
|
||||||
|
}
|
||||||
|
|
||||||
|
return records, lineIndexes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasBlankSlots возвращает true, если в файле есть хотя бы один пустой слот
|
||||||
|
// (пробелы + перевод строки — типичный след точечного удаления BlankLinesAtPositions).
|
||||||
|
func (j *JSONLFile) HasBlankSlots(options LineDbAdapterOptions) (bool, error) {
|
||||||
|
if !options.InTransaction {
|
||||||
|
j.mutex.RLock()
|
||||||
|
defer j.mutex.RUnlock()
|
||||||
|
}
|
||||||
|
if !j.initialized {
|
||||||
|
return false, fmt.Errorf("file not initialized")
|
||||||
|
}
|
||||||
|
if j.allocSize <= 0 {
|
||||||
|
return false, fmt.Errorf("invalid allocSize")
|
||||||
|
}
|
||||||
|
|
||||||
|
file, err := os.Open(j.filename)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("failed to open file: %w", err)
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
info, err := file.Stat()
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("stat file: %w", err)
|
||||||
|
}
|
||||||
|
nSlots := int(info.Size()) / j.allocSize
|
||||||
|
buf := make([]byte, j.allocSize)
|
||||||
|
|
||||||
|
for slot := 0; slot < nSlots; slot++ {
|
||||||
|
n, err := io.ReadFull(file, buf)
|
||||||
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("read slot %d: %w", slot, err)
|
||||||
|
}
|
||||||
|
if n != j.allocSize {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
line := string(buf[:n])
|
||||||
|
line = strings.TrimRight(line, "\n")
|
||||||
|
line = strings.TrimRight(line, " ")
|
||||||
|
if strings.TrimSpace(line) == "" {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ReadByLineIndexes читает записи по номерам строк (0-based) с использованием random access.
|
// ReadByLineIndexes читает записи по номерам строк (0-based) с использованием random access.
|
||||||
// Ожидается, что файл нормализован и каждая строка имеет длину allocSize байт (включая \n),
|
// Ожидается, что файл нормализован и каждая строка имеет длину allocSize байт (включая \n),
|
||||||
// как это обеспечивает Init/normalizeExistingFile/rewriteFile.
|
// как это обеспечивает Init/normalizeExistingFile/rewriteFile.
|
||||||
@@ -662,6 +789,86 @@ func (j *JSONLFile) BlankLinesAtPositions(lineIndexes []int, options LineDbAdapt
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CompactFile перезаписывает файл, копируя подряд только непустые слоты (сырые байты без JSON).
|
||||||
|
// Пустой слот: после обрезки \n и пробелов строка пустая — как после BlankLinesAtPositions.
|
||||||
|
// Один буфер на весь проход; без раскодирования и без удержания всех записей в памяти.
|
||||||
|
func (j *JSONLFile) CompactFile(options LineDbAdapterOptions) error {
|
||||||
|
if !options.InTransaction {
|
||||||
|
j.mutex.Lock()
|
||||||
|
defer j.mutex.Unlock()
|
||||||
|
}
|
||||||
|
if !j.initialized {
|
||||||
|
return fmt.Errorf("file not initialized")
|
||||||
|
}
|
||||||
|
if j.allocSize <= 0 {
|
||||||
|
return fmt.Errorf("invalid allocSize")
|
||||||
|
}
|
||||||
|
|
||||||
|
info, err := os.Stat(j.filename)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("compact stat: %w", err)
|
||||||
|
}
|
||||||
|
nSlots := int(info.Size()) / j.allocSize
|
||||||
|
|
||||||
|
tempFile := j.filename + ".tmp"
|
||||||
|
dst, err := os.Create(tempFile)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("compact create temp: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
src, err := os.Open(j.filename)
|
||||||
|
if err != nil {
|
||||||
|
dst.Close()
|
||||||
|
_ = os.Remove(tempFile)
|
||||||
|
return fmt.Errorf("compact open source: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := make([]byte, j.allocSize)
|
||||||
|
compactErr := func() error {
|
||||||
|
defer src.Close()
|
||||||
|
for slot := 0; slot < nSlots; slot++ {
|
||||||
|
n, err := io.ReadFull(src, buf)
|
||||||
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("compact read slot %d: %w", slot, err)
|
||||||
|
}
|
||||||
|
if n != j.allocSize {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
slot := buf[:n]
|
||||||
|
t := bytes.TrimRight(bytes.TrimRight(slot, "\n"), " ")
|
||||||
|
if len(bytes.TrimSpace(t)) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, err := dst.Write(slot); err != nil {
|
||||||
|
return fmt.Errorf("compact write: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}()
|
||||||
|
|
||||||
|
if err := dst.Close(); err != nil {
|
||||||
|
_ = os.Remove(tempFile)
|
||||||
|
if compactErr != nil {
|
||||||
|
return compactErr
|
||||||
|
}
|
||||||
|
return fmt.Errorf("compact close temp: %w", err)
|
||||||
|
}
|
||||||
|
if compactErr != nil {
|
||||||
|
_ = os.Remove(tempFile)
|
||||||
|
return compactErr
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.Rename(tempFile, j.filename); err != nil {
|
||||||
|
_ = os.Remove(tempFile)
|
||||||
|
return fmt.Errorf("compact rename: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// LineCount возвращает число строк в файле (fileSize / allocSize).
|
// LineCount возвращает число строк в файле (fileSize / allocSize).
|
||||||
// Используется для точечного индексирования после Write.
|
// Используется для точечного индексирования после Write.
|
||||||
func (j *JSONLFile) LineCount() (int, error) {
|
func (j *JSONLFile) LineCount() (int, error) {
|
||||||
@@ -851,6 +1058,11 @@ func (j *JSONLFile) GetEncryptKey() string {
|
|||||||
return j.cypherKey
|
return j.cypherKey
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mutex возвращает RWMutex адаптера файла (тот же замок, что у Read/Write/Update/Delete при options.InTransaction == false).
|
||||||
|
func (j *JSONLFile) Mutex() *sync.RWMutex {
|
||||||
|
return &j.mutex
|
||||||
|
}
|
||||||
|
|
||||||
// matchesFilter проверяет соответствие записи фильтру
|
// matchesFilter проверяет соответствие записи фильтру
|
||||||
func (j *JSONLFile) matchesFilter(record any, filter any, strictCompare bool) bool {
|
func (j *JSONLFile) matchesFilter(record any, filter any, strictCompare bool) bool {
|
||||||
if filter == nil {
|
if filter == nil {
|
||||||
@@ -896,14 +1108,18 @@ func (j *JSONLFile) valuesMatch(a, b any, strictCompare bool) bool {
|
|||||||
return a == b
|
return a == b
|
||||||
}
|
}
|
||||||
|
|
||||||
// Для строк - нечувствительное к регистру сравнение
|
if a == b {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Для строк - нечувствительное к регистру сравнение (равенство, не подстрока)
|
||||||
if aStr, ok := a.(string); ok {
|
if aStr, ok := a.(string); ok {
|
||||||
if bStr, ok := b.(string); ok {
|
if bStr, ok := b.(string); ok {
|
||||||
return strings.Contains(strings.ToLower(aStr), strings.ToLower(bStr))
|
return matchStringByPattern(aStr, bStr, strictCompare)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return a == b
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// rewriteFile перезаписывает файл новыми данными
|
// rewriteFile перезаписывает файл новыми данными
|
||||||
|
|||||||
@@ -4,7 +4,9 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
// LastIDManager управляет последними ID для коллекций
|
// LastIDManager управляет последними ID для коллекций.
|
||||||
|
// Ключ — полное имя коллекции (как в NextID/Insert), без обрезки по «_»:
|
||||||
|
// иначе user_data и events_A ломались бы на первый сегмент.
|
||||||
type LastIDManager struct {
|
type LastIDManager struct {
|
||||||
lastIDs map[string]int
|
lastIDs map[string]int
|
||||||
mutex sync.RWMutex
|
mutex sync.RWMutex
|
||||||
@@ -23,53 +25,32 @@ func GetLastIDManagerInstance() *LastIDManager {
|
|||||||
return lastIDManagerInstance
|
return lastIDManagerInstance
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLastID получает последний ID для коллекции
|
// GetLastID получает последний ID для коллекции (ключ — полное имя коллекции).
|
||||||
func (l *LastIDManager) GetLastID(filename string) int {
|
func (l *LastIDManager) GetLastID(collectionKey string) int {
|
||||||
l.mutex.RLock()
|
l.mutex.RLock()
|
||||||
defer l.mutex.RUnlock()
|
defer l.mutex.RUnlock()
|
||||||
|
|
||||||
baseFileName := l.getBaseFileName(filename)
|
return l.lastIDs[collectionKey]
|
||||||
return l.lastIDs[baseFileName]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetLastID устанавливает последний ID для коллекции
|
// SetLastID устанавливает последний ID для коллекции, если id больше текущего.
|
||||||
func (l *LastIDManager) SetLastID(filename string, id int) {
|
func (l *LastIDManager) SetLastID(collectionKey string, id int) {
|
||||||
l.mutex.Lock()
|
l.mutex.Lock()
|
||||||
defer l.mutex.Unlock()
|
defer l.mutex.Unlock()
|
||||||
|
|
||||||
baseFileName := l.getBaseFileName(filename)
|
currentID := l.lastIDs[collectionKey]
|
||||||
currentID := l.lastIDs[baseFileName]
|
|
||||||
if currentID < id {
|
if currentID < id {
|
||||||
l.lastIDs[baseFileName] = id
|
l.lastIDs[collectionKey] = id
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// IncrementLastID увеличивает последний ID для коллекции
|
// IncrementLastID увеличивает последний ID для коллекции
|
||||||
func (l *LastIDManager) IncrementLastID(filename string) int {
|
func (l *LastIDManager) IncrementLastID(collectionKey string) int {
|
||||||
l.mutex.Lock()
|
l.mutex.Lock()
|
||||||
defer l.mutex.Unlock()
|
defer l.mutex.Unlock()
|
||||||
|
|
||||||
baseFileName := l.getBaseFileName(filename)
|
currentID := l.lastIDs[collectionKey]
|
||||||
currentID := l.lastIDs[baseFileName]
|
|
||||||
newID := currentID + 1
|
newID := currentID + 1
|
||||||
l.lastIDs[baseFileName] = newID
|
l.lastIDs[collectionKey] = newID
|
||||||
return newID
|
return newID
|
||||||
}
|
}
|
||||||
|
|
||||||
// getBaseFileName извлекает базовое имя файла
|
|
||||||
func (l *LastIDManager) getBaseFileName(filename string) string {
|
|
||||||
if idx := l.findPartitionSeparator(filename); idx != -1 {
|
|
||||||
return filename[:idx]
|
|
||||||
}
|
|
||||||
return filename
|
|
||||||
}
|
|
||||||
|
|
||||||
// findPartitionSeparator находит разделитель партиции
|
|
||||||
func (l *LastIDManager) findPartitionSeparator(filename string) int {
|
|
||||||
for i, char := range filename {
|
|
||||||
if char == '_' {
|
|
||||||
return i
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return -1
|
|
||||||
}
|
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
118
pkg/linedb/string_match.go
Normal file
118
pkg/linedb/string_match.go
Normal file
@@ -0,0 +1,118 @@
|
|||||||
|
package linedb
|
||||||
|
|
||||||
|
import "strings"
|
||||||
|
|
||||||
|
type stringMatchMode int
|
||||||
|
|
||||||
|
const (
|
||||||
|
stringMatchEqual stringMatchMode = iota
|
||||||
|
stringMatchContains
|
||||||
|
stringMatchHasPrefix
|
||||||
|
stringMatchHasSuffix
|
||||||
|
)
|
||||||
|
|
||||||
|
func matchStringByPattern(value string, pattern string, strictCompare bool) bool {
|
||||||
|
mode, lit := parsePercentPattern(pattern)
|
||||||
|
|
||||||
|
if !strictCompare {
|
||||||
|
// For substring checks we use ToLower (close enough to EqualFold intent).
|
||||||
|
value = strings.ToLower(value)
|
||||||
|
lit = strings.ToLower(lit)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch mode {
|
||||||
|
case stringMatchContains:
|
||||||
|
return strings.Contains(value, lit)
|
||||||
|
case stringMatchHasPrefix:
|
||||||
|
return strings.HasPrefix(value, lit)
|
||||||
|
case stringMatchHasSuffix:
|
||||||
|
return strings.HasSuffix(value, lit)
|
||||||
|
default:
|
||||||
|
if strictCompare {
|
||||||
|
return value == lit
|
||||||
|
}
|
||||||
|
return strings.EqualFold(value, lit)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// parsePercentPattern interprets unescaped % at the start/end of pattern:
|
||||||
|
// - %foo% => contains "foo"
|
||||||
|
// - foo% => hasPrefix "foo"
|
||||||
|
// - %foo => hasSuffix "foo"
|
||||||
|
// - foo => equal "foo"
|
||||||
|
// Escaping: \% means literal %, \\ means literal \ (only affects escape processing).
|
||||||
|
func parsePercentPattern(pattern string) (stringMatchMode, string) {
|
||||||
|
if pattern == "" {
|
||||||
|
return stringMatchEqual, ""
|
||||||
|
}
|
||||||
|
|
||||||
|
leading := pattern[0] == '%'
|
||||||
|
trailing := len(pattern) > 0 && pattern[len(pattern)-1] == '%' && !isEscapedAt(pattern, len(pattern)-1)
|
||||||
|
|
||||||
|
start := 0
|
||||||
|
end := len(pattern)
|
||||||
|
if leading {
|
||||||
|
start = 1
|
||||||
|
}
|
||||||
|
if trailing && end > start {
|
||||||
|
end--
|
||||||
|
}
|
||||||
|
lit := unescapePercents(pattern[start:end])
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case leading && trailing:
|
||||||
|
return stringMatchContains, lit
|
||||||
|
case trailing:
|
||||||
|
return stringMatchHasPrefix, lit
|
||||||
|
case leading:
|
||||||
|
return stringMatchHasSuffix, lit
|
||||||
|
default:
|
||||||
|
return stringMatchEqual, unescapePercents(pattern)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func isEscapedAt(s string, idx int) bool {
|
||||||
|
// idx is escaped if preceded by an odd number of backslashes.
|
||||||
|
if idx <= 0 || idx >= len(s) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
n := 0
|
||||||
|
for i := idx - 1; i >= 0 && s[i] == '\\'; i-- {
|
||||||
|
n++
|
||||||
|
}
|
||||||
|
return n%2 == 1
|
||||||
|
}
|
||||||
|
|
||||||
|
func unescapePercents(s string) string {
|
||||||
|
if s == "" {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
var b strings.Builder
|
||||||
|
b.Grow(len(s))
|
||||||
|
esc := false
|
||||||
|
for i := 0; i < len(s); i++ {
|
||||||
|
ch := s[i]
|
||||||
|
if esc {
|
||||||
|
// Only unescape % and \; keep backslash for other chars.
|
||||||
|
if ch == '%' || ch == '\\' {
|
||||||
|
b.WriteByte(ch)
|
||||||
|
} else {
|
||||||
|
b.WriteByte('\\')
|
||||||
|
b.WriteByte(ch)
|
||||||
|
}
|
||||||
|
esc = false
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if ch == '\\' {
|
||||||
|
esc = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
b.WriteByte(ch)
|
||||||
|
}
|
||||||
|
if esc {
|
||||||
|
// dangling backslash
|
||||||
|
b.WriteByte('\\')
|
||||||
|
}
|
||||||
|
return b.String()
|
||||||
|
}
|
||||||
|
|
||||||
@@ -105,6 +105,10 @@ type JSONLFileOptions struct {
|
|||||||
SkipInvalidLines bool `json:"skipInvalidLines,omitempty"`
|
SkipInvalidLines bool `json:"skipInvalidLines,omitempty"`
|
||||||
DecryptKey string `json:"decryptKey,omitempty"`
|
DecryptKey string `json:"decryptKey,omitempty"`
|
||||||
ConvertStringIdToNumber bool `json:"convertStringIdToNumber,omitempty"`
|
ConvertStringIdToNumber bool `json:"convertStringIdToNumber,omitempty"`
|
||||||
|
// RebuildIndexesOnFileChange — после каждой мутации файла коллекции (Insert/Update/Delete через LineDb)
|
||||||
|
// полная пересборка всех индексов по этой логической коллекции; частичное индексирование (IndexRecord),
|
||||||
|
// tryIndexUpdate/tryIndexDelete и периодический ребилд по таймеру для этой коллекции отключаются.
|
||||||
|
RebuildIndexesOnFileChange bool `json:"rebuildIndexesOnFileChange,omitempty"`
|
||||||
// Функции сериализации и десериализации JSON
|
// Функции сериализации и десериализации JSON
|
||||||
JSONMarshal func(any) ([]byte, error) `json:"-"`
|
JSONMarshal func(any) ([]byte, error) `json:"-"`
|
||||||
JSONUnmarshal func([]byte, any) error `json:"-"`
|
JSONUnmarshal func([]byte, any) error `json:"-"`
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// mockMemcached — in-memory реализация MemcachedClient для тестов.
|
// mockMemcached — in-memory реализация MemcachedClient для тестов.
|
||||||
@@ -111,7 +111,7 @@ func TestIndexEncodedCollectionCache(t *testing.T) {
|
|||||||
}
|
}
|
||||||
initOptions := &linedb.LineDbInitOptions{
|
initOptions := &linedb.LineDbInitOptions{
|
||||||
CacheSize: 100,
|
CacheSize: 100,
|
||||||
CacheTTL: time.Minute*10,
|
CacheTTL: time.Minute * 10,
|
||||||
DBFolder: "./data/test-linedb-index-enc",
|
DBFolder: "./data/test-linedb-index-enc",
|
||||||
Collections: []linedb.JSONLFileOptions{
|
Collections: []linedb.JSONLFileOptions{
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func setupPointUpdateDB(t *testing.T) (*linedb.LineDb, func()) {
|
func setupPointUpdateDB(t *testing.T) (*linedb.LineDb, func()) {
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestLineDbBasic(t *testing.T) {
|
func TestLineDbBasic(t *testing.T) {
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
const dbDir = "./data/nonpartitioned"
|
const dbDir = "./data/nonpartitioned"
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// failingLookupStore — IndexStore, у которого Lookup всегда возвращает ошибку (для проверки FailOnFailureIndexRead).
|
// failingLookupStore — IndexStore, у которого Lookup всегда возвращает ошибку (для проверки FailOnFailureIndexRead).
|
||||||
@@ -27,8 +27,8 @@ func (f *failingLookupStore) UnindexRecord(collection, partition string, fields
|
|||||||
f.inner.UnindexRecord(collection, partition, fields, record, lineIndex)
|
f.inner.UnindexRecord(collection, partition, fields, record, lineIndex)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *failingLookupStore) Rebuild(collection, partition string, fields []string, records []any) error {
|
func (f *failingLookupStore) Rebuild(collection, partition string, fields []string, records []any, lineIndexes []int) error {
|
||||||
return f.inner.Rebuild(collection, partition, fields, records)
|
return f.inner.Rebuild(collection, partition, fields, records, lineIndexes)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *failingLookupStore) Clear(collection string) error {
|
func (f *failingLookupStore) Clear(collection string) error {
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"linedb/pkg/linedb"
|
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
const dbDir = "./data/partitioned"
|
const dbDir = "./data/partitioned"
|
||||||
|
|||||||
Reference in New Issue
Block a user