1086 lines
29 KiB
Go
1086 lines
29 KiB
Go
package linedb
|
||
|
||
import (
|
||
"bufio"
|
||
"bytes"
|
||
"crypto/aes"
|
||
"crypto/cipher"
|
||
"crypto/rand"
|
||
"crypto/sha256"
|
||
"encoding/base64"
|
||
"fmt"
|
||
"io"
|
||
"os"
|
||
"path/filepath"
|
||
"sort"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
goccyjson "github.com/goccy/go-json"
|
||
)
|
||
|
||
// Функции сериализации по умолчанию (используют go-json)
|
||
func defaultJSONMarshal(v any) ([]byte, error) {
|
||
return goccyjson.Marshal(v)
|
||
}
|
||
|
||
func defaultJSONUnmarshal(data []byte, v any) error {
|
||
return goccyjson.Unmarshal(data, v)
|
||
}
|
||
|
||
// makeKeyOrderMarshal создаёт JSONMarshal с заданным порядком ключей
|
||
func makeKeyOrderMarshal(keyOrder []KeyOrder) func(any) ([]byte, error) {
|
||
return func(v any) ([]byte, error) {
|
||
if m, ok := v.(map[string]any); ok {
|
||
return marshalMapSorted(m, keyOrder)
|
||
}
|
||
return goccyjson.Marshal(v)
|
||
}
|
||
}
|
||
|
||
func marshalMapSorted(m map[string]any, keyOrder []KeyOrder) ([]byte, error) {
|
||
orderMap := make(map[string]int)
|
||
for _, ko := range keyOrder {
|
||
orderMap[ko.Key] = ko.Order
|
||
}
|
||
|
||
keys := make([]string, 0, len(m))
|
||
for k := range m {
|
||
keys = append(keys, k)
|
||
}
|
||
sort.Slice(keys, func(i, j int) bool {
|
||
oi, hasI := orderMap[keys[i]]
|
||
oj, hasJ := orderMap[keys[j]]
|
||
group := func(o int, has bool) int {
|
||
if !has {
|
||
return 1
|
||
}
|
||
if o >= 0 {
|
||
return 0
|
||
}
|
||
return 2
|
||
}
|
||
gi, gj := group(oi, hasI), group(oj, hasJ)
|
||
if gi != gj {
|
||
return gi < gj
|
||
}
|
||
switch gi {
|
||
case 0:
|
||
return oi < oj
|
||
case 1:
|
||
return keys[i] < keys[j]
|
||
default:
|
||
return oi < oj
|
||
}
|
||
})
|
||
|
||
var buf bytes.Buffer
|
||
buf.WriteByte('{')
|
||
for i, k := range keys {
|
||
if i > 0 {
|
||
buf.WriteByte(',')
|
||
}
|
||
keyEscaped, _ := goccyjson.Marshal(k)
|
||
buf.Write(keyEscaped)
|
||
buf.WriteByte(':')
|
||
val := m[k]
|
||
if nested, ok := val.(map[string]any); ok {
|
||
valBytes, err := marshalMapSorted(nested, keyOrder)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
buf.Write(valBytes)
|
||
} else {
|
||
valBytes, err := goccyjson.Marshal(val)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
buf.Write(valBytes)
|
||
}
|
||
}
|
||
buf.WriteByte('}')
|
||
return buf.Bytes(), nil
|
||
}
|
||
|
||
// encodeKeyBytes превращает строку ключа в 32 байта (SHA256) для AES-256
|
||
func encodeKeyBytes(keyStr string) []byte {
|
||
h := sha256.Sum256([]byte(keyStr))
|
||
return h[:]
|
||
}
|
||
|
||
// aesGCMEncrypt — встроенное AES-256-GCM шифрование
|
||
func aesGCMEncrypt(plaintext, key []byte) ([]byte, error) {
|
||
block, err := aes.NewCipher(key)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
aesgcm, err := cipher.NewGCM(block)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
nonce := make([]byte, aesgcm.NonceSize())
|
||
if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
|
||
return nil, err
|
||
}
|
||
return aesgcm.Seal(nonce, nonce, plaintext, nil), nil
|
||
}
|
||
|
||
// aesGCMDecrypt — встроенная расшифровка AES-256-GCM
|
||
func aesGCMDecrypt(ciphertext, key []byte) ([]byte, error) {
|
||
block, err := aes.NewCipher(key)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
aesgcm, err := cipher.NewGCM(block)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
nonceSize := aesgcm.NonceSize()
|
||
if len(ciphertext) < nonceSize {
|
||
return nil, fmt.Errorf("ciphertext too short")
|
||
}
|
||
nonce, ciphertext := ciphertext[:nonceSize], ciphertext[nonceSize:]
|
||
return aesgcm.Open(nil, nonce, ciphertext, nil)
|
||
}
|
||
|
||
// makeEncodedMarshal оборачивает marshal в шифрование: marshal -> encrypt -> base64
|
||
func makeEncodedMarshal(marshal func(any) ([]byte, error), encFn func([]byte, []byte) ([]byte, error), key []byte) func(any) ([]byte, error) {
|
||
return func(v any) ([]byte, error) {
|
||
jsonData, err := marshal(v)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
encrypted, err := encFn(jsonData, key)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return []byte(base64.StdEncoding.EncodeToString(encrypted)), nil
|
||
}
|
||
}
|
||
|
||
// makeEncodedUnmarshal оборачивает unmarshal в дешифрование: base64 -> decrypt -> unmarshal
|
||
func makeEncodedUnmarshal(unmarshal func([]byte, any) error, decFn func([]byte, []byte) ([]byte, error), key []byte) func([]byte, any) error {
|
||
return func(data []byte, v any) error {
|
||
encrypted, err := base64.StdEncoding.DecodeString(string(data))
|
||
if err != nil {
|
||
return err
|
||
}
|
||
plaintext, err := decFn(encrypted, key)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return unmarshal(plaintext, v)
|
||
}
|
||
}
|
||
|
||
// JSONLFile представляет адаптер для работы с JSONL файлами
|
||
type JSONLFile struct {
|
||
filename string
|
||
cypherKey string
|
||
allocSize int
|
||
collectionName string
|
||
hashFilename string
|
||
options JSONLFileOptions
|
||
initialized bool
|
||
inTransaction bool
|
||
transaction *Transaction
|
||
mutex sync.RWMutex
|
||
selectCache map[string]any
|
||
events map[string][]func(any)
|
||
// Функции сериализации
|
||
jsonMarshal func(any) ([]byte, error)
|
||
jsonUnmarshal func([]byte, any) error
|
||
}
|
||
|
||
// NewJSONLFile создает новый экземпляр JSONLFile
|
||
func NewJSONLFile(filename string, cypherKey string, options JSONLFileOptions) *JSONLFile {
|
||
hash := sha256.Sum256([]byte(filename))
|
||
hashFilename := fmt.Sprintf("%x", hash)
|
||
|
||
collectionName := options.CollectionName
|
||
if collectionName == "" {
|
||
collectionName = hashFilename
|
||
}
|
||
|
||
allocSize := options.AllocSize
|
||
if allocSize == 0 {
|
||
allocSize = 256
|
||
}
|
||
|
||
// Определяем функции сериализации
|
||
jsonMarshal := defaultJSONMarshal
|
||
jsonUnmarshal := defaultJSONUnmarshal
|
||
|
||
if options.JSONMarshal != nil {
|
||
jsonMarshal = options.JSONMarshal
|
||
} else if len(options.KeyOrder) > 0 {
|
||
jsonMarshal = makeKeyOrderMarshal(options.KeyOrder)
|
||
}
|
||
|
||
if options.JSONUnmarshal != nil {
|
||
jsonUnmarshal = options.JSONUnmarshal
|
||
}
|
||
|
||
// Encode: оборачиваем marshal/unmarshal в шифрование
|
||
if options.Encode && options.EncodeKey != "" {
|
||
key := encodeKeyBytes(options.EncodeKey)
|
||
encFn := options.EncryptFn
|
||
decFn := options.DecryptFn
|
||
if encFn == nil {
|
||
encFn = aesGCMEncrypt
|
||
}
|
||
if decFn == nil {
|
||
decFn = aesGCMDecrypt
|
||
}
|
||
jsonMarshal = makeEncodedMarshal(jsonMarshal, encFn, key)
|
||
jsonUnmarshal = makeEncodedUnmarshal(jsonUnmarshal, decFn, key)
|
||
}
|
||
|
||
return &JSONLFile{
|
||
filename: filename,
|
||
cypherKey: cypherKey,
|
||
allocSize: allocSize,
|
||
collectionName: collectionName,
|
||
hashFilename: hashFilename,
|
||
options: options,
|
||
selectCache: make(map[string]any),
|
||
events: make(map[string][]func(any)),
|
||
jsonMarshal: jsonMarshal,
|
||
jsonUnmarshal: jsonUnmarshal,
|
||
}
|
||
}
|
||
|
||
// Init инициализирует файл
|
||
func (j *JSONLFile) Init(force bool, options LineDbAdapterOptions) error {
|
||
if !options.InTransaction {
|
||
j.mutex.Lock()
|
||
defer j.mutex.Unlock()
|
||
}
|
||
|
||
if j.initialized && !force {
|
||
return nil
|
||
}
|
||
|
||
// Создаем директорию если не существует
|
||
dir := filepath.Dir(j.filename)
|
||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||
return fmt.Errorf("failed to create directory: %w", err)
|
||
}
|
||
|
||
// Создаем файл если не существует
|
||
if st, err := os.Stat(j.filename); os.IsNotExist(err) {
|
||
file, err := os.Create(j.filename)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to create file: %w", err)
|
||
}
|
||
file.Close()
|
||
} else if err == nil && st.Size() > 0 {
|
||
// Файл существует и не пустой — проверяем и нормализуем записи
|
||
if err := j.normalizeExistingFile(); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
j.initialized = true
|
||
return nil
|
||
}
|
||
|
||
// normalizeExistingFile проверяет размеры записей и приводит к allocSize-1
|
||
func (j *JSONLFile) normalizeExistingFile() error {
|
||
data, err := os.ReadFile(j.filename)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to read file: %w", err)
|
||
}
|
||
lines := strings.Split(strings.TrimSuffix(string(data), "\n"), "\n")
|
||
var nonEmpty []string
|
||
for _, l := range lines {
|
||
if l == "" {
|
||
continue
|
||
}
|
||
nonEmpty = append(nonEmpty, l)
|
||
}
|
||
if len(nonEmpty) == 0 {
|
||
return nil
|
||
}
|
||
|
||
// Максимальная длина среди записей
|
||
maxLen := 0
|
||
for _, l := range nonEmpty {
|
||
if len(l) > maxLen {
|
||
maxLen = len(l)
|
||
}
|
||
}
|
||
|
||
// Приводим короткие к maxLen (добавляем пробелы)
|
||
for i, l := range nonEmpty {
|
||
if len(l) < maxLen {
|
||
nonEmpty[i] = l + strings.Repeat(" ", maxLen-len(l))
|
||
}
|
||
}
|
||
|
||
targetLen := j.allocSize - 1
|
||
if targetLen < 1 {
|
||
targetLen = 1
|
||
}
|
||
|
||
if targetLen < maxLen {
|
||
// Уменьшаем: можно только если данные (без trailing spaces) помещаются
|
||
for i := range nonEmpty {
|
||
trimmed := strings.TrimRight(nonEmpty[i], " ")
|
||
if len(trimmed) > targetLen {
|
||
return fmt.Errorf("init failed: record data size %d exceeds configured allocSize-1 (%d), cannot reduce without data loss",
|
||
len(trimmed), targetLen)
|
||
}
|
||
nonEmpty[i] = trimmed + strings.Repeat(" ", targetLen-len(trimmed))
|
||
}
|
||
} else if targetLen > maxLen {
|
||
// Расширяем
|
||
for i := range nonEmpty {
|
||
nonEmpty[i] = nonEmpty[i] + strings.Repeat(" ", targetLen-len(nonEmpty[i]))
|
||
}
|
||
}
|
||
|
||
// Перезаписываем файл
|
||
var buf bytes.Buffer
|
||
for _, l := range nonEmpty {
|
||
buf.WriteString(l)
|
||
buf.WriteByte('\n')
|
||
}
|
||
return os.WriteFile(j.filename, buf.Bytes(), 0644)
|
||
}
|
||
|
||
// Read читает все записи из файла
|
||
func (j *JSONLFile) Read(options LineDbAdapterOptions) ([]any, error) {
|
||
if !options.InTransaction {
|
||
j.mutex.RLock()
|
||
defer j.mutex.RUnlock()
|
||
}
|
||
|
||
if !j.initialized {
|
||
return nil, fmt.Errorf("file not initialized")
|
||
}
|
||
|
||
file, err := os.Open(j.filename)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to open file: %w", err)
|
||
}
|
||
defer file.Close()
|
||
|
||
var records []any
|
||
scanner := bufio.NewScanner(file)
|
||
|
||
for scanner.Scan() {
|
||
line := strings.TrimSpace(scanner.Text())
|
||
if line == "" {
|
||
continue
|
||
}
|
||
|
||
// Расшифровываем если нужно (только cypherKey; Encode обрабатывается в jsonUnmarshal)
|
||
if j.cypherKey != "" && !j.options.Encode {
|
||
decoded, err := base64.StdEncoding.DecodeString(line)
|
||
if err != nil {
|
||
if j.options.SkipInvalidLines {
|
||
continue
|
||
}
|
||
return nil, fmt.Errorf("failed to decode base64: %w", err)
|
||
}
|
||
line = string(decoded)
|
||
}
|
||
|
||
var record any
|
||
if err := j.jsonUnmarshal([]byte(line), &record); err != nil {
|
||
if j.options.SkipInvalidLines {
|
||
continue
|
||
}
|
||
return nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
|
||
}
|
||
|
||
records = append(records, record)
|
||
}
|
||
|
||
return records, scanner.Err()
|
||
}
|
||
|
||
// ReadWithPhysicalLineIndexes как Read, но для каждой записи возвращает 0-based индекс строки (слота),
|
||
// в том же смысле, что ReadByLineIndexes/WriteAtLineIndexes: смещение в файле = lineIndex * allocSize.
|
||
// Пустые слоты (пробелы после удаления и т.п.) пропускаются; индекс — номер слота, а не порядковый номер записи.
|
||
func (j *JSONLFile) ReadWithPhysicalLineIndexes(options LineDbAdapterOptions) ([]any, []int, error) {
|
||
if !options.InTransaction {
|
||
j.mutex.RLock()
|
||
defer j.mutex.RUnlock()
|
||
}
|
||
|
||
if !j.initialized {
|
||
return nil, nil, fmt.Errorf("file not initialized")
|
||
}
|
||
if j.allocSize <= 0 {
|
||
return nil, nil, fmt.Errorf("invalid allocSize")
|
||
}
|
||
|
||
file, err := os.Open(j.filename)
|
||
if err != nil {
|
||
return nil, nil, fmt.Errorf("failed to open file: %w", err)
|
||
}
|
||
defer file.Close()
|
||
|
||
info, err := file.Stat()
|
||
if err != nil {
|
||
return nil, nil, fmt.Errorf("stat file: %w", err)
|
||
}
|
||
nSlots := int(info.Size()) / j.allocSize
|
||
|
||
buf := make([]byte, j.allocSize)
|
||
var records []any
|
||
var lineIndexes []int
|
||
|
||
for slot := 0; slot < nSlots; slot++ {
|
||
n, err := io.ReadFull(file, buf)
|
||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||
break
|
||
}
|
||
if err != nil {
|
||
return nil, nil, fmt.Errorf("read slot %d: %w", slot, err)
|
||
}
|
||
if n != j.allocSize {
|
||
break
|
||
}
|
||
|
||
line := string(buf[:n])
|
||
line = strings.TrimRight(line, "\n")
|
||
line = strings.TrimRight(line, " ")
|
||
if strings.TrimSpace(line) == "" {
|
||
continue
|
||
}
|
||
|
||
if j.cypherKey != "" && !j.options.Encode {
|
||
decoded, err := base64.StdEncoding.DecodeString(line)
|
||
if err != nil {
|
||
if j.options.SkipInvalidLines {
|
||
continue
|
||
}
|
||
return nil, nil, fmt.Errorf("failed to decode base64: %w", err)
|
||
}
|
||
line = string(decoded)
|
||
}
|
||
|
||
var record any
|
||
if err := j.jsonUnmarshal([]byte(line), &record); err != nil {
|
||
if j.options.SkipInvalidLines {
|
||
continue
|
||
}
|
||
return nil, nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
|
||
}
|
||
|
||
records = append(records, record)
|
||
lineIndexes = append(lineIndexes, slot)
|
||
}
|
||
|
||
return records, lineIndexes, nil
|
||
}
|
||
|
||
// ReadByLineIndexes читает записи по номерам строк (0-based) с использованием random access.
|
||
// Ожидается, что файл нормализован и каждая строка имеет длину allocSize байт (включая \n),
|
||
// как это обеспечивает Init/normalizeExistingFile/rewriteFile.
|
||
func (j *JSONLFile) ReadByLineIndexes(indexes []int, options LineDbAdapterOptions) ([]any, error) {
|
||
// Внутри транзакций (options.InTransaction == true) блокировка внешним кодом
|
||
// уже обеспечена, повторный лок мог бы привести к дедлоку.
|
||
if !options.InTransaction {
|
||
j.mutex.RLock()
|
||
defer j.mutex.RUnlock()
|
||
}
|
||
|
||
if !j.initialized {
|
||
return nil, fmt.Errorf("file not initialized")
|
||
}
|
||
|
||
if len(indexes) == 0 {
|
||
return []any{}, nil
|
||
}
|
||
|
||
// Копируем, сортируем и убираем дубликаты
|
||
sorted := make([]int, len(indexes))
|
||
copy(sorted, indexes)
|
||
sort.Ints(sorted)
|
||
uniq := sorted[:0]
|
||
prev := -1
|
||
for _, v := range sorted {
|
||
if v < 0 {
|
||
continue
|
||
}
|
||
if v != prev {
|
||
uniq = append(uniq, v)
|
||
prev = v
|
||
}
|
||
}
|
||
|
||
file, err := os.Open(j.filename)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to open file: %w", err)
|
||
}
|
||
defer file.Close()
|
||
|
||
var records []any
|
||
buf := make([]byte, j.allocSize)
|
||
|
||
for _, lineIndex := range uniq {
|
||
offset := int64(lineIndex) * int64(j.allocSize)
|
||
if _, err := file.Seek(offset, io.SeekStart); err != nil {
|
||
return nil, fmt.Errorf("failed to seek to offset %d: %w", offset, err)
|
||
}
|
||
|
||
n, err := io.ReadFull(file, buf)
|
||
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
|
||
return nil, fmt.Errorf("failed to read line at index %d: %w", lineIndex, err)
|
||
}
|
||
if n <= 0 {
|
||
continue
|
||
}
|
||
|
||
line := string(buf[:n])
|
||
line = strings.TrimRight(line, "\n")
|
||
line = strings.TrimRight(line, " ")
|
||
if strings.TrimSpace(line) == "" {
|
||
continue
|
||
}
|
||
|
||
// Расшифровываем если нужно (только cypherKey; Encode обрабатывается в jsonUnmarshal)
|
||
if j.cypherKey != "" && !j.options.Encode {
|
||
decoded, err := base64.StdEncoding.DecodeString(line)
|
||
if err != nil {
|
||
if j.options.SkipInvalidLines {
|
||
continue
|
||
}
|
||
return nil, fmt.Errorf("failed to decode base64: %w", err)
|
||
}
|
||
line = string(decoded)
|
||
}
|
||
|
||
var record any
|
||
if err := j.jsonUnmarshal([]byte(line), &record); err != nil {
|
||
if j.options.SkipInvalidLines {
|
||
continue
|
||
}
|
||
return nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
|
||
}
|
||
|
||
records = append(records, record)
|
||
}
|
||
|
||
return records, nil
|
||
}
|
||
|
||
// ReadByLineIndexesWithPositions как ReadByLineIndexes, но возвращает пары (record, lineIndex).
|
||
// Нужно для точечного Update, когда требуется знать позицию каждой записи.
|
||
func (j *JSONLFile) ReadByLineIndexesWithPositions(indexes []int, options LineDbAdapterOptions) ([]any, []int, error) {
|
||
if !options.InTransaction {
|
||
j.mutex.RLock()
|
||
defer j.mutex.RUnlock()
|
||
}
|
||
if !j.initialized {
|
||
return nil, nil, fmt.Errorf("file not initialized")
|
||
}
|
||
if len(indexes) == 0 {
|
||
return []any{}, []int{}, nil
|
||
}
|
||
sorted := make([]int, len(indexes))
|
||
copy(sorted, indexes)
|
||
sort.Ints(sorted)
|
||
uniq := sorted[:0]
|
||
prev := -1
|
||
for _, v := range sorted {
|
||
if v < 0 {
|
||
continue
|
||
}
|
||
if v != prev {
|
||
uniq = append(uniq, v)
|
||
prev = v
|
||
}
|
||
}
|
||
|
||
file, err := os.Open(j.filename)
|
||
if err != nil {
|
||
return nil, nil, fmt.Errorf("failed to open file: %w", err)
|
||
}
|
||
defer file.Close()
|
||
|
||
var records []any
|
||
var positions []int
|
||
buf := make([]byte, j.allocSize)
|
||
|
||
for _, lineIndex := range uniq {
|
||
offset := int64(lineIndex) * int64(j.allocSize)
|
||
if _, err := file.Seek(offset, io.SeekStart); err != nil {
|
||
return nil, nil, fmt.Errorf("failed to seek to offset %d: %w", offset, err)
|
||
}
|
||
n, err := io.ReadFull(file, buf)
|
||
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
|
||
return nil, nil, fmt.Errorf("failed to read line at index %d: %w", lineIndex, err)
|
||
}
|
||
if n <= 0 {
|
||
continue
|
||
}
|
||
line := string(buf[:n])
|
||
line = strings.TrimRight(line, "\n")
|
||
line = strings.TrimRight(line, " ")
|
||
if strings.TrimSpace(line) == "" {
|
||
continue
|
||
}
|
||
if j.cypherKey != "" && !j.options.Encode {
|
||
decoded, err := base64.StdEncoding.DecodeString(line)
|
||
if err != nil {
|
||
if j.options.SkipInvalidLines {
|
||
continue
|
||
}
|
||
return nil, nil, fmt.Errorf("failed to decode base64: %w", err)
|
||
}
|
||
line = string(decoded)
|
||
}
|
||
var record any
|
||
if err := j.jsonUnmarshal([]byte(line), &record); err != nil {
|
||
if j.options.SkipInvalidLines {
|
||
continue
|
||
}
|
||
return nil, nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
|
||
}
|
||
records = append(records, record)
|
||
positions = append(positions, lineIndex)
|
||
}
|
||
return records, positions, nil
|
||
}
|
||
|
||
// WriteAtLineIndexes точечно записывает записи по заданным номерам строк (0-based).
|
||
// records и lineIndexes должны быть одинаковой длины и в одном порядке.
|
||
func (j *JSONLFile) WriteAtLineIndexes(records []any, lineIndexes []int, options LineDbAdapterOptions) error {
|
||
if !options.InTransaction {
|
||
j.mutex.Lock()
|
||
defer j.mutex.Unlock()
|
||
}
|
||
if !j.initialized {
|
||
return fmt.Errorf("file not initialized")
|
||
}
|
||
if len(records) != len(lineIndexes) {
|
||
return fmt.Errorf("records and lineIndexes length mismatch")
|
||
}
|
||
if len(records) == 0 {
|
||
return nil
|
||
}
|
||
|
||
file, err := os.OpenFile(j.filename, os.O_RDWR, 0644)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to open file for write: %w", err)
|
||
}
|
||
defer file.Close()
|
||
|
||
maxLineLen := j.allocSize - 1
|
||
if maxLineLen < 1 {
|
||
maxLineLen = 1
|
||
}
|
||
|
||
for i, record := range records {
|
||
lineIndex := lineIndexes[i]
|
||
jsonData, err := j.jsonMarshal(record)
|
||
if err != nil {
|
||
return fmt.Errorf("marshal record at index %d: %w", lineIndex, err)
|
||
}
|
||
line := string(jsonData)
|
||
if j.cypherKey != "" && !j.options.Encode {
|
||
line = base64.StdEncoding.EncodeToString([]byte(line))
|
||
}
|
||
if len(line) > maxLineLen {
|
||
return fmt.Errorf("record at line %d size %d exceeds allocSize-1 (%d)", lineIndex, len(line), maxLineLen)
|
||
}
|
||
if len(line) < maxLineLen {
|
||
line += strings.Repeat(" ", maxLineLen-len(line))
|
||
}
|
||
line += "\n"
|
||
|
||
offset := int64(lineIndex) * int64(j.allocSize)
|
||
if _, err := file.Seek(offset, io.SeekStart); err != nil {
|
||
return fmt.Errorf("seek to %d: %w", offset, err)
|
||
}
|
||
if _, err := file.WriteString(line); err != nil {
|
||
return fmt.Errorf("write at line %d: %w", lineIndex, err)
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// BlankLinesAtPositions затирает строки пробелами (allocSize-1 + \n). При чтении они пропускаются.
|
||
func (j *JSONLFile) BlankLinesAtPositions(lineIndexes []int, options LineDbAdapterOptions) error {
|
||
if !options.InTransaction {
|
||
j.mutex.Lock()
|
||
defer j.mutex.Unlock()
|
||
}
|
||
if !j.initialized {
|
||
return fmt.Errorf("file not initialized")
|
||
}
|
||
if len(lineIndexes) == 0 {
|
||
return nil
|
||
}
|
||
|
||
file, err := os.OpenFile(j.filename, os.O_RDWR, 0644)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to open file for blank: %w", err)
|
||
}
|
||
defer file.Close()
|
||
|
||
blank := strings.Repeat(" ", j.allocSize-1) + "\n"
|
||
|
||
for _, lineIndex := range lineIndexes {
|
||
offset := int64(lineIndex) * int64(j.allocSize)
|
||
if _, err := file.Seek(offset, io.SeekStart); err != nil {
|
||
return fmt.Errorf("seek to %d: %w", offset, err)
|
||
}
|
||
if _, err := file.WriteString(blank); err != nil {
|
||
return fmt.Errorf("write blank at line %d: %w", lineIndex, err)
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// LineCount возвращает число строк в файле (fileSize / allocSize).
|
||
// Используется для точечного индексирования после Write.
|
||
func (j *JSONLFile) LineCount() (int, error) {
|
||
info, err := os.Stat(j.filename)
|
||
if err != nil {
|
||
if os.IsNotExist(err) {
|
||
return 0, nil
|
||
}
|
||
return 0, err
|
||
}
|
||
return int(info.Size()) / j.allocSize, nil
|
||
}
|
||
|
||
// Write записывает данные в файл
|
||
func (j *JSONLFile) Write(data any, options LineDbAdapterOptions) error {
|
||
if !options.InTransaction {
|
||
j.mutex.Lock()
|
||
defer j.mutex.Unlock()
|
||
}
|
||
|
||
if !j.initialized {
|
||
return fmt.Errorf("file not initialized")
|
||
}
|
||
|
||
records, ok := data.([]any)
|
||
if !ok {
|
||
records = []any{data}
|
||
}
|
||
|
||
file, err := os.OpenFile(j.filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to open file for writing: %w", err)
|
||
}
|
||
defer file.Close()
|
||
|
||
for _, record := range records {
|
||
jsonData, err := j.jsonMarshal(record)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to marshal JSON: %w", err)
|
||
}
|
||
|
||
line := string(jsonData)
|
||
|
||
// Шифруем если нужно (только cypherKey; Encode уже в jsonMarshal)
|
||
if j.cypherKey != "" && !j.options.Encode {
|
||
line = base64.StdEncoding.EncodeToString([]byte(line))
|
||
}
|
||
|
||
maxLineLen := j.allocSize - 1
|
||
if len(line) > maxLineLen {
|
||
return fmt.Errorf("record size %d exceeds allocSize-1 (%d)", len(line), maxLineLen)
|
||
}
|
||
// Дополняем до allocSize-1
|
||
if len(line) < maxLineLen {
|
||
line += strings.Repeat(" ", maxLineLen-len(line))
|
||
}
|
||
|
||
if _, err := file.WriteString(line + "\n"); err != nil {
|
||
return fmt.Errorf("failed to write line: %w", err)
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// Insert вставляет новые записи
|
||
func (j *JSONLFile) Insert(data any, options LineDbAdapterOptions) ([]any, error) {
|
||
records, ok := data.([]any)
|
||
if !ok {
|
||
records = []any{data}
|
||
}
|
||
|
||
// Генерируем ID если нужно
|
||
for i, record := range records {
|
||
if recordMap, ok := record.(map[string]any); ok {
|
||
if recordMap["id"] == nil || recordMap["id"] == "" {
|
||
recordMap["id"] = time.Now().UnixNano()
|
||
records[i] = recordMap
|
||
}
|
||
}
|
||
}
|
||
|
||
if err := j.Write(records, options); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return records, nil
|
||
}
|
||
|
||
// ReadByFilter читает записи по фильтру
|
||
func (j *JSONLFile) ReadByFilter(filter any, options LineDbAdapterOptions) ([]any, error) {
|
||
allRecords, err := j.Read(options)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
var filteredRecords []any
|
||
|
||
for _, record := range allRecords {
|
||
if j.matchesFilter(record, filter, options.StrictCompare) {
|
||
filteredRecords = append(filteredRecords, record)
|
||
}
|
||
}
|
||
|
||
return filteredRecords, nil
|
||
}
|
||
|
||
// Update обновляет записи
|
||
func (j *JSONLFile) Update(data any, filter any, options LineDbAdapterOptions) ([]any, error) {
|
||
// Читаем все записи
|
||
allRecords, err := j.Read(options)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// Фильтруем записи для обновления
|
||
var recordsToUpdate []any
|
||
updateData, ok := data.(map[string]any)
|
||
if !ok {
|
||
return nil, fmt.Errorf("update data must be a map")
|
||
}
|
||
|
||
for _, record := range allRecords {
|
||
if j.matchesFilter(record, filter, options.StrictCompare) {
|
||
// Обновляем запись
|
||
if recordMap, ok := record.(map[string]any); ok {
|
||
for key, value := range updateData {
|
||
recordMap[key] = value
|
||
}
|
||
recordsToUpdate = append(recordsToUpdate, recordMap)
|
||
}
|
||
}
|
||
}
|
||
|
||
// Перезаписываем файл
|
||
if err := j.rewriteFile(allRecords); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return recordsToUpdate, nil
|
||
}
|
||
|
||
// Delete удаляет записи
|
||
func (j *JSONLFile) Delete(data any, options LineDbAdapterOptions) ([]any, error) {
|
||
// Читаем все записи
|
||
allRecords, err := j.Read(options)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
var remainingRecords []any
|
||
var deletedRecords []any
|
||
|
||
for _, record := range allRecords {
|
||
if j.matchesFilter(record, data, options.StrictCompare) {
|
||
deletedRecords = append(deletedRecords, record)
|
||
} else {
|
||
remainingRecords = append(remainingRecords, record)
|
||
}
|
||
}
|
||
|
||
// Перезаписываем файл
|
||
if err := j.rewriteFile(remainingRecords); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return deletedRecords, nil
|
||
}
|
||
|
||
// GetFilename возвращает имя файла
|
||
func (j *JSONLFile) GetFilename() string {
|
||
return j.filename
|
||
}
|
||
|
||
// GetCollectionName возвращает имя коллекции
|
||
func (j *JSONLFile) GetCollectionName() string {
|
||
return j.collectionName
|
||
}
|
||
|
||
// GetOptions возвращает опции
|
||
func (j *JSONLFile) GetOptions() JSONLFileOptions {
|
||
return j.options
|
||
}
|
||
|
||
// GetEncryptKey возвращает ключ шифрования
|
||
func (j *JSONLFile) GetEncryptKey() string {
|
||
return j.cypherKey
|
||
}
|
||
|
||
// matchesFilter проверяет соответствие записи фильтру
|
||
func (j *JSONLFile) matchesFilter(record any, filter any, strictCompare bool) bool {
|
||
if filter == nil {
|
||
return true
|
||
}
|
||
|
||
switch f := filter.(type) {
|
||
case string:
|
||
// Простая проверка по строке
|
||
recordStr := fmt.Sprintf("%v", record)
|
||
if strictCompare {
|
||
return recordStr == f
|
||
}
|
||
return strings.Contains(strings.ToLower(recordStr), strings.ToLower(f))
|
||
|
||
case map[string]any:
|
||
// Проверка по полям
|
||
if recordMap, ok := record.(map[string]any); ok {
|
||
for key, filterValue := range f {
|
||
recordValue, exists := recordMap[key]
|
||
if !exists {
|
||
return false
|
||
}
|
||
|
||
if !j.valuesMatch(recordValue, filterValue, strictCompare) {
|
||
return false
|
||
}
|
||
}
|
||
return true
|
||
}
|
||
|
||
case func(any) bool:
|
||
// Функция фильтрации
|
||
return f(record)
|
||
}
|
||
|
||
return false
|
||
}
|
||
|
||
// valuesMatch сравнивает значения
|
||
func (j *JSONLFile) valuesMatch(a, b any, strictCompare bool) bool {
|
||
if strictCompare {
|
||
return a == b
|
||
}
|
||
|
||
// Для строк - нечувствительное к регистру сравнение
|
||
if aStr, ok := a.(string); ok {
|
||
if bStr, ok := b.(string); ok {
|
||
return strings.Contains(strings.ToLower(aStr), strings.ToLower(bStr))
|
||
}
|
||
}
|
||
|
||
return a == b
|
||
}
|
||
|
||
// rewriteFile перезаписывает файл новыми данными
|
||
func (j *JSONLFile) rewriteFile(records []any) error {
|
||
// Создаем временный файл
|
||
tempFile := j.filename + ".tmp"
|
||
|
||
file, err := os.Create(tempFile)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to create temp file: %w", err)
|
||
}
|
||
defer file.Close()
|
||
|
||
// Записываем данные во временный файл
|
||
for _, record := range records {
|
||
jsonData, err := j.jsonMarshal(record)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to marshal JSON: %w", err)
|
||
}
|
||
|
||
line := string(jsonData)
|
||
|
||
// Шифруем если нужно (только cypherKey; Encode уже в jsonMarshal)
|
||
if j.cypherKey != "" && !j.options.Encode {
|
||
line = base64.StdEncoding.EncodeToString([]byte(line))
|
||
}
|
||
|
||
maxLineLen := j.allocSize - 1
|
||
if len(line) > maxLineLen {
|
||
return fmt.Errorf("record size %d exceeds allocSize-1 (%d)", len(line), maxLineLen)
|
||
}
|
||
// Дополняем до allocSize-1
|
||
if len(line) < maxLineLen {
|
||
line += strings.Repeat(" ", maxLineLen-len(line))
|
||
}
|
||
|
||
if _, err := file.WriteString(line + "\n"); err != nil {
|
||
return fmt.Errorf("failed to write line: %w", err)
|
||
}
|
||
}
|
||
|
||
// Заменяем оригинальный файл
|
||
if err := os.Rename(tempFile, j.filename); err != nil {
|
||
return fmt.Errorf("failed to rename temp file: %w", err)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// Destroy очищает ресурсы
|
||
func (j *JSONLFile) Destroy() {
|
||
j.mutex.Lock()
|
||
defer j.mutex.Unlock()
|
||
|
||
j.initialized = false
|
||
j.selectCache = nil
|
||
j.events = nil
|
||
}
|
||
|
||
// WithTransaction выполняет операцию в транзакции
|
||
func (j *JSONLFile) WithTransaction(callback func(*JSONLFile, LineDbAdapterOptions) error, transactionOptions TransactionOptions, methodsOptions LineDbAdapterOptions) error {
|
||
// Создаем транзакцию
|
||
tx := NewTransaction("write", generateTransactionID(), transactionOptions.Timeout, transactionOptions.Rollback)
|
||
|
||
// Создаем резервную копию если нужно
|
||
if transactionOptions.Rollback {
|
||
if err := tx.CreateBackup(j.filename); err != nil {
|
||
return fmt.Errorf("failed to create backup: %w", err)
|
||
}
|
||
}
|
||
|
||
j.transaction = tx
|
||
j.inTransaction = true
|
||
|
||
defer func() {
|
||
j.inTransaction = false
|
||
j.transaction = nil
|
||
}()
|
||
|
||
// Выполняем callback
|
||
if err := callback(j, methodsOptions); err != nil {
|
||
// Откатываем изменения если нужно
|
||
if transactionOptions.Rollback {
|
||
if restoreErr := tx.RestoreFromBackup(j.filename); restoreErr != nil {
|
||
return fmt.Errorf("failed to restore from backup: %w", restoreErr)
|
||
}
|
||
}
|
||
return err
|
||
}
|
||
|
||
// Очищаем резервную копию
|
||
if err := tx.CleanupBackup(); err != nil {
|
||
return fmt.Errorf("failed to cleanup backup: %w", err)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// generateTransactionID генерирует ID транзакции
|
||
func generateTransactionID() string {
|
||
return fmt.Sprintf("tx_%d", time.Now().UnixNano())
|
||
}
|