Files
elowdb-go/pkg/linedb/line_db.go
2026-03-04 10:10:57 +06:00

1076 lines
30 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package linedb
import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)
// LineDb представляет основную базу данных
// Соответствует TypeScript классу LineDb
type LineDb struct {
adapters map[string]*JSONLFile
collections map[string]string
partitionFunctions map[string]func(any) string
mutex sync.RWMutex
cacheSize int
cacheExternal *RecordCache
nextIDFn func(any, string) (any, error)
lastIDManager *LastIDManager
// inTransaction bool
cacheTTL time.Duration
constructorOptions *LineDbOptions
initOptions *LineDbInitOptions
}
// NewLineDb создает новый экземпляр LineDb
func NewLineDb(options *LineDbOptions, adapters ...*JSONLFile) *LineDb {
if options == nil {
options = &LineDbOptions{}
}
db := &LineDb{
adapters: make(map[string]*JSONLFile),
collections: make(map[string]string),
partitionFunctions: make(map[string]func(any) string),
cacheSize: options.CacheSize,
cacheTTL: options.CacheTTL,
lastIDManager: GetLastIDManagerInstance(),
constructorOptions: options,
}
// Инициализируем кэш если нужно
if db.cacheSize > 0 && db.cacheTTL > 0 {
db.cacheExternal = NewRecordCache(db.cacheSize, db.cacheTTL)
}
// Добавляем готовые адаптеры
for _, adapter := range adapters {
collectionName := adapter.GetCollectionName()
db.adapters[collectionName] = adapter
db.collections[collectionName] = adapter.GetFilename()
}
return db
}
// Init инициализирует базу данных
func (db *LineDb) Init(force bool, initOptions *LineDbInitOptions) error {
db.mutex.Lock()
defer db.mutex.Unlock()
if initOptions == nil {
return fmt.Errorf("no init options provided")
}
// Устанавливаем опции
db.initOptions = initOptions
db.cacheSize = initOptions.CacheSize
db.cacheTTL = initOptions.CacheTTL
// Инициализируем кэш если нужно
if db.cacheSize > 0 && db.cacheTTL > 0 {
db.cacheExternal = NewRecordCache(db.cacheSize, db.cacheTTL)
}
// Создаем папку базы данных
dbFolder := initOptions.DBFolder
if dbFolder == "" {
dbFolder = "linedb"
}
if err := os.MkdirAll(dbFolder, 0755); err != nil {
return fmt.Errorf("failed to create database folder: %w", err)
}
// Сохраняем функции партиционирования
for _, partition := range initOptions.Partitions {
if partition.PartIDFn != nil {
db.partitionFunctions[partition.CollectionName] = partition.PartIDFn
}
}
// Создаем адаптеры для коллекций
for i, adapterOptions := range initOptions.Collections {
collectionName := adapterOptions.CollectionName
if collectionName == "" {
collectionName = fmt.Sprintf("collection_%d", i+1)
}
// Создаем путь к файлу
filename := filepath.Join(dbFolder, collectionName+".jsonl")
// Создаем адаптер
adapter := NewJSONLFile(filename, adapterOptions.EncryptKeyForLineDb, adapterOptions)
// Инициализируем адаптер
if err := adapter.Init(force, LineDbAdapterOptions{}); err != nil {
return fmt.Errorf("failed to init adapter for collection %s: %w", collectionName, err)
}
// Добавляем в карту адаптеров
db.adapters[collectionName] = adapter
db.collections[collectionName] = filename
}
return nil
}
// Read читает все записи из коллекции
func (db *LineDb) Read(collectionName string, options LineDbAdapterOptions) ([]any, error) {
db.mutex.RLock()
defer db.mutex.RUnlock()
if collectionName == "" {
collectionName = db.getFirstCollection()
}
adapter, exists := db.adapters[collectionName]
if !exists {
return nil, fmt.Errorf("collection %s not found", collectionName)
}
return adapter.Read(options)
}
// Insert вставляет новые записи в коллекцию
func (db *LineDb) Insert(data any, collectionName string, options LineDbAdapterOptions) error {
// Блокируем только если не в транзакции
if !options.InTransaction {
db.mutex.Lock()
defer db.mutex.Unlock()
}
if collectionName == "" {
collectionName = db.getFirstCollection()
}
// Проверяем debug tag
if options.DebugTag == "error" {
return fmt.Errorf("test error")
}
// Обрабатываем данные
dataArray := db.normalizeDataArray(data)
resultDataArray := make([]any, 0, len(dataArray))
for _, item := range dataArray {
itemMap, ok := item.(map[string]any)
if !ok {
return fmt.Errorf("invalid data format")
}
// Генерируем ID если отсутствует
if itemMap["id"] == nil || db.isInvalidID(itemMap["id"]) {
newID, err := db.NextID(item, collectionName)
if err != nil {
return fmt.Errorf("failed to generate ID: %w", err)
}
// Проверяем уникальность ID
done := false
count := 0
for !done && count < 10000 {
// Проверяем, что ID не существует в результатах
exists := false
for _, resultItem := range resultDataArray {
if resultMap, ok := resultItem.(map[string]any); ok {
if resultMap["id"] == newID {
exists = true
break
}
}
}
if !exists {
done = true
} else {
newID, err = db.NextID(item, collectionName)
if err != nil {
return fmt.Errorf("failed to generate unique ID: %w", err)
}
}
count++
}
if count >= 10000 {
return fmt.Errorf("can not generate new id for 10 000 iterations")
}
itemMap["id"] = newID
} else {
// Проверяем существование записи если не пропускаем проверку
if !options.SkipCheckExistingForWrite {
filter := map[string]any{"id": itemMap["id"]}
for key, partitionAdapter := range db.adapters {
if strings.Contains(key, collectionName) {
exists, err := partitionAdapter.ReadByFilter(filter, LineDbAdapterOptions{InTransaction: true})
if err != nil {
return fmt.Errorf("failed to check existing record: %w", err)
}
if len(exists) > 0 {
return fmt.Errorf("record with id %v already exists in collection %s", itemMap["id"], collectionName)
}
}
}
}
}
// Проверяем уникальность полей из UniqueFields
if err := db.checkUniqueFieldsInsert(itemMap, collectionName, resultDataArray, options); err != nil {
return err
}
resultDataArray = append(resultDataArray, itemMap)
}
// Записываем данные с флагом транзакции
writeOptions := LineDbAdapterOptions{InTransaction: true, InternalCall: true}
if err := db.Write(resultDataArray, collectionName, writeOptions); err != nil {
return fmt.Errorf("failed to write data: %w", err)
}
// Обновляем кэш
if db.cacheExternal != nil {
for _, item := range resultDataArray {
db.cacheExternal.UpdateCacheAfterInsert(item, collectionName)
}
}
return nil
}
// Write записывает данные в коллекцию
func (db *LineDb) Write(data any, collectionName string, options LineDbAdapterOptions) error {
// Блокируем только если не в транзакции
if !options.InTransaction {
db.mutex.Lock()
defer db.mutex.Unlock()
}
if collectionName == "" {
collectionName = db.getFirstCollection()
}
// Проверяем партиционирование
if db.isCollectionPartitioned(collectionName) {
dataArray := db.normalizeDataArray(data)
for _, item := range dataArray {
adapter, err := db.getPartitionAdapter(item, collectionName)
if err != nil {
return fmt.Errorf("failed to get partition adapter: %w", err)
}
if err := adapter.Write(item, options); err != nil {
return fmt.Errorf("failed to write to partition: %w", err)
}
}
return nil
}
// Обычная запись
adapter, exists := db.adapters[collectionName]
if !exists {
return fmt.Errorf("collection %s not found", collectionName)
}
return adapter.Write(data, options)
}
// Update обновляет записи в коллекции
func (db *LineDb) Update(data any, collectionName string, filter any, options LineDbAdapterOptions) ([]any, error) {
// Блокируем только если не в транзакции
if !options.InTransaction {
db.mutex.Lock()
defer db.mutex.Unlock()
}
if collectionName == "" {
collectionName = db.getFirstCollection()
}
// Проверяем конфликт ID
if dataMap, ok := data.(map[string]any); ok {
if filterMap, ok := filter.(map[string]any); ok {
if dataMap["id"] != nil && filterMap["id"] != nil {
if !db.compareIDs(dataMap["id"], filterMap["id"]) {
return nil, fmt.Errorf("you can not update record id with filter by another id. Use delete and insert instead")
}
}
}
// Проверяем уникальность полей из UniqueFields
if err := db.checkUniqueFieldsUpdate(dataMap, filter, collectionName, options); err != nil {
return nil, err
}
}
// Проверяем партиционирование
if db.isCollectionPartitioned(collectionName) {
return db.updatePartitioned(data, collectionName, filter, options)
}
// Обычное обновление
adapter, exists := db.adapters[collectionName]
if !exists {
return nil, fmt.Errorf("collection %s not found", collectionName)
}
return adapter.Update(data, filter, options)
}
// Delete удаляет записи из коллекции
func (db *LineDb) Delete(data any, collectionName string, options LineDbAdapterOptions) ([]any, error) {
// Блокируем только если не в транзакции
if !options.InTransaction {
db.mutex.Lock()
defer db.mutex.Unlock()
}
if collectionName == "" {
collectionName = db.getFirstCollection()
}
// Проверяем партиционирование
if db.isCollectionPartitioned(collectionName) {
return db.deletePartitioned(data, collectionName, options)
}
// Обычное удаление
adapter, exists := db.adapters[collectionName]
if !exists {
return nil, fmt.Errorf("collection %s not found", collectionName)
}
return adapter.Delete(data, options)
}
// Select выполняет выборку с поддержкой цепочки
func (db *LineDb) Select(filter any, collectionName string, options LineDbAdapterOptions) (any, error) {
result, err := db.ReadByFilter(filter, collectionName, options)
if err != nil {
return nil, err
}
if options.ReturnChain {
return NewCollectionChain(result), nil
}
return result, nil
}
// ReadByFilter читает записи по фильтру
func (db *LineDb) ReadByFilter(filter any, collectionName string, options LineDbAdapterOptions) ([]any, error) {
if !options.InTransaction {
db.mutex.RLock()
defer db.mutex.RUnlock()
}
if collectionName == "" {
collectionName = db.getFirstCollection()
}
// Проверяем кэш
if db.cacheExternal != nil && !options.InTransaction {
if cached, exists := db.cacheExternal.Get(db.generateCacheKey(filter, collectionName)); exists {
if cachedArray, ok := cached.([]any); ok {
return cachedArray, nil
}
}
}
// Проверяем партиционирование
if db.isCollectionPartitioned(collectionName) {
return db.readByFilterPartitioned(filter, collectionName, options)
}
// Обычная фильтрация
adapter, exists := db.adapters[collectionName]
if !exists {
return nil, fmt.Errorf("collection %s not found", collectionName)
}
result, err := adapter.ReadByFilter(filter, options)
if err != nil {
return nil, err
}
// Обновляем кэш
if db.cacheExternal != nil && !options.InTransaction {
db.cacheExternal.Set(db.generateCacheKey(filter, collectionName), result)
}
return result, nil
}
// NextID генерирует следующий ID
func (db *LineDb) NextID(data any, collectionName string) (any, error) {
if db.nextIDFn != nil {
return db.nextIDFn(data, collectionName)
}
// Используем LastIDManager по умолчанию
lastID := db.lastIDManager.GetLastID(collectionName)
newID := lastID + 1
db.lastIDManager.SetLastID(collectionName, newID)
return newID, nil
}
// LastSequenceID возвращает последний последовательный ID
func (db *LineDb) LastSequenceID(collectionName string) int {
if collectionName == "" {
collectionName = db.getFirstCollection()
}
return db.lastIDManager.GetLastID(collectionName)
}
// ClearCache очищает кэш
func (db *LineDb) ClearCache(collectionName string, options LineDbAdapterOptions) error {
// Блокируем только если не в транзакции
if !options.InTransaction {
db.mutex.Lock()
defer db.mutex.Unlock()
}
if db.cacheExternal != nil {
if collectionName == "" {
db.cacheExternal.Clear()
} else {
// Очищаем только записи для конкретной коллекции
// Это упрощенная реализация
db.cacheExternal.Clear()
}
}
return nil
}
// Close закрывает базу данных
func (db *LineDb) Close() {
db.mutex.Lock()
defer db.mutex.Unlock()
// Закрываем все адаптеры
for _, adapter := range db.adapters {
adapter.Destroy()
}
// Останавливаем и очищаем кэш
if db.cacheExternal != nil {
db.cacheExternal.Stop()
db.cacheExternal.Clear()
}
// Очищаем карты
db.adapters = make(map[string]*JSONLFile)
db.collections = make(map[string]string)
db.partitionFunctions = make(map[string]func(any) string)
}
// Вспомогательные методы
func (db *LineDb) getFirstCollection() string {
for name := range db.adapters {
return name
}
return ""
}
func (db *LineDb) getBaseCollectionName(collectionName string) string {
if idx := strings.Index(collectionName, "_"); idx != -1 {
return collectionName[:idx]
}
return collectionName
}
// getCollectionOptions возвращает опции коллекции (для партиционированных — опции базовой коллекции)
func (db *LineDb) getCollectionOptions(collectionName string) *JSONLFileOptions {
if db.initOptions == nil {
return nil
}
baseName := db.getBaseCollectionName(collectionName)
for i := range db.initOptions.Collections {
opts := &db.initOptions.Collections[i]
if opts.CollectionName == collectionName || opts.CollectionName == baseName {
return opts
}
}
return nil
}
// isValueEmpty проверяет, считается ли значение "пустым" (пропускаем проверку уникальности для пустых)
func (db *LineDb) isValueEmpty(v any) bool {
if v == nil {
return true
}
if s, ok := v.(string); ok && s == "" {
return true
}
return false
}
// checkUniqueFieldsInsert проверяет уникальность полей при вставке
func (db *LineDb) checkUniqueFieldsInsert(itemMap map[string]any, collectionName string, resultDataArray []any, options LineDbAdapterOptions) error {
if options.SkipCheckExistingForWrite {
return nil
}
opts := db.getCollectionOptions(collectionName)
if opts == nil || len(opts.UniqueFields) == 0 {
return nil
}
for _, fieldName := range opts.UniqueFields {
value := itemMap[fieldName]
if db.isValueEmpty(value) {
continue
}
// Проверяем в batch (уже добавляемые записи)
for _, resultItem := range resultDataArray {
if resultMap, ok := resultItem.(map[string]any); ok {
if db.valuesMatch(resultMap[fieldName], value, true) {
return fmt.Errorf("unique constraint violation: field %q value %v already exists in collection %q",
fieldName, value, collectionName)
}
}
}
// Проверяем в БД (при Insert записи ещё нет, поэтому любое совпадение — конфликт)
filter := map[string]any{fieldName: value}
existing, err := db.ReadByFilter(filter, collectionName, LineDbAdapterOptions{InTransaction: true})
if err != nil {
return fmt.Errorf("failed to check unique field %q: %w", fieldName, err)
}
if len(existing) > 0 {
return fmt.Errorf("unique constraint violation: field %q value %v already exists in collection %q",
fieldName, value, collectionName)
}
}
return nil
}
// checkUniqueFieldsUpdate проверяет уникальность полей при обновлении
func (db *LineDb) checkUniqueFieldsUpdate(data map[string]any, filter any, collectionName string, options LineDbAdapterOptions) error {
if options.SkipCheckExistingForWrite {
return nil
}
opts := db.getCollectionOptions(collectionName)
if opts == nil || len(opts.UniqueFields) == 0 {
return nil
}
recordsToUpdate, err := db.ReadByFilter(filter, collectionName, LineDbAdapterOptions{InTransaction: true})
if err != nil {
return fmt.Errorf("failed to read records for update: %w", err)
}
updatingIDs := make(map[any]bool)
for _, rec := range recordsToUpdate {
if m, ok := rec.(map[string]any); ok && m["id"] != nil {
updatingIDs[m["id"]] = true
}
}
for _, fieldName := range opts.UniqueFields {
value, inData := data[fieldName]
if !inData || db.isValueEmpty(value) {
continue
}
existing, err := db.ReadByFilter(map[string]any{fieldName: value}, collectionName, LineDbAdapterOptions{InTransaction: true})
if err != nil {
return fmt.Errorf("failed to check unique field %q: %w", fieldName, err)
}
for _, rec := range existing {
if recMap, ok := rec.(map[string]any); ok {
if updatingIDs[recMap["id"]] {
continue
}
return fmt.Errorf("unique constraint violation: field %q value %v already exists in collection %q",
fieldName, value, collectionName)
}
}
}
return nil
}
func (db *LineDb) isCollectionPartitioned(collectionName string) bool {
_, exists := db.partitionFunctions[collectionName]
return exists
}
func (db *LineDb) getPartitionFiles(collectionName string) ([]string, error) {
baseName := db.getBaseCollectionName(collectionName)
var files []string
for name, filename := range db.collections {
if strings.HasPrefix(name, baseName+"_") {
files = append(files, filename)
}
}
return files, nil
}
func (db *LineDb) getPartitionAdapter(data any, collectionName string) (*JSONLFile, error) {
partitionFn, exists := db.partitionFunctions[collectionName]
if !exists {
return nil, fmt.Errorf("partition function not found for collection %s", collectionName)
}
partitionID := partitionFn(data)
partitionName := fmt.Sprintf("%s_%s", collectionName, partitionID)
adapter, exists := db.adapters[partitionName]
if !exists {
// Создаем новый адаптер для партиции
filename := filepath.Join(db.initOptions.DBFolder, partitionName+".jsonl")
adapter = NewJSONLFile(filename, "", JSONLFileOptions{CollectionName: partitionName})
if err := adapter.Init(false, LineDbAdapterOptions{}); err != nil {
return nil, fmt.Errorf("failed to init partition adapter: %w", err)
}
db.adapters[partitionName] = adapter
db.collections[partitionName] = filename
}
return adapter, nil
}
func (db *LineDb) GetMaxID(records []any) int {
maxID := 0
for _, record := range records {
if recordMap, ok := record.(map[string]any); ok {
if id, ok := recordMap["id"]; ok {
if idInt, ok := id.(int); ok && idInt > maxID {
maxID = idInt
}
}
}
}
return maxID
}
func (db *LineDb) matchesFilter(record any, filter any, strictCompare bool) bool {
if recordMap, ok := record.(map[string]any); ok {
if filterMap, ok := filter.(map[string]any); ok {
for key, filterValue := range filterMap {
if recordValue, exists := recordMap[key]; exists {
if !db.valuesMatch(recordValue, filterValue, strictCompare) {
return false
}
} else if strictCompare {
return false
}
}
return true
}
}
return false
}
func (db *LineDb) valuesMatch(a, b any, strictCompare bool) bool {
if strictCompare {
return a == b
}
// Нестрогое сравнение
if a == b {
return true
}
// Сравнение строк
if aStr, ok := a.(string); ok {
if bStr, ok := b.(string); ok {
return strings.EqualFold(aStr, bStr)
}
}
// Сравнение чисел
if aNum, ok := db.toNumber(a); ok {
if bNum, ok := db.toNumber(b); ok {
return aNum == bNum
}
}
return false
}
func (db *LineDb) toNumber(value any) (float64, bool) {
switch v := value.(type) {
case int:
return float64(v), true
case int64:
return float64(v), true
case float64:
return v, true
case string:
if f, err := strconv.ParseFloat(v, 64); err == nil {
return f, true
}
}
return 0, false
}
func (db *LineDb) normalizeDataArray(data any) []any {
switch v := data.(type) {
case []any:
return v
case any:
return []any{v}
default:
return []any{data}
}
}
func (db *LineDb) isInvalidID(id any) bool {
if id == nil {
return true
}
if idNum, ok := id.(int); ok {
return idNum <= -1
}
return false
}
func (db *LineDb) compareIDs(a, b any) bool {
return a == b
}
func (db *LineDb) generateCacheKey(filter any, collectionName string) string {
// Упрощенная реализация генерации ключа кэша
return fmt.Sprintf("%s:%v", collectionName, filter)
}
func (db *LineDb) updatePartitioned(data any, collectionName string, filter any, options LineDbAdapterOptions) ([]any, error) {
// Получаем все партиции
partitionFiles, err := db.getPartitionFiles(collectionName)
if err != nil {
return nil, err
}
var allResults []any
for _, filename := range partitionFiles {
// Находим адаптер по имени файла
var adapter *JSONLFile
for name, adapterFile := range db.collections {
if adapterFile == filename {
adapter = db.adapters[name]
break
}
}
if adapter != nil {
results, err := adapter.Update(data, filter, options)
if err != nil {
return nil, err
}
allResults = append(allResults, results...)
}
}
return allResults, nil
}
func (db *LineDb) deletePartitioned(data any, collectionName string, options LineDbAdapterOptions) ([]any, error) {
// Получаем все партиции
partitionFiles, err := db.getPartitionFiles(collectionName)
if err != nil {
return nil, err
}
var allResults []any
for _, filename := range partitionFiles {
// Находим адаптер по имени файла
var adapter *JSONLFile
for name, adapterFile := range db.collections {
if adapterFile == filename {
adapter = db.adapters[name]
break
}
}
if adapter != nil {
results, err := adapter.Delete(data, options)
if err != nil {
return nil, err
}
allResults = append(allResults, results...)
}
}
return allResults, nil
}
func (db *LineDb) readByFilterPartitioned(filter any, collectionName string, options LineDbAdapterOptions) ([]any, error) {
// Получаем все партиции
partitionFiles, err := db.getPartitionFiles(collectionName)
if err != nil {
return nil, err
}
var allResults []any
for _, filename := range partitionFiles {
// Находим адаптер по имени файла
var adapter *JSONLFile
for name, adapterFile := range db.collections {
if adapterFile == filename {
adapter = db.adapters[name]
break
}
}
if adapter != nil {
results, err := adapter.ReadByFilter(filter, options)
if err != nil {
return nil, err
}
allResults = append(allResults, results...)
}
}
return allResults, nil
}
// Добавляем недостающие методы
// SelectWithPagination выполняет выборку с пагинацией
func (db *LineDb) SelectWithPagination(filter any, page, limit int, collectionName string, options LineDbAdapterOptions) (*PaginatedResult, error) {
if page < 1 {
page = 1
}
if limit < 1 {
limit = 20
}
// Получаем все данные
allData, err := db.ReadByFilter(filter, collectionName, options)
if err != nil {
return nil, err
}
total := len(allData)
pages := (total + limit - 1) / limit // Округление вверх
// Вычисляем индексы для пагинации
start := (page - 1) * limit
end := start + limit
if end > total {
end = total
}
var data []any
if start < total {
data = allData[start:end]
}
return &PaginatedResult{
Data: data,
Total: total,
Limit: limit,
Pages: pages,
Page: page,
}, nil
}
// Join выполняет операцию JOIN между коллекциями
func (db *LineDb) Join(leftCollection, rightCollection any, options JoinOptions) (*CollectionChain, error) {
// Получаем данные левой коллекции
var leftData []any
switch v := leftCollection.(type) {
case string:
data, err := db.Read(v, LineDbAdapterOptions{InTransaction: options.InTransaction})
if err != nil {
return nil, err
}
leftData = data
case []any:
leftData = v
default:
return nil, fmt.Errorf("invalid left collection type")
}
// Получаем данные правой коллекции
var rightData []any
switch v := rightCollection.(type) {
case string:
data, err := db.Read(v, LineDbAdapterOptions{InTransaction: options.InTransaction})
if err != nil {
return nil, err
}
rightData = data
case []any:
rightData = v
default:
return nil, fmt.Errorf("invalid right collection type")
}
// Применяем фильтры
if options.LeftFilter != nil {
leftData = db.applyFilter(leftData, options.LeftFilter, options.StrictCompare)
}
if options.RightFilter != nil {
rightData = db.applyFilter(rightData, options.RightFilter, options.StrictCompare)
}
// Выполняем JOIN
var result []any
switch options.Type {
case JoinTypeInner:
result = db.innerJoin(leftData, rightData, options)
case JoinTypeLeft:
result = db.leftJoin(leftData, rightData, options)
case JoinTypeRight:
result = db.rightJoin(leftData, rightData, options)
case JoinTypeFull:
result = db.fullJoin(leftData, rightData, options)
default:
return nil, fmt.Errorf("unsupported join type: %s", options.Type)
}
return NewCollectionChain(result), nil
}
func (db *LineDb) applyFilter(data []any, filter map[string]any, strictCompare bool) []any {
var result []any
for _, item := range data {
if db.matchesFilter(item, filter, strictCompare) {
result = append(result, item)
}
}
return result
}
func (db *LineDb) innerJoin(leftData, rightData []any, options JoinOptions) []any {
var result []any
for _, left := range leftData {
for _, right := range rightData {
if db.matchJoinFields(left, right, options.LeftFields, options.RightFields, options.StrictCompare) {
result = append(result, JoinResult{Left: left, Right: right})
if options.OnlyOneFromRight {
break
}
}
}
}
return result
}
func (db *LineDb) leftJoin(leftData, rightData []any, options JoinOptions) []any {
var result []any
for _, left := range leftData {
matched := false
for _, right := range rightData {
if db.matchJoinFields(left, right, options.LeftFields, options.RightFields, options.StrictCompare) {
result = append(result, JoinResult{Left: left, Right: right})
matched = true
if options.OnlyOneFromRight {
break
}
}
}
if !matched {
result = append(result, JoinResult{Left: left, Right: nil})
}
}
return result
}
func (db *LineDb) rightJoin(leftData, rightData []any, options JoinOptions) []any {
var result []any
for _, right := range rightData {
matched := false
for _, left := range leftData {
if db.matchJoinFields(left, right, options.LeftFields, options.RightFields, options.StrictCompare) {
result = append(result, JoinResult{Left: left, Right: right})
matched = true
if options.OnlyOneFromRight {
break
}
}
}
if !matched {
result = append(result, JoinResult{Left: nil, Right: right})
}
}
return result
}
func (db *LineDb) fullJoin(leftData, rightData []any, options JoinOptions) []any {
// Объединяем LEFT и RIGHT JOIN
leftResult := db.leftJoin(leftData, rightData, options)
rightResult := db.rightJoin(leftData, rightData, options)
// Удаляем дубликаты
seen := make(map[string]bool)
var result []any
for _, item := range append(leftResult, rightResult...) {
key := db.generateJoinKey(item)
if !seen[key] {
seen[key] = true
result = append(result, item)
}
}
return result
}
func (db *LineDb) matchJoinFields(left, right any, leftFields, rightFields []string, strictCompare bool) bool {
if len(leftFields) != len(rightFields) {
return false
}
leftMap, leftOk := left.(map[string]any)
rightMap, rightOk := right.(map[string]any)
if !leftOk || !rightOk {
return false
}
for i, leftField := range leftFields {
rightField := rightFields[i]
leftValue := leftMap[leftField]
rightValue := rightMap[rightField]
if !db.valuesMatch(leftValue, rightValue, strictCompare) {
return false
}
}
return true
}
func (db *LineDb) generateJoinKey(item any) string {
// Упрощенная реализация генерации ключа для JOIN
if joinResult, ok := item.(JoinResult); ok {
return fmt.Sprintf("%v:%v", joinResult.Left, joinResult.Right)
}
return fmt.Sprintf("%v", item)
}
// Getter методы для совместимости с TypeScript версией
func (db *LineDb) GetActualCacheSize() int {
if db.cacheExternal != nil {
return db.cacheExternal.Size()
}
return 0
}
func (db *LineDb) GetLimitCacheSize() int {
return db.cacheSize
}
func (db *LineDb) GetCacheMap() map[string]*CacheEntry {
if db.cacheExternal != nil {
return db.cacheExternal.GetFlatCacheMap()
}
return make(map[string]*CacheEntry)
}
func (db *LineDb) GetFirstCollection() string {
return db.getFirstCollection()
}