3 Commits

Author SHA1 Message Date
a1bbd9177a change module name 3 2026-04-09 14:08:27 +06:00
7894474f2d change module name 2 2026-04-07 15:04:38 +06:00
17d1a538ca change module name 2026-04-07 14:58:54 +06:00
23 changed files with 153 additions and 59 deletions

View File

@@ -25,7 +25,7 @@ package main
import (
"log"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go"
)
func main() {

View File

@@ -5,7 +5,7 @@ import (
"os"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
func main() {

View File

@@ -5,7 +5,7 @@ import (
"log"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
type User struct {

View File

@@ -6,7 +6,7 @@ import (
"testing"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
func TestCustomJSONSerialization(t *testing.T) {

View File

@@ -10,7 +10,7 @@ import (
"strings"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
// User представляет пользователя

View File

@@ -7,7 +7,7 @@ import (
"os"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
// Item — структура для вставки (LineDB поддерживает struct и map)

View File

@@ -5,7 +5,7 @@ import (
"log"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
// testDeleteOneRecordByID тестирует удаление одной записи по ID

View File

@@ -7,7 +7,7 @@ import (
"os"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
func main() {

View File

@@ -5,7 +5,7 @@ import (
"log"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
// testSingleInsert тестирует вставку одной записи

View File

@@ -11,7 +11,7 @@ import (
"strings"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
// User представляет пользователя

View File

@@ -7,7 +7,7 @@ import (
"path/filepath"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
// Пример работы с партициями + индексируемой коллекцией.

View File

@@ -20,7 +20,7 @@ import (
"strings"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
const (

View File

@@ -10,7 +10,7 @@ import (
"strings"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
func main() {

View File

@@ -9,7 +9,7 @@ import (
"os"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
func main() {

2
go.mod
View File

@@ -1,4 +1,4 @@
module linedb
module direct-dev.ru/gitea/GiteaAdmin/elowdb-go
go 1.21

View File

@@ -4,7 +4,9 @@ import (
"sync"
)
// LastIDManager управляет последними ID для коллекций
// LastIDManager управляет последними ID для коллекций.
// Ключ — полное имя коллекции (как в NextID/Insert), без обрезки по «_»:
// иначе user_data и events_A ломались бы на первый сегмент.
type LastIDManager struct {
lastIDs map[string]int
mutex sync.RWMutex
@@ -23,53 +25,32 @@ func GetLastIDManagerInstance() *LastIDManager {
return lastIDManagerInstance
}
// GetLastID получает последний ID для коллекции
func (l *LastIDManager) GetLastID(filename string) int {
// GetLastID получает последний ID для коллекции (ключ — полное имя коллекции).
func (l *LastIDManager) GetLastID(collectionKey string) int {
l.mutex.RLock()
defer l.mutex.RUnlock()
baseFileName := l.getBaseFileName(filename)
return l.lastIDs[baseFileName]
return l.lastIDs[collectionKey]
}
// SetLastID устанавливает последний ID для коллекции
func (l *LastIDManager) SetLastID(filename string, id int) {
// SetLastID устанавливает последний ID для коллекции, если id больше текущего.
func (l *LastIDManager) SetLastID(collectionKey string, id int) {
l.mutex.Lock()
defer l.mutex.Unlock()
baseFileName := l.getBaseFileName(filename)
currentID := l.lastIDs[baseFileName]
currentID := l.lastIDs[collectionKey]
if currentID < id {
l.lastIDs[baseFileName] = id
l.lastIDs[collectionKey] = id
}
}
// IncrementLastID увеличивает последний ID для коллекции
func (l *LastIDManager) IncrementLastID(filename string) int {
func (l *LastIDManager) IncrementLastID(collectionKey string) int {
l.mutex.Lock()
defer l.mutex.Unlock()
baseFileName := l.getBaseFileName(filename)
currentID := l.lastIDs[baseFileName]
currentID := l.lastIDs[collectionKey]
newID := currentID + 1
l.lastIDs[baseFileName] = newID
l.lastIDs[collectionKey] = newID
return newID
}
// getBaseFileName извлекает базовое имя файла
func (l *LastIDManager) getBaseFileName(filename string) string {
if idx := l.findPartitionSeparator(filename); idx != -1 {
return filename[:idx]
}
return filename
}
// findPartitionSeparator находит разделитель партиции
func (l *LastIDManager) findPartitionSeparator(filename string) int {
for i, char := range filename {
if char == '_' {
return i
}
}
return -1
}

View File

@@ -171,6 +171,81 @@ func (db *LineDb) Init(force bool, initOptions *LineDbInitOptions) error {
go db.indexRebuildTimerLoop(interval)
}
if err := db.seedLastIDsFromData(dbFolder); err != nil {
return err
}
return nil
}
// seedLastIDsFromData выставляет LastIDManager по максимальному id в существующих данных
// (после рестарта процесса счётчик не начинается с 1).
func (db *LineDb) seedLastIDsFromData(dbFolder string) error {
for i, opt := range db.initOptions.Collections {
collectionName := opt.CollectionName
if collectionName == "" {
collectionName = fmt.Sprintf("collection_%d", i+1)
}
if db.isCollectionPartitioned(collectionName) {
if err := db.seedLastIDPartitioned(dbFolder, collectionName); err != nil {
return err
}
continue
}
adapter := db.adapters[collectionName]
if adapter == nil {
continue
}
records, err := adapter.Read(LineDbAdapterOptions{})
if err != nil {
return fmt.Errorf("seed last id: read %s: %w", collectionName, err)
}
db.lastIDManager.SetLastID(collectionName, db.GetMaxID(records))
}
return nil
}
// seedLastIDPartitioned — максимальный id по базовому файлу и всем events_*.jsonl на диске.
// NextID для партиций вызывается с логическим именем (например events), ключ тот же.
func (db *LineDb) seedLastIDPartitioned(dbFolder, baseName string) error {
maxID := 0
if a := db.adapters[baseName]; a != nil {
recs, err := a.Read(LineDbAdapterOptions{})
if err != nil {
return fmt.Errorf("seed last id: read base %s: %w", baseName, err)
}
if m := db.GetMaxID(recs); m > maxID {
maxID = m
}
}
pattern := filepath.Join(dbFolder, baseName+"_*.jsonl")
matches, err := filepath.Glob(pattern)
if err != nil {
return fmt.Errorf("seed last id: glob %s: %w", pattern, err)
}
for _, path := range matches {
base := filepath.Base(path)
partName := strings.TrimSuffix(base, ".jsonl")
var adapter *JSONLFile
if existing, ok := db.adapters[partName]; ok {
adapter = existing
} else {
adapter = NewJSONLFile(path, "", JSONLFileOptions{CollectionName: partName})
if err := adapter.Init(false, LineDbAdapterOptions{}); err != nil {
continue
}
db.adapters[partName] = adapter
db.collections[partName] = path
}
recs, err := adapter.Read(LineDbAdapterOptions{})
if err != nil {
continue
}
if m := db.GetMaxID(recs); m > maxID {
maxID = m
}
}
db.lastIDManager.SetLastID(baseName, maxID)
return nil
}
@@ -1285,8 +1360,9 @@ func (db *LineDb) GetMaxID(records []any) int {
for _, record := range records {
if recordMap, ok := record.(map[string]any); ok {
if id, ok := recordMap["id"]; ok {
if idInt, ok := id.(int); ok && idInt > maxID {
maxID = idInt
n := idToIntForMax(id)
if n > maxID {
maxID = n
}
}
}
@@ -1294,6 +1370,21 @@ func (db *LineDb) GetMaxID(records []any) int {
return maxID
}
func idToIntForMax(id any) int {
switch v := id.(type) {
case int:
return v
case int64:
return int(v)
case float64:
return int(v)
case float32:
return int(v)
default:
return 0
}
}
func (db *LineDb) matchesFilter(record any, filter any, strictCompare bool) bool {
if recordMap, ok := record.(map[string]any); ok {
if filterMap, ok := filter.(map[string]any); ok {
@@ -1383,10 +1474,32 @@ func (db *LineDb) isInvalidID(id any) bool {
if id == nil {
return true
}
if idNum, ok := id.(int); ok {
return idNum <= -1
switch v := id.(type) {
case int:
return v <= -1
case int8:
return v <= -1
case int16:
return v <= -1
case int32:
return v <= -1
case int64:
return v <= -1
case uint, uint8, uint16, uint32, uint64:
return false
case float32:
if v != v { // NaN
return true
}
return float64(v) <= -1
case float64:
if v != v { // NaN
return true
}
return v <= -1
default:
return false
}
return false
}
func (db *LineDb) compareIDs(a, b any) bool {

View File

@@ -5,7 +5,7 @@ import (
"testing"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
// mockMemcached — in-memory реализация MemcachedClient для тестов.
@@ -111,7 +111,7 @@ func TestIndexEncodedCollectionCache(t *testing.T) {
}
initOptions := &linedb.LineDbInitOptions{
CacheSize: 100,
CacheTTL: time.Minute*10,
CacheTTL: time.Minute * 10,
DBFolder: "./data/test-linedb-index-enc",
Collections: []linedb.JSONLFileOptions{
{

View File

@@ -8,7 +8,7 @@ import (
"testing"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
func setupPointUpdateDB(t *testing.T) (*linedb.LineDb, func()) {

View File

@@ -7,7 +7,7 @@ import (
"testing"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
func TestLineDbBasic(t *testing.T) {

View File

@@ -5,7 +5,7 @@ import (
"path/filepath"
"testing"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
const dbDir = "./data/nonpartitioned"

View File

@@ -7,7 +7,7 @@ import (
"strings"
"testing"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
// failingLookupStore — IndexStore, у которого Lookup всегда возвращает ошибку (для проверки FailOnFailureIndexRead).

View File

@@ -6,7 +6,7 @@ import (
"testing"
"time"
"linedb/pkg/linedb"
"direct-dev.ru/gitea/GiteaAdmin/elowdb-go/pkg/linedb"
)
const dbDir = "./data/partitioned"