Files
elowdb-go/pkg/linedb/jsonl_file.go

485 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package linedb
import (
"bufio"
"crypto/sha256"
"encoding/base64"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
goccyjson "github.com/goccy/go-json"
)
// Функции сериализации по умолчанию (используют go-json)
func defaultJSONMarshal(v any) ([]byte, error) {
return goccyjson.Marshal(v)
}
func defaultJSONUnmarshal(data []byte, v any) error {
return goccyjson.Unmarshal(data, v)
}
// JSONLFile представляет адаптер для работы с JSONL файлами
type JSONLFile struct {
filename string
cypherKey string
allocSize int
collectionName string
hashFilename string
options JSONLFileOptions
initialized bool
inTransaction bool
transaction *Transaction
mutex sync.RWMutex
selectCache map[string]any
events map[string][]func(any)
// Функции сериализации
jsonMarshal func(any) ([]byte, error)
jsonUnmarshal func([]byte, any) error
}
// NewJSONLFile создает новый экземпляр JSONLFile
func NewJSONLFile(filename string, cypherKey string, options JSONLFileOptions) *JSONLFile {
hash := sha256.Sum256([]byte(filename))
hashFilename := fmt.Sprintf("%x", hash)
collectionName := options.CollectionName
if collectionName == "" {
collectionName = hashFilename
}
allocSize := options.AllocSize
if allocSize == 0 {
allocSize = 256
}
// Определяем функции сериализации
jsonMarshal := defaultJSONMarshal
jsonUnmarshal := defaultJSONUnmarshal
// Используем пользовательские функции если они предоставлены
if options.JSONMarshal != nil {
jsonMarshal = options.JSONMarshal
}
if options.JSONUnmarshal != nil {
jsonUnmarshal = options.JSONUnmarshal
}
return &JSONLFile{
filename: filename,
cypherKey: cypherKey,
allocSize: allocSize,
collectionName: collectionName,
hashFilename: hashFilename,
options: options,
selectCache: make(map[string]any),
events: make(map[string][]func(any)),
jsonMarshal: jsonMarshal,
jsonUnmarshal: jsonUnmarshal,
}
}
// Init инициализирует файл
func (j *JSONLFile) Init(force bool, options LineDbAdapterOptions) error {
j.mutex.Lock()
defer j.mutex.Unlock()
if j.initialized && !force {
return nil
}
// Создаем директорию если не существует
dir := filepath.Dir(j.filename)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create directory: %w", err)
}
// Создаем файл если не существует
if _, err := os.Stat(j.filename); os.IsNotExist(err) {
file, err := os.Create(j.filename)
if err != nil {
return fmt.Errorf("failed to create file: %w", err)
}
file.Close()
}
j.initialized = true
return nil
}
// Read читает все записи из файла
func (j *JSONLFile) Read(options LineDbAdapterOptions) ([]any, error) {
j.mutex.RLock()
defer j.mutex.RUnlock()
if !j.initialized {
return nil, fmt.Errorf("file not initialized")
}
file, err := os.Open(j.filename)
if err != nil {
return nil, fmt.Errorf("failed to open file: %w", err)
}
defer file.Close()
var records []any
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
// Расшифровываем если нужно
if j.cypherKey != "" {
decoded, err := base64.StdEncoding.DecodeString(line)
if err != nil {
if j.options.SkipInvalidLines {
continue
}
return nil, fmt.Errorf("failed to decode base64: %w", err)
}
line = string(decoded)
}
var record any
if err := j.jsonUnmarshal([]byte(line), &record); err != nil {
if j.options.SkipInvalidLines {
continue
}
return nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
}
records = append(records, record)
}
return records, scanner.Err()
}
// Write записывает данные в файл
func (j *JSONLFile) Write(data any, options LineDbAdapterOptions) error {
j.mutex.Lock()
defer j.mutex.Unlock()
if !j.initialized {
return fmt.Errorf("file not initialized")
}
records, ok := data.([]any)
if !ok {
records = []any{data}
}
file, err := os.OpenFile(j.filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("failed to open file for writing: %w", err)
}
defer file.Close()
for _, record := range records {
jsonData, err := j.jsonMarshal(record)
if err != nil {
return fmt.Errorf("failed to marshal JSON: %w", err)
}
line := string(jsonData)
// Шифруем если нужно
if j.cypherKey != "" {
line = base64.StdEncoding.EncodeToString([]byte(line))
}
// Дополняем до allocSize
if len(line) < j.allocSize {
line += strings.Repeat(" ", j.allocSize-len(line)-1)
}
if _, err := file.WriteString(line + "\n"); err != nil {
return fmt.Errorf("failed to write line: %w", err)
}
}
return nil
}
// Insert вставляет новые записи
func (j *JSONLFile) Insert(data any, options LineDbAdapterOptions) ([]any, error) {
records, ok := data.([]any)
if !ok {
records = []any{data}
}
// Генерируем ID если нужно
for i, record := range records {
if recordMap, ok := record.(map[string]any); ok {
if recordMap["id"] == nil || recordMap["id"] == "" {
recordMap["id"] = time.Now().UnixNano()
records[i] = recordMap
}
}
}
if err := j.Write(records, options); err != nil {
return nil, err
}
return records, nil
}
// ReadByFilter читает записи по фильтру
func (j *JSONLFile) ReadByFilter(filter any, options LineDbAdapterOptions) ([]any, error) {
allRecords, err := j.Read(options)
if err != nil {
return nil, err
}
var filteredRecords []any
for _, record := range allRecords {
if j.matchesFilter(record, filter, options.StrictCompare) {
filteredRecords = append(filteredRecords, record)
}
}
return filteredRecords, nil
}
// Update обновляет записи
func (j *JSONLFile) Update(data any, filter any, options LineDbAdapterOptions) ([]any, error) {
// Читаем все записи
allRecords, err := j.Read(options)
if err != nil {
return nil, err
}
// Фильтруем записи для обновления
var recordsToUpdate []any
updateData, ok := data.(map[string]any)
if !ok {
return nil, fmt.Errorf("update data must be a map")
}
for _, record := range allRecords {
if j.matchesFilter(record, filter, options.StrictCompare) {
// Обновляем запись
if recordMap, ok := record.(map[string]any); ok {
for key, value := range updateData {
recordMap[key] = value
}
recordsToUpdate = append(recordsToUpdate, recordMap)
}
}
}
// Перезаписываем файл
if err := j.rewriteFile(allRecords); err != nil {
return nil, err
}
return recordsToUpdate, nil
}
// Delete удаляет записи
func (j *JSONLFile) Delete(data any, options LineDbAdapterOptions) ([]any, error) {
// Читаем все записи
allRecords, err := j.Read(options)
if err != nil {
return nil, err
}
var remainingRecords []any
var deletedRecords []any
for _, record := range allRecords {
if j.matchesFilter(record, data, options.StrictCompare) {
deletedRecords = append(deletedRecords, record)
} else {
remainingRecords = append(remainingRecords, record)
}
}
// Перезаписываем файл
if err := j.rewriteFile(remainingRecords); err != nil {
return nil, err
}
return deletedRecords, nil
}
// GetFilename возвращает имя файла
func (j *JSONLFile) GetFilename() string {
return j.filename
}
// GetCollectionName возвращает имя коллекции
func (j *JSONLFile) GetCollectionName() string {
return j.collectionName
}
// GetOptions возвращает опции
func (j *JSONLFile) GetOptions() JSONLFileOptions {
return j.options
}
// GetEncryptKey возвращает ключ шифрования
func (j *JSONLFile) GetEncryptKey() string {
return j.cypherKey
}
// matchesFilter проверяет соответствие записи фильтру
func (j *JSONLFile) matchesFilter(record any, filter any, strictCompare bool) bool {
if filter == nil {
return true
}
switch f := filter.(type) {
case string:
// Простая проверка по строке
recordStr := fmt.Sprintf("%v", record)
if strictCompare {
return recordStr == f
}
return strings.Contains(strings.ToLower(recordStr), strings.ToLower(f))
case map[string]any:
// Проверка по полям
if recordMap, ok := record.(map[string]any); ok {
for key, filterValue := range f {
recordValue, exists := recordMap[key]
if !exists {
return false
}
if !j.valuesMatch(recordValue, filterValue, strictCompare) {
return false
}
}
return true
}
case func(any) bool:
// Функция фильтрации
return f(record)
}
return false
}
// valuesMatch сравнивает значения
func (j *JSONLFile) valuesMatch(a, b any, strictCompare bool) bool {
if strictCompare {
return a == b
}
// Для строк - нечувствительное к регистру сравнение
if aStr, ok := a.(string); ok {
if bStr, ok := b.(string); ok {
return strings.Contains(strings.ToLower(aStr), strings.ToLower(bStr))
}
}
return a == b
}
// rewriteFile перезаписывает файл новыми данными
func (j *JSONLFile) rewriteFile(records []any) error {
// Создаем временный файл
tempFile := j.filename + ".tmp"
file, err := os.Create(tempFile)
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}
defer file.Close()
// Записываем данные во временный файл
for _, record := range records {
jsonData, err := j.jsonMarshal(record)
if err != nil {
return fmt.Errorf("failed to marshal JSON: %w", err)
}
line := string(jsonData)
// Шифруем если нужно
if j.cypherKey != "" {
line = base64.StdEncoding.EncodeToString([]byte(line))
}
// Дополняем до allocSize
if len(line) < j.allocSize {
line += strings.Repeat(" ", j.allocSize-len(line)-1)
}
if _, err := file.WriteString(line + "\n"); err != nil {
return fmt.Errorf("failed to write line: %w", err)
}
}
// Заменяем оригинальный файл
if err := os.Rename(tempFile, j.filename); err != nil {
return fmt.Errorf("failed to rename temp file: %w", err)
}
return nil
}
// Destroy очищает ресурсы
func (j *JSONLFile) Destroy() {
j.mutex.Lock()
defer j.mutex.Unlock()
j.initialized = false
j.selectCache = nil
j.events = nil
}
// WithTransaction выполняет операцию в транзакции
func (j *JSONLFile) WithTransaction(callback func(*JSONLFile, LineDbAdapterOptions) error, transactionOptions TransactionOptions, methodsOptions LineDbAdapterOptions) error {
// Создаем транзакцию
tx := NewTransaction("write", generateTransactionID(), transactionOptions.Timeout, transactionOptions.Rollback)
// Создаем резервную копию если нужно
if transactionOptions.Rollback {
if err := tx.CreateBackup(j.filename); err != nil {
return fmt.Errorf("failed to create backup: %w", err)
}
}
j.transaction = tx
j.inTransaction = true
defer func() {
j.inTransaction = false
j.transaction = nil
}()
// Выполняем callback
if err := callback(j, methodsOptions); err != nil {
// Откатываем изменения если нужно
if transactionOptions.Rollback {
if restoreErr := tx.RestoreFromBackup(j.filename); restoreErr != nil {
return fmt.Errorf("failed to restore from backup: %w", restoreErr)
}
}
return err
}
// Очищаем резервную копию
if err := tx.CleanupBackup(); err != nil {
return fmt.Errorf("failed to cleanup backup: %w", err)
}
return nil
}
// generateTransactionID генерирует ID транзакции
func generateTransactionID() string {
return fmt.Sprintf("tx_%d", time.Now().UnixNano())
}