From f3f2b7b39470732fac505ba8aa2210481876910e Mon Sep 17 00:00:00 2001 From: "direct-dev.ru" Date: Tue, 3 Mar 2026 14:44:47 +0600 Subject: [PATCH] create unique fields feature --- examples/README.md | 10 ++++ examples/basic/main.go | 1 + examples/insert/insert.go | 1 + pkg/linedb/line_db.go | 119 ++++++++++++++++++++++++++++++++++++-- pkg/linedb/types.go | 1 + 5 files changed, 128 insertions(+), 4 deletions(-) diff --git a/examples/README.md b/examples/README.md index 812afb3..b350192 100644 --- a/examples/README.md +++ b/examples/README.md @@ -104,6 +104,16 @@ go run main.go Показана работа с кэшем для улучшения производительности чтения данных. +### Уникальные поля (UniqueFields) + +Коллекции могут иметь поля с ограничением уникальности. При вставке и обновлении LineDB проверяет, что значение поля не дублируется: + +```go +UniqueFields: []string{"email"}, +``` + +Пустые значения (`nil`, `""`) не проверяются. При `SkipCheckExistingForWrite: true` проверка уникальности пропускается. + ### Фильтрация Демонстрируются различные способы фильтрации: diff --git a/examples/basic/main.go b/examples/basic/main.go index 9322658..34bf500 100644 --- a/examples/basic/main.go +++ b/examples/basic/main.go @@ -27,6 +27,7 @@ func main() { CollectionName: "users", AllocSize: 512, IndexedFields: []string{"id", "email", "name"}, + UniqueFields: []string{"email"}, }, }, } diff --git a/examples/insert/insert.go b/examples/insert/insert.go index 0231cc9..cc9443c 100644 --- a/examples/insert/insert.go +++ b/examples/insert/insert.go @@ -295,6 +295,7 @@ func testUniquenessCheck() { CollectionName: "unique_users", AllocSize: 256, IndexedFields: []string{"id", "email"}, + UniqueFields: []string{"email"}, }, }, } diff --git a/pkg/linedb/line_db.go b/pkg/linedb/line_db.go index b52645e..8457963 100644 --- a/pkg/linedb/line_db.go +++ b/pkg/linedb/line_db.go @@ -202,12 +202,10 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO } itemMap["id"] = newID - resultDataArray = append(resultDataArray, itemMap) } else { // Проверяем существование записи если не пропускаем проверку if !options.SkipCheckExistingForWrite { filter := map[string]any{"id": itemMap["id"]} - for key, partitionAdapter := range db.adapters { if strings.Contains(key, collectionName) { exists, err := partitionAdapter.ReadByFilter(filter, LineDbAdapterOptions{InTransaction: true}) @@ -215,13 +213,18 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO return fmt.Errorf("failed to check existing record: %w", err) } if len(exists) > 0 { - return fmt.Errorf("record with id %v already exists in collection %s", collectionName, itemMap["id"]) + return fmt.Errorf("record with id %v already exists in collection %s", itemMap["id"], collectionName) } } } } - resultDataArray = append(resultDataArray, itemMap) } + + // Проверяем уникальность полей из UniqueFields + if err := db.checkUniqueFieldsInsert(itemMap, collectionName, resultDataArray, options); err != nil { + return err + } + resultDataArray = append(resultDataArray, itemMap) } // Записываем данные с флагом транзакции @@ -298,6 +301,10 @@ func (db *LineDb) Update(data any, collectionName string, filter any, options Li } } } + // Проверяем уникальность полей из UniqueFields + if err := db.checkUniqueFieldsUpdate(dataMap, filter, collectionName, options); err != nil { + return nil, err + } } // Проверяем партиционирование @@ -475,6 +482,110 @@ func (db *LineDb) getBaseCollectionName(collectionName string) string { return collectionName } +// getCollectionOptions возвращает опции коллекции (для партиционированных — опции базовой коллекции) +func (db *LineDb) getCollectionOptions(collectionName string) *JSONLFileOptions { + if db.initOptions == nil { + return nil + } + baseName := db.getBaseCollectionName(collectionName) + for i := range db.initOptions.Collections { + opts := &db.initOptions.Collections[i] + if opts.CollectionName == collectionName || opts.CollectionName == baseName { + return opts + } + } + return nil +} + +// isValueEmpty проверяет, считается ли значение "пустым" (пропускаем проверку уникальности для пустых) +func (db *LineDb) isValueEmpty(v any) bool { + if v == nil { + return true + } + if s, ok := v.(string); ok && s == "" { + return true + } + return false +} + +// checkUniqueFieldsInsert проверяет уникальность полей при вставке +func (db *LineDb) checkUniqueFieldsInsert(itemMap map[string]any, collectionName string, resultDataArray []any, options LineDbAdapterOptions) error { + if options.SkipCheckExistingForWrite { + return nil + } + opts := db.getCollectionOptions(collectionName) + if opts == nil || len(opts.UniqueFields) == 0 { + return nil + } + for _, fieldName := range opts.UniqueFields { + value := itemMap[fieldName] + if db.isValueEmpty(value) { + continue + } + // Проверяем в batch (уже добавляемые записи) + for _, resultItem := range resultDataArray { + if resultMap, ok := resultItem.(map[string]any); ok { + if db.valuesMatch(resultMap[fieldName], value, true) { + return fmt.Errorf("unique constraint violation: field %q value %v already exists in collection %q", + fieldName, value, collectionName) + } + } + } + // Проверяем в БД (при Insert записи ещё нет, поэтому любое совпадение — конфликт) + filter := map[string]any{fieldName: value} + existing, err := db.ReadByFilter(filter, collectionName, LineDbAdapterOptions{InTransaction: true}) + if err != nil { + return fmt.Errorf("failed to check unique field %q: %w", fieldName, err) + } + if len(existing) > 0 { + return fmt.Errorf("unique constraint violation: field %q value %v already exists in collection %q", + fieldName, value, collectionName) + } + } + return nil +} + +// checkUniqueFieldsUpdate проверяет уникальность полей при обновлении +func (db *LineDb) checkUniqueFieldsUpdate(data map[string]any, filter any, collectionName string, options LineDbAdapterOptions) error { + if options.SkipCheckExistingForWrite { + return nil + } + opts := db.getCollectionOptions(collectionName) + if opts == nil || len(opts.UniqueFields) == 0 { + return nil + } + recordsToUpdate, err := db.ReadByFilter(filter, collectionName, LineDbAdapterOptions{InTransaction: true}) + if err != nil { + return fmt.Errorf("failed to read records for update: %w", err) + } + updatingIDs := make(map[any]bool) + for _, rec := range recordsToUpdate { + if m, ok := rec.(map[string]any); ok && m["id"] != nil { + updatingIDs[m["id"]] = true + } + } + for _, fieldName := range opts.UniqueFields { + value, inData := data[fieldName] + if !inData || db.isValueEmpty(value) { + continue + } + existing, err := db.ReadByFilter(map[string]any{fieldName: value}, collectionName, LineDbAdapterOptions{InTransaction: true}) + if err != nil { + return fmt.Errorf("failed to check unique field %q: %w", fieldName, err) + } + for _, rec := range existing { + if recMap, ok := rec.(map[string]any); ok { + if updatingIDs[recMap["id"]] { + continue + } + return fmt.Errorf("unique constraint violation: field %q value %v already exists in collection %q", + fieldName, value, collectionName) + } + } + } + return nil +} + func (db *LineDb) isCollectionPartitioned(collectionName string) bool { _, exists := db.partitionFunctions[collectionName] return exists diff --git a/pkg/linedb/types.go b/pkg/linedb/types.go index a3bc0b0..a9137f5 100644 --- a/pkg/linedb/types.go +++ b/pkg/linedb/types.go @@ -60,6 +60,7 @@ type JSONLFileOptions struct { CollectionName string `json:"collectionName,omitempty"` AllocSize int `json:"allocSize,omitempty"` IndexedFields []string `json:"indexedFields,omitempty"` + UniqueFields []string `json:"uniqueFields,omitempty"` // Поля с ограничением уникальности EncryptKeyForLineDb string `json:"encryptKeyForLineDb,omitempty"` SkipInvalidLines bool `json:"skipInvalidLines,omitempty"` DecryptKey string `json:"decryptKey,omitempty"`