diff --git a/pkg/linedb/jsonl_file.go b/pkg/linedb/jsonl_file.go index 913d207..c524c00 100644 --- a/pkg/linedb/jsonl_file.go +++ b/pkg/linedb/jsonl_file.go @@ -1058,6 +1058,11 @@ func (j *JSONLFile) GetEncryptKey() string { return j.cypherKey } +// Mutex возвращает RWMutex адаптера файла (тот же замок, что у Read/Write/Update/Delete при options.InTransaction == false). +func (j *JSONLFile) Mutex() *sync.RWMutex { + return &j.mutex +} + // matchesFilter проверяет соответствие записи фильтру func (j *JSONLFile) matchesFilter(record any, filter any, strictCompare bool) bool { if filter == nil { diff --git a/pkg/linedb/line_db.go b/pkg/linedb/line_db.go index c3218d5..e70bd62 100644 --- a/pkg/linedb/line_db.go +++ b/pkg/linedb/line_db.go @@ -776,6 +776,46 @@ func (db *LineDb) ClearCache(collectionName string, options LineDbAdapterOptions return nil } +// DBMutex возвращает RWMutex уровня LineDb: им сериализуются Insert/Update/Delete/Read и др. при обычных вызовах. +// Пары вызовов: Lock/Unlock (эксклюзивно) и RLock/RUnlock (чтение). Неверный порядок вместе с AdapterMutex может дать взаимную блокировку. +func (db *LineDb) DBMutex() *sync.RWMutex { + return &db.mutex +} + +// AdapterMutex возвращает RWMutex JSONL-адаптера для имени хранилища (логическая коллекция или имя партиции, например events_A). +// Пустое collectionName — первая коллекция из конфигурации. Поиск выполняется под RLock карты адаптеров. +func (db *LineDb) AdapterMutex(collectionName string) (*sync.RWMutex, error) { + db.mutex.RLock() + defer db.mutex.RUnlock() + if collectionName == "" { + collectionName = db.getFirstCollection() + } + if collectionName == "" { + return nil, fmt.Errorf("no collection configured") + } + a, ok := db.adapters[collectionName] + if !ok { + return nil, fmt.Errorf("collection %s not found", collectionName) + } + return a.Mutex(), nil +} + +// AdapterMutexAssumeDBLocked возвращает RWMutex адаптера. Вызывать только если текущая горутина +// уже удерживает DBMutex().RLock() или Lock() — иначе доступ к карте адаптеров не синхронизирован. +func (db *LineDb) AdapterMutexAssumeDBLocked(collectionName string) (*sync.RWMutex, error) { + if collectionName == "" { + collectionName = db.getFirstCollection() + } + if collectionName == "" { + return nil, fmt.Errorf("no collection configured") + } + a, ok := db.adapters[collectionName] + if !ok { + return nil, fmt.Errorf("collection %s not found", collectionName) + } + return a.Mutex(), nil +} + // clearCacheForCollection сбрасывает кэш запросов по логической коллекции (включая ключи партиций base_*). func (db *LineDb) clearCacheForCollection(collectionName string) { if db.cacheExternal == nil || collectionName == "" {