Files
elowdb-go/pkg/linedb/jsonl_file.go
2026-04-09 15:55:09 +06:00

1216 lines
32 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package linedb
import (
"bufio"
"bytes"
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"crypto/sha256"
"encoding/base64"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
goccyjson "github.com/goccy/go-json"
)
// Функции сериализации по умолчанию (используют go-json)
func defaultJSONMarshal(v any) ([]byte, error) {
return goccyjson.Marshal(v)
}
func defaultJSONUnmarshal(data []byte, v any) error {
return goccyjson.Unmarshal(data, v)
}
// makeKeyOrderMarshal создаёт JSONMarshal с заданным порядком ключей
func makeKeyOrderMarshal(keyOrder []KeyOrder) func(any) ([]byte, error) {
return func(v any) ([]byte, error) {
if m, ok := v.(map[string]any); ok {
return marshalMapSorted(m, keyOrder)
}
return goccyjson.Marshal(v)
}
}
func marshalMapSorted(m map[string]any, keyOrder []KeyOrder) ([]byte, error) {
orderMap := make(map[string]int)
for _, ko := range keyOrder {
orderMap[ko.Key] = ko.Order
}
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Slice(keys, func(i, j int) bool {
oi, hasI := orderMap[keys[i]]
oj, hasJ := orderMap[keys[j]]
group := func(o int, has bool) int {
if !has {
return 1
}
if o >= 0 {
return 0
}
return 2
}
gi, gj := group(oi, hasI), group(oj, hasJ)
if gi != gj {
return gi < gj
}
switch gi {
case 0:
return oi < oj
case 1:
return keys[i] < keys[j]
default:
return oi < oj
}
})
var buf bytes.Buffer
buf.WriteByte('{')
for i, k := range keys {
if i > 0 {
buf.WriteByte(',')
}
keyEscaped, _ := goccyjson.Marshal(k)
buf.Write(keyEscaped)
buf.WriteByte(':')
val := m[k]
if nested, ok := val.(map[string]any); ok {
valBytes, err := marshalMapSorted(nested, keyOrder)
if err != nil {
return nil, err
}
buf.Write(valBytes)
} else {
valBytes, err := goccyjson.Marshal(val)
if err != nil {
return nil, err
}
buf.Write(valBytes)
}
}
buf.WriteByte('}')
return buf.Bytes(), nil
}
// encodeKeyBytes превращает строку ключа в 32 байта (SHA256) для AES-256
func encodeKeyBytes(keyStr string) []byte {
h := sha256.Sum256([]byte(keyStr))
return h[:]
}
// aesGCMEncrypt — встроенное AES-256-GCM шифрование
func aesGCMEncrypt(plaintext, key []byte) ([]byte, error) {
block, err := aes.NewCipher(key)
if err != nil {
return nil, err
}
aesgcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
nonce := make([]byte, aesgcm.NonceSize())
if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
return nil, err
}
return aesgcm.Seal(nonce, nonce, plaintext, nil), nil
}
// aesGCMDecrypt — встроенная расшифровка AES-256-GCM
func aesGCMDecrypt(ciphertext, key []byte) ([]byte, error) {
block, err := aes.NewCipher(key)
if err != nil {
return nil, err
}
aesgcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
nonceSize := aesgcm.NonceSize()
if len(ciphertext) < nonceSize {
return nil, fmt.Errorf("ciphertext too short")
}
nonce, ciphertext := ciphertext[:nonceSize], ciphertext[nonceSize:]
return aesgcm.Open(nil, nonce, ciphertext, nil)
}
// makeEncodedMarshal оборачивает marshal в шифрование: marshal -> encrypt -> base64
func makeEncodedMarshal(marshal func(any) ([]byte, error), encFn func([]byte, []byte) ([]byte, error), key []byte) func(any) ([]byte, error) {
return func(v any) ([]byte, error) {
jsonData, err := marshal(v)
if err != nil {
return nil, err
}
encrypted, err := encFn(jsonData, key)
if err != nil {
return nil, err
}
return []byte(base64.StdEncoding.EncodeToString(encrypted)), nil
}
}
// makeEncodedUnmarshal оборачивает unmarshal в дешифрование: base64 -> decrypt -> unmarshal
func makeEncodedUnmarshal(unmarshal func([]byte, any) error, decFn func([]byte, []byte) ([]byte, error), key []byte) func([]byte, any) error {
return func(data []byte, v any) error {
encrypted, err := base64.StdEncoding.DecodeString(string(data))
if err != nil {
return err
}
plaintext, err := decFn(encrypted, key)
if err != nil {
return err
}
return unmarshal(plaintext, v)
}
}
// JSONLFile представляет адаптер для работы с JSONL файлами
type JSONLFile struct {
filename string
cypherKey string
allocSize int
collectionName string
hashFilename string
options JSONLFileOptions
initialized bool
inTransaction bool
transaction *Transaction
mutex sync.RWMutex
selectCache map[string]any
events map[string][]func(any)
// Функции сериализации
jsonMarshal func(any) ([]byte, error)
jsonUnmarshal func([]byte, any) error
}
// NewJSONLFile создает новый экземпляр JSONLFile
func NewJSONLFile(filename string, cypherKey string, options JSONLFileOptions) *JSONLFile {
hash := sha256.Sum256([]byte(filename))
hashFilename := fmt.Sprintf("%x", hash)
collectionName := options.CollectionName
if collectionName == "" {
collectionName = hashFilename
}
allocSize := options.AllocSize
if allocSize == 0 {
allocSize = 256
}
// Определяем функции сериализации
jsonMarshal := defaultJSONMarshal
jsonUnmarshal := defaultJSONUnmarshal
if options.JSONMarshal != nil {
jsonMarshal = options.JSONMarshal
} else if len(options.KeyOrder) > 0 {
jsonMarshal = makeKeyOrderMarshal(options.KeyOrder)
}
if options.JSONUnmarshal != nil {
jsonUnmarshal = options.JSONUnmarshal
}
// Encode: оборачиваем marshal/unmarshal в шифрование
if options.Encode && options.EncodeKey != "" {
key := encodeKeyBytes(options.EncodeKey)
encFn := options.EncryptFn
decFn := options.DecryptFn
if encFn == nil {
encFn = aesGCMEncrypt
}
if decFn == nil {
decFn = aesGCMDecrypt
}
jsonMarshal = makeEncodedMarshal(jsonMarshal, encFn, key)
jsonUnmarshal = makeEncodedUnmarshal(jsonUnmarshal, decFn, key)
}
return &JSONLFile{
filename: filename,
cypherKey: cypherKey,
allocSize: allocSize,
collectionName: collectionName,
hashFilename: hashFilename,
options: options,
selectCache: make(map[string]any),
events: make(map[string][]func(any)),
jsonMarshal: jsonMarshal,
jsonUnmarshal: jsonUnmarshal,
}
}
// Init инициализирует файл
func (j *JSONLFile) Init(force bool, options LineDbAdapterOptions) error {
if !options.InTransaction {
j.mutex.Lock()
defer j.mutex.Unlock()
}
if j.initialized && !force {
return nil
}
// Создаем директорию если не существует
dir := filepath.Dir(j.filename)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create directory: %w", err)
}
// Создаем файл если не существует
if st, err := os.Stat(j.filename); os.IsNotExist(err) {
file, err := os.Create(j.filename)
if err != nil {
return fmt.Errorf("failed to create file: %w", err)
}
file.Close()
} else if err == nil && st.Size() > 0 {
// Файл существует и не пустой — проверяем и нормализуем записи
if err := j.normalizeExistingFile(); err != nil {
return err
}
}
j.initialized = true
return nil
}
// normalizeExistingFile проверяет размеры записей и приводит к allocSize-1
func (j *JSONLFile) normalizeExistingFile() error {
data, err := os.ReadFile(j.filename)
if err != nil {
return fmt.Errorf("failed to read file: %w", err)
}
lines := strings.Split(strings.TrimSuffix(string(data), "\n"), "\n")
var nonEmpty []string
for _, l := range lines {
if l == "" {
continue
}
nonEmpty = append(nonEmpty, l)
}
if len(nonEmpty) == 0 {
return nil
}
// Максимальная длина среди записей
maxLen := 0
for _, l := range nonEmpty {
if len(l) > maxLen {
maxLen = len(l)
}
}
// Приводим короткие к maxLen (добавляем пробелы)
for i, l := range nonEmpty {
if len(l) < maxLen {
nonEmpty[i] = l + strings.Repeat(" ", maxLen-len(l))
}
}
targetLen := j.allocSize - 1
if targetLen < 1 {
targetLen = 1
}
if targetLen < maxLen {
// Уменьшаем: можно только если данные (без trailing spaces) помещаются
for i := range nonEmpty {
trimmed := strings.TrimRight(nonEmpty[i], " ")
if len(trimmed) > targetLen {
return fmt.Errorf("init failed: record data size %d exceeds configured allocSize-1 (%d), cannot reduce without data loss",
len(trimmed), targetLen)
}
nonEmpty[i] = trimmed + strings.Repeat(" ", targetLen-len(trimmed))
}
} else if targetLen > maxLen {
// Расширяем
for i := range nonEmpty {
nonEmpty[i] = nonEmpty[i] + strings.Repeat(" ", targetLen-len(nonEmpty[i]))
}
}
// Перезаписываем файл
var buf bytes.Buffer
for _, l := range nonEmpty {
buf.WriteString(l)
buf.WriteByte('\n')
}
return os.WriteFile(j.filename, buf.Bytes(), 0644)
}
// Read читает все записи из файла
func (j *JSONLFile) Read(options LineDbAdapterOptions) ([]any, error) {
if !options.InTransaction {
j.mutex.RLock()
defer j.mutex.RUnlock()
}
if !j.initialized {
return nil, fmt.Errorf("file not initialized")
}
file, err := os.Open(j.filename)
if err != nil {
return nil, fmt.Errorf("failed to open file: %w", err)
}
defer file.Close()
var records []any
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
// Расшифровываем если нужно (только cypherKey; Encode обрабатывается в jsonUnmarshal)
if j.cypherKey != "" && !j.options.Encode {
decoded, err := base64.StdEncoding.DecodeString(line)
if err != nil {
if j.options.SkipInvalidLines {
continue
}
return nil, fmt.Errorf("failed to decode base64: %w", err)
}
line = string(decoded)
}
var record any
if err := j.jsonUnmarshal([]byte(line), &record); err != nil {
if j.options.SkipInvalidLines {
continue
}
return nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
}
records = append(records, record)
}
return records, scanner.Err()
}
// ReadWithPhysicalLineIndexes как Read, но для каждой записи возвращает 0-based индекс строки (слота),
// в том же смысле, что ReadByLineIndexes/WriteAtLineIndexes: смещение в файле = lineIndex * allocSize.
// Пустые слоты (пробелы после удаления и т.п.) пропускаются; индекс — номер слота, а не порядковый номер записи.
func (j *JSONLFile) ReadWithPhysicalLineIndexes(options LineDbAdapterOptions) ([]any, []int, error) {
if !options.InTransaction {
j.mutex.RLock()
defer j.mutex.RUnlock()
}
if !j.initialized {
return nil, nil, fmt.Errorf("file not initialized")
}
if j.allocSize <= 0 {
return nil, nil, fmt.Errorf("invalid allocSize")
}
file, err := os.Open(j.filename)
if err != nil {
return nil, nil, fmt.Errorf("failed to open file: %w", err)
}
defer file.Close()
info, err := file.Stat()
if err != nil {
return nil, nil, fmt.Errorf("stat file: %w", err)
}
nSlots := int(info.Size()) / j.allocSize
buf := make([]byte, j.allocSize)
var records []any
var lineIndexes []int
for slot := 0; slot < nSlots; slot++ {
n, err := io.ReadFull(file, buf)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
if err != nil {
return nil, nil, fmt.Errorf("read slot %d: %w", slot, err)
}
if n != j.allocSize {
break
}
line := string(buf[:n])
line = strings.TrimRight(line, "\n")
line = strings.TrimRight(line, " ")
if strings.TrimSpace(line) == "" {
continue
}
if j.cypherKey != "" && !j.options.Encode {
decoded, err := base64.StdEncoding.DecodeString(line)
if err != nil {
if j.options.SkipInvalidLines {
continue
}
return nil, nil, fmt.Errorf("failed to decode base64: %w", err)
}
line = string(decoded)
}
var record any
if err := j.jsonUnmarshal([]byte(line), &record); err != nil {
if j.options.SkipInvalidLines {
continue
}
return nil, nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
}
records = append(records, record)
lineIndexes = append(lineIndexes, slot)
}
return records, lineIndexes, nil
}
// HasBlankSlots возвращает true, если в файле есть хотя бы один пустой слот
// (пробелы + перевод строки — типичный след точечного удаления BlankLinesAtPositions).
func (j *JSONLFile) HasBlankSlots(options LineDbAdapterOptions) (bool, error) {
if !options.InTransaction {
j.mutex.RLock()
defer j.mutex.RUnlock()
}
if !j.initialized {
return false, fmt.Errorf("file not initialized")
}
if j.allocSize <= 0 {
return false, fmt.Errorf("invalid allocSize")
}
file, err := os.Open(j.filename)
if err != nil {
return false, fmt.Errorf("failed to open file: %w", err)
}
defer file.Close()
info, err := file.Stat()
if err != nil {
return false, fmt.Errorf("stat file: %w", err)
}
nSlots := int(info.Size()) / j.allocSize
buf := make([]byte, j.allocSize)
for slot := 0; slot < nSlots; slot++ {
n, err := io.ReadFull(file, buf)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
if err != nil {
return false, fmt.Errorf("read slot %d: %w", slot, err)
}
if n != j.allocSize {
break
}
line := string(buf[:n])
line = strings.TrimRight(line, "\n")
line = strings.TrimRight(line, " ")
if strings.TrimSpace(line) == "" {
return true, nil
}
}
return false, nil
}
// ReadByLineIndexes читает записи по номерам строк (0-based) с использованием random access.
// Ожидается, что файл нормализован и каждая строка имеет длину allocSize байт (включая \n),
// как это обеспечивает Init/normalizeExistingFile/rewriteFile.
func (j *JSONLFile) ReadByLineIndexes(indexes []int, options LineDbAdapterOptions) ([]any, error) {
// Внутри транзакций (options.InTransaction == true) блокировка внешним кодом
// уже обеспечена, повторный лок мог бы привести к дедлоку.
if !options.InTransaction {
j.mutex.RLock()
defer j.mutex.RUnlock()
}
if !j.initialized {
return nil, fmt.Errorf("file not initialized")
}
if len(indexes) == 0 {
return []any{}, nil
}
// Копируем, сортируем и убираем дубликаты
sorted := make([]int, len(indexes))
copy(sorted, indexes)
sort.Ints(sorted)
uniq := sorted[:0]
prev := -1
for _, v := range sorted {
if v < 0 {
continue
}
if v != prev {
uniq = append(uniq, v)
prev = v
}
}
file, err := os.Open(j.filename)
if err != nil {
return nil, fmt.Errorf("failed to open file: %w", err)
}
defer file.Close()
var records []any
buf := make([]byte, j.allocSize)
for _, lineIndex := range uniq {
offset := int64(lineIndex) * int64(j.allocSize)
if _, err := file.Seek(offset, io.SeekStart); err != nil {
return nil, fmt.Errorf("failed to seek to offset %d: %w", offset, err)
}
n, err := io.ReadFull(file, buf)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
return nil, fmt.Errorf("failed to read line at index %d: %w", lineIndex, err)
}
if n <= 0 {
continue
}
line := string(buf[:n])
line = strings.TrimRight(line, "\n")
line = strings.TrimRight(line, " ")
if strings.TrimSpace(line) == "" {
continue
}
// Расшифровываем если нужно (только cypherKey; Encode обрабатывается в jsonUnmarshal)
if j.cypherKey != "" && !j.options.Encode {
decoded, err := base64.StdEncoding.DecodeString(line)
if err != nil {
if j.options.SkipInvalidLines {
continue
}
return nil, fmt.Errorf("failed to decode base64: %w", err)
}
line = string(decoded)
}
var record any
if err := j.jsonUnmarshal([]byte(line), &record); err != nil {
if j.options.SkipInvalidLines {
continue
}
return nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
}
records = append(records, record)
}
return records, nil
}
// ReadByLineIndexesWithPositions как ReadByLineIndexes, но возвращает пары (record, lineIndex).
// Нужно для точечного Update, когда требуется знать позицию каждой записи.
func (j *JSONLFile) ReadByLineIndexesWithPositions(indexes []int, options LineDbAdapterOptions) ([]any, []int, error) {
if !options.InTransaction {
j.mutex.RLock()
defer j.mutex.RUnlock()
}
if !j.initialized {
return nil, nil, fmt.Errorf("file not initialized")
}
if len(indexes) == 0 {
return []any{}, []int{}, nil
}
sorted := make([]int, len(indexes))
copy(sorted, indexes)
sort.Ints(sorted)
uniq := sorted[:0]
prev := -1
for _, v := range sorted {
if v < 0 {
continue
}
if v != prev {
uniq = append(uniq, v)
prev = v
}
}
file, err := os.Open(j.filename)
if err != nil {
return nil, nil, fmt.Errorf("failed to open file: %w", err)
}
defer file.Close()
var records []any
var positions []int
buf := make([]byte, j.allocSize)
for _, lineIndex := range uniq {
offset := int64(lineIndex) * int64(j.allocSize)
if _, err := file.Seek(offset, io.SeekStart); err != nil {
return nil, nil, fmt.Errorf("failed to seek to offset %d: %w", offset, err)
}
n, err := io.ReadFull(file, buf)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
return nil, nil, fmt.Errorf("failed to read line at index %d: %w", lineIndex, err)
}
if n <= 0 {
continue
}
line := string(buf[:n])
line = strings.TrimRight(line, "\n")
line = strings.TrimRight(line, " ")
if strings.TrimSpace(line) == "" {
continue
}
if j.cypherKey != "" && !j.options.Encode {
decoded, err := base64.StdEncoding.DecodeString(line)
if err != nil {
if j.options.SkipInvalidLines {
continue
}
return nil, nil, fmt.Errorf("failed to decode base64: %w", err)
}
line = string(decoded)
}
var record any
if err := j.jsonUnmarshal([]byte(line), &record); err != nil {
if j.options.SkipInvalidLines {
continue
}
return nil, nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
}
records = append(records, record)
positions = append(positions, lineIndex)
}
return records, positions, nil
}
// WriteAtLineIndexes точечно записывает записи по заданным номерам строк (0-based).
// records и lineIndexes должны быть одинаковой длины и в одном порядке.
func (j *JSONLFile) WriteAtLineIndexes(records []any, lineIndexes []int, options LineDbAdapterOptions) error {
if !options.InTransaction {
j.mutex.Lock()
defer j.mutex.Unlock()
}
if !j.initialized {
return fmt.Errorf("file not initialized")
}
if len(records) != len(lineIndexes) {
return fmt.Errorf("records and lineIndexes length mismatch")
}
if len(records) == 0 {
return nil
}
file, err := os.OpenFile(j.filename, os.O_RDWR, 0644)
if err != nil {
return fmt.Errorf("failed to open file for write: %w", err)
}
defer file.Close()
maxLineLen := j.allocSize - 1
if maxLineLen < 1 {
maxLineLen = 1
}
for i, record := range records {
lineIndex := lineIndexes[i]
jsonData, err := j.jsonMarshal(record)
if err != nil {
return fmt.Errorf("marshal record at index %d: %w", lineIndex, err)
}
line := string(jsonData)
if j.cypherKey != "" && !j.options.Encode {
line = base64.StdEncoding.EncodeToString([]byte(line))
}
if len(line) > maxLineLen {
return fmt.Errorf("record at line %d size %d exceeds allocSize-1 (%d)", lineIndex, len(line), maxLineLen)
}
if len(line) < maxLineLen {
line += strings.Repeat(" ", maxLineLen-len(line))
}
line += "\n"
offset := int64(lineIndex) * int64(j.allocSize)
if _, err := file.Seek(offset, io.SeekStart); err != nil {
return fmt.Errorf("seek to %d: %w", offset, err)
}
if _, err := file.WriteString(line); err != nil {
return fmt.Errorf("write at line %d: %w", lineIndex, err)
}
}
return nil
}
// BlankLinesAtPositions затирает строки пробелами (allocSize-1 + \n). При чтении они пропускаются.
func (j *JSONLFile) BlankLinesAtPositions(lineIndexes []int, options LineDbAdapterOptions) error {
if !options.InTransaction {
j.mutex.Lock()
defer j.mutex.Unlock()
}
if !j.initialized {
return fmt.Errorf("file not initialized")
}
if len(lineIndexes) == 0 {
return nil
}
file, err := os.OpenFile(j.filename, os.O_RDWR, 0644)
if err != nil {
return fmt.Errorf("failed to open file for blank: %w", err)
}
defer file.Close()
blank := strings.Repeat(" ", j.allocSize-1) + "\n"
for _, lineIndex := range lineIndexes {
offset := int64(lineIndex) * int64(j.allocSize)
if _, err := file.Seek(offset, io.SeekStart); err != nil {
return fmt.Errorf("seek to %d: %w", offset, err)
}
if _, err := file.WriteString(blank); err != nil {
return fmt.Errorf("write blank at line %d: %w", lineIndex, err)
}
}
return nil
}
// CompactFile перезаписывает файл, копируя подряд только непустые слоты (сырые байты без JSON).
// Пустой слот: после обрезки \n и пробелов строка пустая — как после BlankLinesAtPositions.
// Один буфер на весь проход; без раскодирования и без удержания всех записей в памяти.
func (j *JSONLFile) CompactFile(options LineDbAdapterOptions) error {
if !options.InTransaction {
j.mutex.Lock()
defer j.mutex.Unlock()
}
if !j.initialized {
return fmt.Errorf("file not initialized")
}
if j.allocSize <= 0 {
return fmt.Errorf("invalid allocSize")
}
info, err := os.Stat(j.filename)
if err != nil {
return fmt.Errorf("compact stat: %w", err)
}
nSlots := int(info.Size()) / j.allocSize
tempFile := j.filename + ".tmp"
dst, err := os.Create(tempFile)
if err != nil {
return fmt.Errorf("compact create temp: %w", err)
}
src, err := os.Open(j.filename)
if err != nil {
dst.Close()
_ = os.Remove(tempFile)
return fmt.Errorf("compact open source: %w", err)
}
buf := make([]byte, j.allocSize)
compactErr := func() error {
defer src.Close()
for slot := 0; slot < nSlots; slot++ {
n, err := io.ReadFull(src, buf)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
if err != nil {
return fmt.Errorf("compact read slot %d: %w", slot, err)
}
if n != j.allocSize {
break
}
slot := buf[:n]
t := bytes.TrimRight(bytes.TrimRight(slot, "\n"), " ")
if len(bytes.TrimSpace(t)) == 0 {
continue
}
if _, err := dst.Write(slot); err != nil {
return fmt.Errorf("compact write: %w", err)
}
}
return nil
}()
if err := dst.Close(); err != nil {
_ = os.Remove(tempFile)
if compactErr != nil {
return compactErr
}
return fmt.Errorf("compact close temp: %w", err)
}
if compactErr != nil {
_ = os.Remove(tempFile)
return compactErr
}
if err := os.Rename(tempFile, j.filename); err != nil {
_ = os.Remove(tempFile)
return fmt.Errorf("compact rename: %w", err)
}
return nil
}
// LineCount возвращает число строк в файле (fileSize / allocSize).
// Используется для точечного индексирования после Write.
func (j *JSONLFile) LineCount() (int, error) {
info, err := os.Stat(j.filename)
if err != nil {
if os.IsNotExist(err) {
return 0, nil
}
return 0, err
}
return int(info.Size()) / j.allocSize, nil
}
// Write записывает данные в файл
func (j *JSONLFile) Write(data any, options LineDbAdapterOptions) error {
if !options.InTransaction {
j.mutex.Lock()
defer j.mutex.Unlock()
}
if !j.initialized {
return fmt.Errorf("file not initialized")
}
records, ok := data.([]any)
if !ok {
records = []any{data}
}
file, err := os.OpenFile(j.filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("failed to open file for writing: %w", err)
}
defer file.Close()
for _, record := range records {
jsonData, err := j.jsonMarshal(record)
if err != nil {
return fmt.Errorf("failed to marshal JSON: %w", err)
}
line := string(jsonData)
// Шифруем если нужно (только cypherKey; Encode уже в jsonMarshal)
if j.cypherKey != "" && !j.options.Encode {
line = base64.StdEncoding.EncodeToString([]byte(line))
}
maxLineLen := j.allocSize - 1
if len(line) > maxLineLen {
return fmt.Errorf("record size %d exceeds allocSize-1 (%d)", len(line), maxLineLen)
}
// Дополняем до allocSize-1
if len(line) < maxLineLen {
line += strings.Repeat(" ", maxLineLen-len(line))
}
if _, err := file.WriteString(line + "\n"); err != nil {
return fmt.Errorf("failed to write line: %w", err)
}
}
return nil
}
// Insert вставляет новые записи
func (j *JSONLFile) Insert(data any, options LineDbAdapterOptions) ([]any, error) {
records, ok := data.([]any)
if !ok {
records = []any{data}
}
// Генерируем ID если нужно
for i, record := range records {
if recordMap, ok := record.(map[string]any); ok {
if recordMap["id"] == nil || recordMap["id"] == "" {
recordMap["id"] = time.Now().UnixNano()
records[i] = recordMap
}
}
}
if err := j.Write(records, options); err != nil {
return nil, err
}
return records, nil
}
// ReadByFilter читает записи по фильтру
func (j *JSONLFile) ReadByFilter(filter any, options LineDbAdapterOptions) ([]any, error) {
allRecords, err := j.Read(options)
if err != nil {
return nil, err
}
var filteredRecords []any
for _, record := range allRecords {
if j.matchesFilter(record, filter, options.StrictCompare) {
filteredRecords = append(filteredRecords, record)
}
}
return filteredRecords, nil
}
// Update обновляет записи
func (j *JSONLFile) Update(data any, filter any, options LineDbAdapterOptions) ([]any, error) {
// Читаем все записи
allRecords, err := j.Read(options)
if err != nil {
return nil, err
}
// Фильтруем записи для обновления
var recordsToUpdate []any
updateData, ok := data.(map[string]any)
if !ok {
return nil, fmt.Errorf("update data must be a map")
}
for _, record := range allRecords {
if j.matchesFilter(record, filter, options.StrictCompare) {
// Обновляем запись
if recordMap, ok := record.(map[string]any); ok {
for key, value := range updateData {
recordMap[key] = value
}
recordsToUpdate = append(recordsToUpdate, recordMap)
}
}
}
// Перезаписываем файл
if err := j.rewriteFile(allRecords); err != nil {
return nil, err
}
return recordsToUpdate, nil
}
// Delete удаляет записи
func (j *JSONLFile) Delete(data any, options LineDbAdapterOptions) ([]any, error) {
// Читаем все записи
allRecords, err := j.Read(options)
if err != nil {
return nil, err
}
var remainingRecords []any
var deletedRecords []any
for _, record := range allRecords {
if j.matchesFilter(record, data, options.StrictCompare) {
deletedRecords = append(deletedRecords, record)
} else {
remainingRecords = append(remainingRecords, record)
}
}
// Перезаписываем файл
if err := j.rewriteFile(remainingRecords); err != nil {
return nil, err
}
return deletedRecords, nil
}
// GetFilename возвращает имя файла
func (j *JSONLFile) GetFilename() string {
return j.filename
}
// GetCollectionName возвращает имя коллекции
func (j *JSONLFile) GetCollectionName() string {
return j.collectionName
}
// GetOptions возвращает опции
func (j *JSONLFile) GetOptions() JSONLFileOptions {
return j.options
}
// GetEncryptKey возвращает ключ шифрования
func (j *JSONLFile) GetEncryptKey() string {
return j.cypherKey
}
// matchesFilter проверяет соответствие записи фильтру
func (j *JSONLFile) matchesFilter(record any, filter any, strictCompare bool) bool {
if filter == nil {
return true
}
switch f := filter.(type) {
case string:
// Простая проверка по строке
recordStr := fmt.Sprintf("%v", record)
if strictCompare {
return recordStr == f
}
return strings.Contains(strings.ToLower(recordStr), strings.ToLower(f))
case map[string]any:
// Проверка по полям
if recordMap, ok := record.(map[string]any); ok {
for key, filterValue := range f {
recordValue, exists := recordMap[key]
if !exists {
return false
}
if !j.valuesMatch(recordValue, filterValue, strictCompare) {
return false
}
}
return true
}
case func(any) bool:
// Функция фильтрации
return f(record)
}
return false
}
// valuesMatch сравнивает значения
func (j *JSONLFile) valuesMatch(a, b any, strictCompare bool) bool {
if strictCompare {
return a == b
}
// Для строк - нечувствительное к регистру сравнение
if aStr, ok := a.(string); ok {
if bStr, ok := b.(string); ok {
return strings.Contains(strings.ToLower(aStr), strings.ToLower(bStr))
}
}
return a == b
}
// rewriteFile перезаписывает файл новыми данными
func (j *JSONLFile) rewriteFile(records []any) error {
// Создаем временный файл
tempFile := j.filename + ".tmp"
file, err := os.Create(tempFile)
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}
defer file.Close()
// Записываем данные во временный файл
for _, record := range records {
jsonData, err := j.jsonMarshal(record)
if err != nil {
return fmt.Errorf("failed to marshal JSON: %w", err)
}
line := string(jsonData)
// Шифруем если нужно (только cypherKey; Encode уже в jsonMarshal)
if j.cypherKey != "" && !j.options.Encode {
line = base64.StdEncoding.EncodeToString([]byte(line))
}
maxLineLen := j.allocSize - 1
if len(line) > maxLineLen {
return fmt.Errorf("record size %d exceeds allocSize-1 (%d)", len(line), maxLineLen)
}
// Дополняем до allocSize-1
if len(line) < maxLineLen {
line += strings.Repeat(" ", maxLineLen-len(line))
}
if _, err := file.WriteString(line + "\n"); err != nil {
return fmt.Errorf("failed to write line: %w", err)
}
}
// Заменяем оригинальный файл
if err := os.Rename(tempFile, j.filename); err != nil {
return fmt.Errorf("failed to rename temp file: %w", err)
}
return nil
}
// Destroy очищает ресурсы
func (j *JSONLFile) Destroy() {
j.mutex.Lock()
defer j.mutex.Unlock()
j.initialized = false
j.selectCache = nil
j.events = nil
}
// WithTransaction выполняет операцию в транзакции
func (j *JSONLFile) WithTransaction(callback func(*JSONLFile, LineDbAdapterOptions) error, transactionOptions TransactionOptions, methodsOptions LineDbAdapterOptions) error {
// Создаем транзакцию
tx := NewTransaction("write", generateTransactionID(), transactionOptions.Timeout, transactionOptions.Rollback)
// Создаем резервную копию если нужно
if transactionOptions.Rollback {
if err := tx.CreateBackup(j.filename); err != nil {
return fmt.Errorf("failed to create backup: %w", err)
}
}
j.transaction = tx
j.inTransaction = true
defer func() {
j.inTransaction = false
j.transaction = nil
}()
// Выполняем callback
if err := callback(j, methodsOptions); err != nil {
// Откатываем изменения если нужно
if transactionOptions.Rollback {
if restoreErr := tx.RestoreFromBackup(j.filename); restoreErr != nil {
return fmt.Errorf("failed to restore from backup: %w", restoreErr)
}
}
return err
}
// Очищаем резервную копию
if err := tx.CleanupBackup(); err != nil {
return fmt.Errorf("failed to cleanup backup: %w", err)
}
return nil
}
// generateTransactionID генерирует ID транзакции
func generateTransactionID() string {
return fmt.Sprintf("tx_%d", time.Now().UnixNano())
}