create unique fields feature

This commit is contained in:
2026-03-03 14:44:47 +06:00
parent 1f31ab074b
commit f3f2b7b394
5 changed files with 128 additions and 4 deletions

View File

@@ -202,12 +202,10 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO
}
itemMap["id"] = newID
resultDataArray = append(resultDataArray, itemMap)
} else {
// Проверяем существование записи если не пропускаем проверку
if !options.SkipCheckExistingForWrite {
filter := map[string]any{"id": itemMap["id"]}
for key, partitionAdapter := range db.adapters {
if strings.Contains(key, collectionName) {
exists, err := partitionAdapter.ReadByFilter(filter, LineDbAdapterOptions{InTransaction: true})
@@ -215,13 +213,18 @@ func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterO
return fmt.Errorf("failed to check existing record: %w", err)
}
if len(exists) > 0 {
return fmt.Errorf("record with id %v already exists in collection %s", collectionName, itemMap["id"])
return fmt.Errorf("record with id %v already exists in collection %s", itemMap["id"], collectionName)
}
}
}
}
resultDataArray = append(resultDataArray, itemMap)
}
// Проверяем уникальность полей из UniqueFields
if err := db.checkUniqueFieldsInsert(itemMap, collectionName, resultDataArray, options); err != nil {
return err
}
resultDataArray = append(resultDataArray, itemMap)
}
// Записываем данные с флагом транзакции
@@ -298,6 +301,10 @@ func (db *LineDb) Update(data any, collectionName string, filter any, options Li
}
}
}
// Проверяем уникальность полей из UniqueFields
if err := db.checkUniqueFieldsUpdate(dataMap, filter, collectionName, options); err != nil {
return nil, err
}
}
// Проверяем партиционирование
@@ -475,6 +482,110 @@ func (db *LineDb) getBaseCollectionName(collectionName string) string {
return collectionName
}
// getCollectionOptions возвращает опции коллекции (для партиционированных — опции базовой коллекции)
func (db *LineDb) getCollectionOptions(collectionName string) *JSONLFileOptions {
if db.initOptions == nil {
return nil
}
baseName := db.getBaseCollectionName(collectionName)
for i := range db.initOptions.Collections {
opts := &db.initOptions.Collections[i]
if opts.CollectionName == collectionName || opts.CollectionName == baseName {
return opts
}
}
return nil
}
// isValueEmpty проверяет, считается ли значение "пустым" (пропускаем проверку уникальности для пустых)
func (db *LineDb) isValueEmpty(v any) bool {
if v == nil {
return true
}
if s, ok := v.(string); ok && s == "" {
return true
}
return false
}
// checkUniqueFieldsInsert проверяет уникальность полей при вставке
func (db *LineDb) checkUniqueFieldsInsert(itemMap map[string]any, collectionName string, resultDataArray []any, options LineDbAdapterOptions) error {
if options.SkipCheckExistingForWrite {
return nil
}
opts := db.getCollectionOptions(collectionName)
if opts == nil || len(opts.UniqueFields) == 0 {
return nil
}
for _, fieldName := range opts.UniqueFields {
value := itemMap[fieldName]
if db.isValueEmpty(value) {
continue
}
// Проверяем в batch (уже добавляемые записи)
for _, resultItem := range resultDataArray {
if resultMap, ok := resultItem.(map[string]any); ok {
if db.valuesMatch(resultMap[fieldName], value, true) {
return fmt.Errorf("unique constraint violation: field %q value %v already exists in collection %q",
fieldName, value, collectionName)
}
}
}
// Проверяем в БД (при Insert записи ещё нет, поэтому любое совпадение — конфликт)
filter := map[string]any{fieldName: value}
existing, err := db.ReadByFilter(filter, collectionName, LineDbAdapterOptions{InTransaction: true})
if err != nil {
return fmt.Errorf("failed to check unique field %q: %w", fieldName, err)
}
if len(existing) > 0 {
return fmt.Errorf("unique constraint violation: field %q value %v already exists in collection %q",
fieldName, value, collectionName)
}
}
return nil
}
// checkUniqueFieldsUpdate проверяет уникальность полей при обновлении
func (db *LineDb) checkUniqueFieldsUpdate(data map[string]any, filter any, collectionName string, options LineDbAdapterOptions) error {
if options.SkipCheckExistingForWrite {
return nil
}
opts := db.getCollectionOptions(collectionName)
if opts == nil || len(opts.UniqueFields) == 0 {
return nil
}
recordsToUpdate, err := db.ReadByFilter(filter, collectionName, LineDbAdapterOptions{InTransaction: true})
if err != nil {
return fmt.Errorf("failed to read records for update: %w", err)
}
updatingIDs := make(map[any]bool)
for _, rec := range recordsToUpdate {
if m, ok := rec.(map[string]any); ok && m["id"] != nil {
updatingIDs[m["id"]] = true
}
}
for _, fieldName := range opts.UniqueFields {
value, inData := data[fieldName]
if !inData || db.isValueEmpty(value) {
continue
}
existing, err := db.ReadByFilter(map[string]any{fieldName: value}, collectionName, LineDbAdapterOptions{InTransaction: true})
if err != nil {
return fmt.Errorf("failed to check unique field %q: %w", fieldName, err)
}
for _, rec := range existing {
if recMap, ok := rec.(map[string]any); ok {
if updatingIDs[recMap["id"]] {
continue
}
return fmt.Errorf("unique constraint violation: field %q value %v already exists in collection %q",
fieldName, value, collectionName)
}
}
}
return nil
}
func (db *LineDb) isCollectionPartitioned(collectionName string) bool {
_, exists := db.partitionFunctions[collectionName]
return exists

View File

@@ -60,6 +60,7 @@ type JSONLFileOptions struct {
CollectionName string `json:"collectionName,omitempty"`
AllocSize int `json:"allocSize,omitempty"`
IndexedFields []string `json:"indexedFields,omitempty"`
UniqueFields []string `json:"uniqueFields,omitempty"` // Поля с ограничением уникальности
EncryptKeyForLineDb string `json:"encryptKeyForLineDb,omitempty"`
SkipInvalidLines bool `json:"skipInvalidLines,omitempty"`
DecryptKey string `json:"decryptKey,omitempty"`