Compare commits

...

33 Commits

Author SHA1 Message Date
刘祥超
e42c8452ff 优化并发读写相关代码 2024-05-01 15:53:49 +08:00
刘祥超
fe6e5ba5f9 优化并发读写限制 2024-05-01 12:42:35 +08:00
刘祥超
406d5de482 从内存刷新到磁盘时无需并发写限制 2024-04-30 21:23:44 +08:00
刘祥超
f66e672bb4 读取缓存时总是尝试打开文件,不受并发读的限制 2024-04-30 20:26:34 +08:00
刘祥超
7252d00c8e 版本号修改为1.3.8 2024-04-30 19:09:57 +08:00
刘祥超
7ec4656aea 可以在集群设置中修改节点最大并发读/写数 2024-04-30 19:09:40 +08:00
刘祥超
33156fed2a 延长读写线程超时时间/增加相关测试用例 2024-04-30 17:24:08 +08:00
刘祥超
e8538d4e34 bfs commit 2024-04-30 12:38:43 +08:00
刘祥超
c7d60e01b8 在请求上下文中停用统计的时候也停用对应的指标统计 2024-04-30 00:05:32 +08:00
刘祥超
bbb4b7d31a 版本修改为1.3.7 2024-04-29 23:12:12 +08:00
刘祥超
aa2467c7cf 增加读写线程限制相关测试用例 2024-04-29 23:10:02 +08:00
刘祥超
fa02713ab5 限制读写线程最小值为4,最大值为32 2024-04-29 22:51:51 +08:00
刘祥超
973324ae8f 写入和删除缓存文件时增加线程数限制 2024-04-29 22:36:26 +08:00
刘祥超
7febc6aaf3 读取文件时增加线程数限制 2024-04-29 22:01:55 +08:00
刘祥超
1cede74db3 bfs:修复并发读提示ErrClosed都问题 2024-04-28 19:02:59 +08:00
刘祥超
0dbbc12eb7 bfs:实现对FileHeader中Block数据的compaction 2024-04-28 15:38:13 +08:00
刘祥超
b272de2122 bfs:对FileHeader的压缩和解压使用Pool管理 2024-04-28 10:06:29 +08:00
刘祥超
962cbde4e9 bfs:弹出BFile的时候确保没有正在被使用 2024-04-27 20:55:04 +08:00
刘祥超
6226e31cdc bfs:实现FileHeader的lazy load 2024-04-27 20:11:50 +08:00
刘祥超
fb8c78553a bfs:同一个Hash同时只能有一个Writer,避免多线程读冲突 2024-04-27 18:27:49 +08:00
刘祥超
801f5d4525 bfs:实现maxOpenFiles 2024-04-27 17:29:12 +08:00
刘祥超
04007bf8f1 bfs: 增加读写线程限制 2024-04-27 07:09:14 +08:00
刘祥超
8c044faace bfs: bfs.FS增加锁 (experimental) 2024-04-26 19:26:22 +08:00
刘祥超
f262e76f96 bfs commit (exprimental) 2024-04-26 18:44:29 +08:00
刘祥超
ef057b106e 初步实现bfs原型(仅用于实验) 2024-04-26 17:16:32 +08:00
刘祥超
508f8cbae0 调整KV相关选项尺寸 2024-04-26 17:15:55 +08:00
刘祥超
d8e4ff3d01 增强KV字节编码安全性 2024-04-24 14:33:15 +08:00
刘祥超
f247e57e76 优化KV相关日志 2024-04-24 09:03:15 +08:00
刘祥超
3581ce3763 优化KV相关错误提示/可以从路径直接加载数据库 2024-04-23 11:56:00 +08:00
刘祥超
b66c70de66 ${requestMethod}变量增加默认值GET 2024-04-22 21:32:53 +08:00
刘祥超
d3a4f14999 版本号修改为1.3.6.1 2024-04-22 18:46:29 +08:00
刘祥超
00a7324640 增加缓存索引相关测试用例 2024-04-22 18:46:21 +08:00
刘祥超
d8ea1809dc 优化代码 2024-04-22 12:38:18 +08:00
62 changed files with 4255 additions and 188 deletions

View File

@@ -55,3 +55,7 @@ func IsCapacityError(err error) bool {
var capacityErr *CapacityError
return errors.As(err, &capacityErr)
}
func IsBusyError(err error) bool {
return err != nil && errors.Is(err, ErrServerIsBusy)
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/types"
@@ -593,9 +594,9 @@ func (this *SQLiteFileListDB) shouldRecover() bool {
// 删除数据库文件
func (this *SQLiteFileListDB) deleteDB() {
_ = os.Remove(this.dbPath)
_ = os.Remove(this.dbPath + "-shm")
_ = os.Remove(this.dbPath + "-wal")
_ = fsutils.Remove(this.dbPath)
_ = fsutils.Remove(this.dbPath + "-shm")
_ = fsutils.Remove(this.dbPath + "-wal")
}
// 加载Hash列表

View File

@@ -91,7 +91,7 @@ func TestKVFileList_Add_Many(t *testing.T) {
err := list.Add(stringutil.Md5(strconv.Itoa(i)), &caches.Item{
Type: caches.ItemTypeFile,
Key: "https://www.example.com/index.html" + strconv.Itoa(i),
ExpiresAt: time.Now().Unix() + 60,
ExpiresAt: time.Now().Unix() + 3600,
StaleAt: 0,
HeaderSize: 0,
BodySize: int64(rand.Int() % 1_000_000),
@@ -176,6 +176,32 @@ func TestKVFileList_Exist(t *testing.T) {
}
}
func TestKVFileList_ExistMany(t *testing.T) {
var list = testOpenKVFileList(t)
defer func() {
_ = list.Close()
}()
var countFound int
var count = 10
if testutils.IsSingleTesting() {
count = 2_000_000
}
var before = time.Now()
for i := 0; i < count; i++ {
ok, _, err := list.Exist(stringutil.Md5(strconv.Itoa(i)))
if err != nil {
t.Fatal(err)
}
if ok {
countFound++
}
}
var costSeconds = time.Since(before).Seconds()
t.Log("total:", costSeconds*1000, "ms", "found:", countFound, "qps:", fmt.Sprintf("%.2fK/s", float64(count)/costSeconds/1000), "per read:", fmt.Sprintf("%.4fms", costSeconds*1000/float64(count)))
}
func TestKVFileList_ExistQuick(t *testing.T) {
var list = testOpenKVFileList(t)
defer func() {
@@ -211,6 +237,28 @@ func TestKVFileList_Remove(t *testing.T) {
}
}
func TestKVFileList_RemoveMany(t *testing.T) {
var list = testOpenKVFileList(t)
defer func() {
_ = list.Close()
}()
var count = 10
if testutils.IsSingleTesting() {
count = 2_000_000
}
var before = time.Now()
for i := 0; i < count; i++ {
err := list.Remove(stringutil.Md5(strconv.Itoa(i)))
if err != nil {
t.Fatal(err)
}
}
var costSeconds = time.Since(before).Seconds()
t.Log("total:", costSeconds*1000, "ms", "qps:", fmt.Sprintf("%.2fK/s", float64(count)/costSeconds/1000), "per delete:", fmt.Sprintf("%.4fms", costSeconds*1000/float64(count)))
}
func TestKVFileList_CleanAll(t *testing.T) {
var list = testOpenKVFileList(t)
defer func() {

View File

@@ -11,6 +11,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/iwind/TeaGo/types"
"os"
@@ -486,7 +487,7 @@ func (this *SQLiteFileList) UpgradeV3(oldDir string, brokenOnError bool) error {
remotelogs.Println("CACHE", "upgrading local database from '"+oldDir+"' ...")
defer func() {
_ = os.Remove(indexDBPath)
_ = fsutils.Remove(indexDBPath)
remotelogs.Println("CACHE", "upgrading local database finished")
}()

View File

@@ -7,8 +7,8 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
"os"
"sync"
)
@@ -91,7 +91,7 @@ func (this *PartialRangesQueue) Get(filename string) ([]byte, error) {
return data, nil
}
return os.ReadFile(filename)
return fsutils.ReadFile(filename)
}
// Delete ranges filename
@@ -119,7 +119,7 @@ func (this *PartialRangesQueue) Dump() {
continue
}
err := os.WriteFile(filename, data, 0666)
err := fsutils.WriteFile(filename, data, 0666)
if err != nil {
remotelogs.Println("PARTIAL_RANGES_QUEUE", "write file '"+filename+"' failed: "+err.Error())
}

View File

@@ -3,6 +3,7 @@ package caches
import (
"encoding/binary"
"errors"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges"
"github.com/iwind/TeaGo/types"
"io"
@@ -10,7 +11,7 @@ import (
)
type FileReader struct {
fp *os.File
fp *fsutils.File
openFile *OpenFile
openFileCache *OpenFileCache
@@ -28,7 +29,7 @@ type FileReader struct {
isClosed bool
}
func NewFileReader(fp *os.File) *FileReader {
func NewFileReader(fp *fsutils.File) *FileReader {
return &FileReader{fp: fp}
}
@@ -305,7 +306,8 @@ func (this *FileReader) ReadBodyRange(buf []byte, start int64, end int64, callba
}
for {
n, err := this.fp.Read(buf)
var n int
n, err = this.fp.Read(buf)
if n > 0 {
var n2 = int(end-offset) + 1
if n2 <= n {
@@ -351,7 +353,7 @@ func (this *FileReader) ContainsRange(r rangeutils.Range) (r2 rangeutils.Range,
// FP 原始的文件句柄
func (this *FileReader) FP() *os.File {
return this.fp
return this.fp.Raw()
}
func (this *FileReader) Close() error {
@@ -366,7 +368,7 @@ func (this *FileReader) Close() error {
} else {
var cacheMeta = make([]byte, len(this.meta))
copy(cacheMeta, this.meta)
this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp, cacheMeta, this.header, this.LastModified(), this.bodySize))
this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp.Raw(), cacheMeta, this.header, this.LastModified(), this.bodySize))
}
return nil
}
@@ -374,7 +376,7 @@ func (this *FileReader) Close() error {
return this.fp.Close()
}
func (this *FileReader) readToBuff(fp *os.File, buf []byte) (ok bool, err error) {
func (this *FileReader) readToBuff(fp *fsutils.File, buf []byte) (ok bool, err error) {
n, err := fp.Read(buf)
if err != nil {
return false, err
@@ -393,5 +395,5 @@ func (this *FileReader) discard() error {
}
// remove file
return os.Remove(this.fp.Name())
return fsutils.Remove(this.fp.Name())
}

View File

@@ -4,10 +4,10 @@ import (
"encoding/binary"
"errors"
"fmt"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges"
"github.com/iwind/TeaGo/types"
"io"
"os"
)
type PartialFileReader struct {
@@ -17,7 +17,7 @@ type PartialFileReader struct {
rangePath string
}
func NewPartialFileReader(fp *os.File) *PartialFileReader {
func NewPartialFileReader(fp *fsutils.File) *PartialFileReader {
return &PartialFileReader{
FileReader: NewFileReader(fp),
rangePath: PartialRangesFilePath(fp.Name()),
@@ -146,7 +146,7 @@ func (this *PartialFileReader) IsCompleted() bool {
func (this *PartialFileReader) discard() error {
SharedPartialRangesQueue.Delete(this.rangePath)
_ = os.Remove(this.rangePath)
_ = fsutils.Remove(this.rangePath)
return this.FileReader.discard()
}

View File

@@ -380,6 +380,7 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
// 检查文件记录是否已过期
var estimatedSize int64
var existInList bool
if !useStale {
exists, filesize, err := this.list.Exist(hash)
if err != nil {
@@ -389,6 +390,7 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
return nil, ErrNotFound
}
estimatedSize = filesize
existInList = true
}
// 尝试通过MMAP读取
@@ -412,7 +414,13 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
var err error
if openFile == nil {
if existInList {
fsutils.ReaderLimiter.Ack()
}
fp, err = os.OpenFile(path, os.O_RDONLY, 0444)
if existInList {
fsutils.ReaderLimiter.Release()
}
if err != nil {
if !os.IsNotExist(err) {
return nil, err
@@ -431,12 +439,12 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
var reader Reader
if isPartial {
var partialFileReader = NewPartialFileReader(fp)
var partialFileReader = NewPartialFileReader(fsutils.NewFile(fp, fsutils.FlagRead))
partialFileReader.openFile = openFile
partialFileReader.openFileCache = openFileCache
reader = partialFileReader
} else {
var fileReader = NewFileReader(fp)
var fileReader = NewFileReader(fsutils.NewFile(fp, fsutils.FlagRead))
fileReader.openFile = openFile
fileReader.openFileCache = openFileCache
reader = fileReader
@@ -517,11 +525,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
return nil, fmt.Errorf("%w(001)", ErrFileIsWriting)
}
if !isFlushing && !fsutils.WriteReady() {
sharedWritingFileKeyLocker.Unlock()
return nil, ErrServerIsBusy
}
sharedWritingFileKeyMap[key] = zero.New()
sharedWritingFileKeyLocker.Unlock()
defer func() {
@@ -588,9 +591,9 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
// 数据库中是否存在
existsCacheItem, _, _ := this.list.Exist(hash)
if existsCacheItem {
readerFp, err := os.OpenFile(tmpPath, os.O_RDONLY, 0444)
readerFp, err := fsutils.OpenFile(tmpPath, os.O_RDONLY, 0444)
if err == nil {
var partialReader = NewPartialFileReader(readerFp)
var partialReader = NewPartialFileReader(fsutils.NewFile(readerFp, fsutils.FlagRead))
err = partialReader.Init()
_ = partialReader.Close()
if err == nil && partialReader.bodyOffset > 0 {
@@ -621,21 +624,31 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
if isNewCreated && existsFile {
flags |= os.O_TRUNC
}
fsutils.WriteBegin()
writer, err := os.OpenFile(tmpPath, flags, 0666)
fsutils.WriteEnd()
if !isFlushing {
if !fsutils.WriterLimiter.TryAck() {
return nil, ErrServerIsBusy
}
}
fp, err := os.OpenFile(tmpPath, flags, 0666)
if !isFlushing {
fsutils.WriterLimiter.Release()
}
if err != nil {
if os.IsNotExist(err) {
_ = os.MkdirAll(dir, 0777)
// open file again
writer, err = os.OpenFile(tmpPath, flags, 0666)
fsutils.WriterLimiter.Ack()
fp, err = os.OpenFile(tmpPath, flags, 0666)
fsutils.WriterLimiter.Release()
}
if err != nil {
return nil, err
}
}
var writer = fsutils.NewFile(fp, fsutils.FlagWrite)
var removeOnFailure = true
defer func() {
if err != nil {
@@ -646,13 +659,15 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
if !isOk {
_ = writer.Close()
if removeOnFailure {
_ = os.Remove(tmpPath)
_ = fsutils.Remove(tmpPath)
}
}
}()
// 尝试锁定,如果锁定失败,则直接返回
fsutils.WriterLimiter.Ack()
err = syscall.Flock(int(writer.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
fsutils.WriterLimiter.Release()
if err != nil {
removeOnFailure = false
return nil, fmt.Errorf("%w (003)", ErrFileIsWriting)
@@ -689,9 +704,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
metaBodySize = bodySize
}
fsutils.WriteBegin()
_, err = writer.Write(metaBytes)
fsutils.WriteEnd()
if err != nil {
return nil, err
}
@@ -1144,9 +1157,7 @@ func (this *FileStorage) purgeLoop() {
for i := 0; i < times; i++ {
countFound, err := this.list.Purge(purgeCount, func(hash string) error {
path, _ := this.hashPath(hash)
fsutils.WriteBegin()
err := this.removeCacheFile(path)
fsutils.WriteEnd()
if err != nil && !os.IsNotExist(err) {
remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
}
@@ -1203,9 +1214,7 @@ func (this *FileStorage) purgeLoop() {
var before = time.Now()
err := this.list.PurgeLFU(count, func(hash string) error {
path, _ := this.hashPath(hash)
fsutils.WriteBegin()
err := this.removeCacheFile(path)
fsutils.WriteEnd()
if err != nil && !os.IsNotExist(err) {
remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
}
@@ -1473,7 +1482,7 @@ func (this *FileStorage) removeCacheFile(path string) error {
openFileCache.Close(path)
}
var err = os.Remove(path)
var err = fsutils.Remove(path)
if err == nil || os.IsNotExist(err) {
err = nil
@@ -1485,7 +1494,7 @@ func (this *FileStorage) removeCacheFile(path string) error {
_, statErr := os.Stat(partialPath)
if statErr == nil {
_ = os.Remove(partialPath)
_ = fsutils.Remove(partialPath)
SharedPartialRangesQueue.Delete(partialPath)
}
}

View File

@@ -189,7 +189,7 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h
if isDirty &&
this.parentStorage != nil &&
this.dirtyQueueSize > 0 &&
len(this.dirtyChan) >= this.dirtyQueueSize-int(fsutils.DiskMaxWrites) /** delta **/ { // 缓存时间过长
len(this.dirtyChan) >= this.dirtyQueueSize-64 /** delta **/ { // 缓存时间过长
return nil, ErrWritingQueueFull
}

View File

@@ -6,14 +6,13 @@ import (
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/iwind/TeaGo/types"
"io"
"os"
"strings"
"sync"
)
type FileWriter struct {
storage StorageInterface
rawWriter *os.File
rawWriter *fsutils.File
key string
metaHeaderSize int
@@ -26,9 +25,11 @@ type FileWriter struct {
maxSize int64
endFunc func()
once sync.Once
modifiedBytes int
}
func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, maxSize int64, endFunc func()) *FileWriter {
func NewFileWriter(storage StorageInterface, rawWriter *fsutils.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, maxSize int64, endFunc func()) *FileWriter {
return &FileWriter{
storage: storage,
key: key,
@@ -43,9 +44,7 @@ func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, exp
// WriteHeader 写入数据
func (this *FileWriter) WriteHeader(data []byte) (n int, err error) {
fsutils.WriteBegin()
n, err = this.rawWriter.Write(data)
fsutils.WriteEnd()
this.headerSize += int64(n)
if err != nil {
_ = this.Discard()
@@ -79,7 +78,7 @@ func (this *FileWriter) Write(data []byte) (n int, err error) {
var l = len(data)
if l > (2 << 20) {
var offset = 0
const bufferSize = 256 << 10
const bufferSize = 64 << 10
for {
var end = offset + bufferSize
if end > l {
@@ -139,36 +138,30 @@ func (this *FileWriter) Close() error {
// check content length
if this.metaBodySize > 0 && this.bodySize != this.metaBodySize {
_ = this.rawWriter.Close()
_ = os.Remove(path)
_ = fsutils.Remove(path)
return ErrUnexpectedContentLength
}
err := this.WriteHeaderLength(types.Int(this.headerSize))
if err != nil {
fsutils.WriteBegin()
_ = this.rawWriter.Close()
fsutils.WriteEnd()
_ = os.Remove(path)
_ = fsutils.Remove(path)
return err
}
err = this.WriteBodyLength(this.bodySize)
if err != nil {
fsutils.WriteBegin()
_ = this.rawWriter.Close()
fsutils.WriteEnd()
_ = os.Remove(path)
_ = fsutils.Remove(path)
return err
}
fsutils.WriteBegin()
err = this.rawWriter.Close()
fsutils.WriteEnd()
if err != nil {
_ = os.Remove(path)
_ = fsutils.Remove(path)
} else if strings.HasSuffix(path, FileTmpSuffix) {
err = os.Rename(path, strings.Replace(path, FileTmpSuffix, "", 1))
err = fsutils.Rename(path, strings.Replace(path, FileTmpSuffix, "", 1))
if err != nil {
_ = os.Remove(path)
_ = fsutils.Remove(path)
}
}
@@ -181,11 +174,9 @@ func (this *FileWriter) Discard() error {
this.endFunc()
})
fsutils.WriteBegin()
_ = this.rawWriter.Close()
fsutils.WriteEnd()
err := os.Remove(this.rawWriter.Name())
err := fsutils.Remove(this.rawWriter.Name())
return err
}
@@ -211,9 +202,7 @@ func (this *FileWriter) ItemType() ItemType {
}
func (this *FileWriter) write(data []byte) (n int, err error) {
fsutils.WriteBegin()
n, err = this.rawWriter.Write(data)
fsutils.WriteEnd()
this.bodySize += int64(n)
if this.maxSize > 0 && this.bodySize > this.maxSize {
@@ -227,5 +216,6 @@ func (this *FileWriter) write(data []byte) (n int, err error) {
if err != nil {
_ = this.Discard()
}
return
}

View File

@@ -7,12 +7,11 @@ import (
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/iwind/TeaGo/types"
"io"
"os"
"sync"
)
type PartialFileWriter struct {
rawWriter *os.File
rawWriter *fsutils.File
key string
metaHeaderSize int
@@ -33,7 +32,7 @@ type PartialFileWriter struct {
rangePath string
}
func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, isNew bool, isPartial bool, bodyOffset int64, ranges *PartialRanges, endFunc func()) *PartialFileWriter {
func NewPartialFileWriter(rawWriter *fsutils.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, isNew bool, isPartial bool, bodyOffset int64, ranges *PartialRanges, endFunc func()) *PartialFileWriter {
return &PartialFileWriter{
key: key,
rawWriter: rawWriter,
@@ -54,9 +53,7 @@ func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) {
if !this.isNew {
return
}
fsutils.WriteBegin()
n, err = this.rawWriter.Write(data)
fsutils.WriteEnd()
this.headerSize += int64(n)
if err != nil {
_ = this.Discard()
@@ -65,9 +62,7 @@ func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) {
}
func (this *PartialFileWriter) AppendHeader(data []byte) error {
fsutils.WriteBegin()
_, err := this.rawWriter.Write(data)
fsutils.WriteEnd()
if err != nil {
_ = this.Discard()
} else {
@@ -104,9 +99,7 @@ func (this *PartialFileWriter) WriteHeaderLength(headerLength int) error {
// Write 写入数据
func (this *PartialFileWriter) Write(data []byte) (n int, err error) {
fsutils.WriteBegin()
n, err = this.rawWriter.Write(data)
fsutils.WriteEnd()
this.bodySize += int64(n)
if err != nil {
_ = this.Discard()
@@ -130,11 +123,14 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error {
// prevent extending too much space in a single writing
var maxOffset = this.ranges.Max()
if offset-maxOffset > 16<<20 {
var extendSizePerStep int64 = 1 << 20
var maxExtendSize int64 = 32 << 20
if fsutils.DiskIsExtremelyFast() {
maxExtendSize = 128 << 20
extendSizePerStep = 4 << 20
} else if fsutils.DiskIsFast() {
maxExtendSize = 64 << 20
extendSizePerStep = 2 << 20
}
if offset-maxOffset > maxExtendSize {
stat, err := this.rawWriter.Stat()
@@ -143,11 +139,8 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error {
}
// extend min size to prepare for file tail
const extendSizePerStep = 8 << 20
if stat.Size()+extendSizePerStep <= this.bodyOffset+offset+int64(len(data)) {
fsutils.WriteBegin()
_ = this.rawWriter.Truncate(stat.Size() + extendSizePerStep)
fsutils.WriteEnd()
return nil
}
}
@@ -161,9 +154,7 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error {
this.bodyOffset = SizeMeta + int64(keyLength) + this.headerSize
}
fsutils.WriteBegin()
_, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset)
fsutils.WriteEnd()
if err != nil {
return err
}
@@ -207,9 +198,7 @@ func (this *PartialFileWriter) Close() error {
this.ranges.BodySize = this.bodySize
err := this.ranges.WriteToFile(this.rangePath)
if err != nil {
fsutils.WriteBegin()
_ = this.rawWriter.Close()
fsutils.WriteEnd()
this.remove()
return err
}
@@ -218,25 +207,19 @@ func (this *PartialFileWriter) Close() error {
if this.isNew {
err = this.WriteHeaderLength(types.Int(this.headerSize))
if err != nil {
fsutils.WriteBegin()
_ = this.rawWriter.Close()
fsutils.WriteEnd()
this.remove()
return err
}
err = this.WriteBodyLength(this.bodySize)
if err != nil {
fsutils.WriteBegin()
_ = this.rawWriter.Close()
fsutils.WriteEnd()
this.remove()
return err
}
}
fsutils.WriteBegin()
err = this.rawWriter.Close()
fsutils.WriteEnd()
if err != nil {
this.remove()
}
@@ -250,14 +233,14 @@ func (this *PartialFileWriter) Discard() error {
this.endFunc()
})
fsutils.WriteBegin()
_ = this.rawWriter.Close()
fsutils.WriteEnd()
SharedPartialRangesQueue.Delete(this.rangePath)
_ = os.Remove(this.rangePath)
err := os.Remove(this.rawWriter.Name())
_ = fsutils.Remove(this.rangePath)
err := fsutils.Remove(this.rawWriter.Name())
return err
}
@@ -287,8 +270,9 @@ func (this *PartialFileWriter) IsNew() bool {
}
func (this *PartialFileWriter) remove() {
_ = os.Remove(this.rawWriter.Name())
_ = fsutils.Remove(this.rawWriter.Name())
SharedPartialRangesQueue.Delete(this.rangePath)
_ = os.Remove(this.rangePath)
_ = fsutils.Remove(this.rangePath)
}

View File

@@ -4,6 +4,7 @@ package caches_test
import (
"github.com/TeaOSLab/EdgeNode/internal/caches"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/iwind/TeaGo/types"
"os"
"testing"
@@ -15,7 +16,7 @@ func TestPartialFileWriter_Write(t *testing.T) {
_ = os.Remove(path)
var reader = func() {
data, err := os.ReadFile(path)
data, err := fsutils.ReadFile(path)
if err != nil {
t.Fatal(err)
}

View File

@@ -1,7 +1,7 @@
package teaconst
const (
Version = "1.3.6"
Version = "1.3.8.1"
ProductName = "Edge Node"
ProcessName = "edge-node"

View File

@@ -71,7 +71,7 @@ func (this *HTTPClientPool) Client(req *HTTPRequest,
urlPort = "443"
}
originHost = originHost + ":" + urlPort
originHost += ":" + urlPort
}
var rawKey = origin.UniqueKey() + "@" + originAddr + "@" + originHost

View File

@@ -103,6 +103,8 @@ type HTTPRequest struct {
disableLog bool // 是否在当前请求中关闭Log
forceLog bool // 是否强制记录日志
disableMetrics bool // 不记录统计指标
isHijacked bool
// script相关操作
@@ -458,7 +460,7 @@ func (this *HTTPRequest) doEnd() {
stats.SharedDAUManager.AddIP(this.ReqServer.Id, this.requestRemoteAddr(true))
// 指标
if metrics.SharedManager.HasHTTPMetrics() {
if !this.disableMetrics && metrics.SharedManager.HasHTTPMetrics() {
this.doMetricsResponse()
}
@@ -825,6 +827,9 @@ func (this *HTTPRequest) Format(source string) string {
case "requestTime":
return fmt.Sprintf("%.6f", this.requestCost)
case "requestMethod":
if len(this.RawReq.Method) == 0 {
return http.MethodGet
}
return this.RawReq.Method
case "requestFilename":
filename := this.requestFilename()

View File

@@ -232,7 +232,12 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
if this.web.Compression != nil && this.web.Compression.IsOn {
_, encoding, ok := this.web.Compression.MatchAcceptEncoding(this.RawReq.Header.Get("Accept-Encoding"))
if ok {
reader, _ = storage.OpenReader(key+caches.SuffixWebP+caches.SuffixCompression+encoding, useStale, false)
reader, err = storage.OpenReader(key+caches.SuffixWebP+caches.SuffixCompression+encoding, useStale, false)
if err != nil && caches.IsBusyError(err) {
this.varMapping["cache.status"] = "BUSY"
this.cacheRef = nil
return
}
if reader != nil {
tags = append(tags, "webp", encoding)
}
@@ -244,7 +249,12 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
if webPIsEnabled && !isPartialRequest &&
!isHeadMethod &&
reader == nil {
reader, _ = storage.OpenReader(key+caches.SuffixWebP, useStale, false)
reader, err = storage.OpenReader(key+caches.SuffixWebP, useStale, false)
if err != nil && caches.IsBusyError(err) {
this.varMapping["cache.status"] = "BUSY"
this.cacheRef = nil
return
}
if reader != nil {
this.writer.cacheReaderSuffix = caches.SuffixWebP
tags = append(tags, "webp")
@@ -256,7 +266,12 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
if this.web.Compression != nil && this.web.Compression.IsOn {
_, encoding, ok := this.web.Compression.MatchAcceptEncoding(this.RawReq.Header.Get("Accept-Encoding"))
if ok {
reader, _ = storage.OpenReader(key+caches.SuffixCompression+encoding, useStale, false)
reader, err = storage.OpenReader(key+caches.SuffixCompression+encoding, useStale, false)
if err != nil && caches.IsBusyError(err) {
this.varMapping["cache.status"] = "BUSY"
this.cacheRef = nil
return
}
if reader != nil {
tags = append(tags, encoding)
}
@@ -269,6 +284,11 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
var partialRanges []rangeutils.Range
if reader == nil {
reader, err = storage.OpenReader(key, useStale, false)
if err != nil && caches.IsBusyError(err) {
this.varMapping["cache.status"] = "BUSY"
this.cacheRef = nil
return
}
if err != nil && this.cacheRef.AllowPartialContent {
// 尝试读取分片的缓存内容
if len(rangeHeader) == 0 && this.cacheRef.ForcePartialContent {
@@ -277,7 +297,11 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
}
if len(rangeHeader) > 0 {
pReader, ranges := this.tryPartialReader(storage, key, useStale, rangeHeader, this.cacheRef.ForcePartialContent)
pReader, ranges, goNext := this.tryPartialReader(storage, key, useStale, rangeHeader, this.cacheRef.ForcePartialContent)
if !goNext {
this.cacheRef = nil
return
}
if pReader != nil {
isPartialCache = true
reader = pReader
@@ -648,26 +672,33 @@ func (this *HTTPRequest) addExpiresHeader(expiresAt int64) {
}
// 尝试读取区间缓存
func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key string, useStale bool, rangeHeader string, forcePartialContent bool) (caches.Reader, []rangeutils.Range) {
func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key string, useStale bool, rangeHeader string, forcePartialContent bool) (resultReader caches.Reader, ranges []rangeutils.Range, goNext bool) {
goNext = true
// 尝试读取Partial cache
if len(rangeHeader) == 0 {
return nil, nil
return
}
ranges, ok := httpRequestParseRangeHeader(rangeHeader)
if !ok {
return nil, nil
return
}
pReader, pErr := storage.OpenReader(key+caches.SuffixPartial, useStale, true)
if pErr != nil {
return nil, nil
if caches.IsBusyError(pErr) {
this.varMapping["cache.status"] = "BUSY"
goNext = false
return
}
return
}
partialReader, ok := pReader.(*caches.PartialFileReader)
if !ok {
_ = pReader.Close()
return nil, nil
return
}
var isOk = false
defer func() {
@@ -681,7 +712,7 @@ func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key s
len(ranges) > 0 &&
ranges[0][1] < 0 &&
!partialReader.IsCompleted() {
return nil, nil
return
}
// 检查范围
@@ -689,15 +720,15 @@ func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key s
for index, r := range ranges {
r1, ok := r.Convert(partialReader.MaxLength())
if !ok {
return nil, nil
return
}
r2, ok := partialReader.ContainsRange(r1)
if !ok {
return nil, nil
return
}
ranges[index] = r2
}
isOk = true
return pReader, ranges
return pReader, ranges, true
}

View File

@@ -546,4 +546,6 @@ func (this *HTTPRequest) DisableStat() {
if this.web != nil {
this.web.StatRef = nil
}
this.disableMetrics = true
}

View File

@@ -28,6 +28,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
_ "github.com/TeaOSLab/EdgeNode/internal/utils/agents" // 引入Agent管理器
_ "github.com/TeaOSLab/EdgeNode/internal/utils/clock" // 触发时钟更新
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/TeaOSLab/EdgeNode/internal/utils/jsonutils"
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
"github.com/TeaOSLab/EdgeNode/internal/waf"
@@ -880,6 +881,10 @@ func (this *Node) onReload(config *nodeconfigs.NodeConfig, reloadAll bool) {
nodeconfigs.ResetNodeConfig(config)
sharedNodeConfig = config
// 并发读写数
fsutils.ReaderLimiter.SetThreads(config.MaxConcurrentReads)
fsutils.WriterLimiter.SetThreads(config.MaxConcurrentWrites)
if reloadAll {
// 缓存策略
var subDirs = config.CacheDiskSubDirs

2
internal/utils/bfs/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
DESIGN.md
test.*

View File

@@ -0,0 +1,15 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs
type BlockInfo struct {
OriginOffsetFrom int64 `json:"1,omitempty"`
OriginOffsetTo int64 `json:"2,omitempty"`
BFileOffsetFrom int64 `json:"3,omitempty"`
BFileOffsetTo int64 `json:"4,omitempty"`
}
func (this BlockInfo) Contains(offset int64) bool {
return this.OriginOffsetFrom <= offset && this.OriginOffsetTo > /** MUST be gt, NOT gte **/ offset
}

View File

@@ -0,0 +1,403 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs
import (
"errors"
"fmt"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"io"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
)
const BFileExt = ".b"
type BlockType string
const (
BlockTypeHeader BlockType = "header"
BlockTypeBody BlockType = "body"
)
type BlocksFile struct {
opt *BlockFileOptions
fp *os.File
mFile *MetaFile
isClosing bool
isClosed bool
mu *sync.RWMutex
writtenBytes int64
writingFileMap map[string]zero.Zero // hash => Zero
syncAt time.Time
readerPool chan *FileReader
countRefs int32
}
func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFile, error) {
options.EnsureDefaults()
var bFilename = fp.Name()
if !strings.HasSuffix(bFilename, BFileExt) {
return nil, errors.New("filename '" + bFilename + "' must has a '" + BFileExt + "' extension")
}
var mu = &sync.RWMutex{}
var mFilename = strings.TrimSuffix(bFilename, BFileExt) + MFileExt
mFile, err := OpenMetaFile(mFilename, mu)
if err != nil {
_ = fp.Close()
return nil, fmt.Errorf("load '%s' failed: %w", mFilename, err)
}
AckReadThread()
_, err = fp.Seek(0, io.SeekEnd)
ReleaseReadThread()
if err != nil {
_ = fp.Close()
_ = mFile.Close()
return nil, err
}
return &BlocksFile{
fp: fp,
mFile: mFile,
mu: mu,
opt: options,
syncAt: time.Now(),
readerPool: make(chan *FileReader, 32),
writingFileMap: map[string]zero.Zero{},
}, nil
}
func OpenBlocksFile(filename string, options *BlockFileOptions) (*BlocksFile, error) {
// TODO 考虑是否使用flock锁定防止多进程写冲突
fp, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
if os.IsNotExist(err) {
var dir = filepath.Dir(filename)
_ = os.MkdirAll(dir, 0777)
// try again
fp, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0666)
}
if err != nil {
return nil, fmt.Errorf("open blocks file failed: %w", err)
}
}
return NewBlocksFileWithRawFile(fp, options)
}
func (this *BlocksFile) Filename() string {
return this.fp.Name()
}
func (this *BlocksFile) Write(hash string, blockType BlockType, b []byte, originOffset int64) (n int, err error) {
if len(b) == 0 {
return
}
this.mu.Lock()
defer this.mu.Unlock()
posBefore, err := this.currentPos()
if err != nil {
return 0, err
}
err = this.checkStatus()
if err != nil {
return
}
AckWriteThread()
n, err = this.fp.Write(b)
ReleaseWriteThread()
if err == nil {
if n > 0 {
this.writtenBytes += int64(n)
}
if blockType == BlockTypeHeader {
err = this.mFile.WriteHeaderBlockUnsafe(hash, posBefore, posBefore+int64(n))
} else if blockType == BlockTypeBody {
err = this.mFile.WriteBodyBlockUnsafe(hash, posBefore, posBefore+int64(n), originOffset, originOffset+int64(n))
} else {
err = errors.New("invalid block type '" + string(blockType) + "'")
}
}
return
}
func (this *BlocksFile) OpenFileWriter(fileHash string, bodySize int64, isPartial bool) (writer *FileWriter, err error) {
err = CheckHashErr(fileHash)
if err != nil {
return nil, err
}
this.mu.Lock()
defer this.mu.Unlock()
_, isWriting := this.writingFileMap[fileHash]
if isWriting {
err = ErrFileIsWriting
return
}
this.writingFileMap[fileHash] = zero.Zero{}
err = this.checkStatus()
if err != nil {
return
}
return NewFileWriter(this, fileHash, bodySize, isPartial)
}
func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileReader, error) {
err := CheckHashErr(fileHash)
if err != nil {
return nil, err
}
this.mu.RLock()
err = this.checkStatus()
this.mu.RUnlock()
if err != nil {
return nil, err
}
// 是否存在
header, ok := this.mFile.CloneFileHeader(fileHash)
if !ok {
return nil, os.ErrNotExist
}
// TODO 对于partial content需要传入ranges用来判断是否有交集
if header.IsWriting {
return nil, ErrFileIsWriting
}
if !isPartial && !header.IsCompleted {
return nil, os.ErrNotExist
}
// 先尝试从Pool中获取
select {
case reader := <-this.readerPool:
if reader == nil {
return nil, ErrClosed
}
reader.Reset(header)
atomic.AddInt32(&this.countRefs, 1)
return reader, nil
default:
}
AckReadThread()
fp, err := os.Open(this.fp.Name())
ReleaseReadThread()
if err != nil {
return nil, err
}
atomic.AddInt32(&this.countRefs, 1)
return NewFileReader(this, fp, header), nil
}
func (this *BlocksFile) CloseFileReader(reader *FileReader) error {
defer atomic.AddInt32(&this.countRefs, -1)
select {
case this.readerPool <- reader:
return nil
default:
return reader.Free()
}
}
func (this *BlocksFile) ExistFile(fileHash string) bool {
err := CheckHashErr(fileHash)
if err != nil {
return false
}
return this.mFile.ExistFile(fileHash)
}
func (this *BlocksFile) RemoveFile(fileHash string) error {
err := CheckHashErr(fileHash)
if err != nil {
return err
}
return this.mFile.RemoveFile(fileHash)
}
func (this *BlocksFile) Sync() error {
this.mu.Lock()
defer this.mu.Unlock()
err := this.checkStatus()
if err != nil {
return err
}
return this.sync(false)
}
func (this *BlocksFile) ForceSync() error {
this.mu.Lock()
defer this.mu.Unlock()
err := this.checkStatus()
if err != nil {
return err
}
return this.sync(true)
}
func (this *BlocksFile) SyncAt() time.Time {
return this.syncAt
}
func (this *BlocksFile) Compact() error {
// TODO 需要实现
return nil
}
func (this *BlocksFile) RemoveAll() error {
this.mu.Lock()
defer this.mu.Unlock()
this.isClosed = true
_ = this.mFile.RemoveAll()
this.closeReaderPool()
_ = this.fp.Close()
return os.Remove(this.fp.Name())
}
// CanClose 检查是否可以关闭
func (this *BlocksFile) CanClose() bool {
this.mu.RLock()
defer this.mu.RUnlock()
if len(this.writingFileMap) > 0 || atomic.LoadInt32(&this.countRefs) > 0 {
return false
}
this.isClosing = true
return true
}
// Close 关闭当前文件
func (this *BlocksFile) Close() error {
this.mu.Lock()
defer this.mu.Unlock()
if this.isClosed {
return nil
}
// TODO 决定是否同步
//_ = this.sync(true)
this.isClosed = true
_ = this.mFile.Close()
this.closeReaderPool()
return this.fp.Close()
}
// IsClosing 判断当前文件是否正在关闭或者已关闭
func (this *BlocksFile) IsClosing() bool {
return this.isClosed || this.isClosing
}
func (this *BlocksFile) IncrRef() {
atomic.AddInt32(&this.countRefs, 1)
}
func (this *BlocksFile) DecrRef() {
atomic.AddInt32(&this.countRefs, -1)
}
func (this *BlocksFile) TestReaderPool() chan *FileReader {
return this.readerPool
}
func (this *BlocksFile) removeWritingFile(hash string) {
this.mu.Lock()
delete(this.writingFileMap, hash)
this.mu.Unlock()
}
func (this *BlocksFile) checkStatus() error {
if this.isClosed || this.isClosing {
return fmt.Errorf("check status failed: %w", ErrClosed)
}
return nil
}
func (this *BlocksFile) currentPos() (int64, error) {
return this.fp.Seek(0, io.SeekCurrent)
}
func (this *BlocksFile) sync(force bool) error {
if !force {
if this.writtenBytes < this.opt.BytesPerSync {
return nil
}
}
if this.writtenBytes > 0 {
AckWriteThread()
err := this.fp.Sync()
ReleaseWriteThread()
if err != nil {
return err
}
}
this.writtenBytes = 0
this.syncAt = time.Now()
if force {
return this.mFile.SyncUnsafe()
}
return nil
}
func (this *BlocksFile) closeReaderPool() {
for {
select {
case reader := <-this.readerPool:
if reader != nil {
_ = reader.Free()
}
default:
return
}
}
}

View File

@@ -0,0 +1,17 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs
type BlockFileOptions struct {
BytesPerSync int64
}
func (this *BlockFileOptions) EnsureDefaults() {
if this.BytesPerSync <= 0 {
this.BytesPerSync = 1 << 20
}
}
var DefaultBlockFileOptions = &BlockFileOptions{
BytesPerSync: 1 << 20,
}

View File

@@ -0,0 +1,86 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs_test
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/bfs"
"github.com/iwind/TeaGo/assert"
"os"
"testing"
)
func TestBlocksFile_CanClose(t *testing.T) {
var a = assert.NewAssertion(t)
bFile, openErr := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if openErr != nil {
if os.IsNotExist(openErr) {
return
}
t.Fatal(openErr)
}
reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false)
if err != nil {
t.Fatal(err)
}
a.IsTrue(!bFile.CanClose())
err = reader.Close()
if err != nil {
t.Fatal(err)
}
// duplicated close
err = reader.Close()
if err != nil {
t.Fatal(err)
}
a.IsTrue(bFile.CanClose())
}
func TestBlocksFile_OpenFileWriter_SameHash(t *testing.T) {
bFile, openErr := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if openErr != nil {
if os.IsNotExist(openErr) {
return
}
t.Fatal(openErr)
}
{
writer, err := bFile.OpenFileWriter(bfs.Hash("123456"), -1, false)
if err != nil {
t.Fatal(err)
}
_ = writer.Close()
}
{
writer, err := bFile.OpenFileWriter(bfs.Hash("123456"), -1, false)
if err != nil {
t.Fatal(err)
}
_ = writer.Close()
}
}
func TestBlocksFile_RemoveAll(t *testing.T) {
bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if err != nil {
if os.IsNotExist(err) {
return
}
t.Fatal(err)
}
defer func() {
_ = bFile.Close()
}()
err = bFile.RemoveAll()
if err != nil {
t.Fatal(err)
}
}

View File

@@ -0,0 +1,20 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs
import (
"errors"
"os"
)
var ErrClosed = errors.New("the file closed")
var ErrInvalidHash = errors.New("invalid hash")
var ErrFileIsWriting = errors.New("the file is writing")
func IsWritingErr(err error) bool {
return err != nil && errors.Is(err, ErrFileIsWriting)
}
func IsNotExist(err error) bool {
return err != nil && os.IsNotExist(err)
}

View File

@@ -0,0 +1,203 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs
import (
"encoding/json"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"sort"
)
type FileHeader struct {
Version int `json:"1,omitempty"`
ModifiedAt int64 `json:"2,omitempty"`
ExpiresAt int64 `json:"3,omitempty"`
Status int `json:"4,omitempty"`
HeaderSize int64 `json:"5,omitempty"`
BodySize int64 `json:"6,omitempty"`
ExpiredBodySize int64 `json:"7,omitempty"`
HeaderBlocks []BlockInfo `json:"8,omitempty"`
BodyBlocks []BlockInfo `json:"9,omitempty"`
IsCompleted bool `json:"10,omitempty"`
IsWriting bool `json:"11,omitempty"`
}
func (this *FileHeader) BlockAt(offset int64) (blockInfo BlockInfo, ok bool) {
var l = len(this.BodyBlocks)
if l == 1 {
if this.BodyBlocks[0].Contains(offset) {
return this.BodyBlocks[0], true
}
return
}
sort.Search(l, func(i int) bool {
if this.BodyBlocks[i].Contains(offset) {
blockInfo = this.BodyBlocks[i]
ok = true
return true
}
return this.BodyBlocks[i].OriginOffsetFrom > offset
})
return
}
func (this *FileHeader) MaxOffset() int64 {
var l = len(this.BodyBlocks)
if l > 0 {
return this.BodyBlocks[l-1].OriginOffsetTo
}
return 0
}
// Compact blocks
func (this *FileHeader) Compact() {
this.compactHeader()
this.compactBody()
}
// compact header blocks
func (this *FileHeader) compactHeader() {
var l = len(this.HeaderBlocks)
if l > 1 {
// 合并
var newBlocks []BlockInfo
var newIndex int
for index, currentBlock := range this.HeaderBlocks {
if index == 0 {
newBlocks = append(newBlocks, currentBlock)
newIndex++
continue
}
var lastBlock = newBlocks[newIndex-1]
if currentBlock.OriginOffsetFrom >= lastBlock.OriginOffsetFrom &&
currentBlock.OriginOffsetFrom <= /* MUST gte */ lastBlock.OriginOffsetTo &&
currentBlock.OriginOffsetFrom-lastBlock.OriginOffsetFrom == currentBlock.BFileOffsetFrom-lastBlock.BFileOffsetFrom /* 两侧距离一致 */ {
if currentBlock.OriginOffsetTo > lastBlock.OriginOffsetTo {
lastBlock.OriginOffsetTo = currentBlock.OriginOffsetTo
lastBlock.BFileOffsetTo = currentBlock.BFileOffsetTo
newBlocks[newIndex-1] = lastBlock
}
} else {
newBlocks = append(newBlocks, currentBlock)
newIndex++
}
}
this.HeaderBlocks = newBlocks
}
}
// sort and compact body blocks
func (this *FileHeader) compactBody() {
var l = len(this.BodyBlocks)
if l > 0 {
if l > 1 {
// 排序
sort.Slice(this.BodyBlocks, func(i, j int) bool {
var block1 = this.BodyBlocks[i]
var block2 = this.BodyBlocks[j]
if block1.OriginOffsetFrom == block1.OriginOffsetFrom {
return block1.OriginOffsetTo < block2.OriginOffsetTo
}
return block1.OriginOffsetFrom < block2.OriginOffsetFrom
})
// 合并
var newBlocks []BlockInfo
var newIndex int
for index, currentBlock := range this.BodyBlocks {
if index == 0 {
newBlocks = append(newBlocks, currentBlock)
newIndex++
continue
}
var lastBlock = newBlocks[newIndex-1]
if currentBlock.OriginOffsetFrom >= lastBlock.OriginOffsetFrom &&
currentBlock.OriginOffsetFrom <= /* MUST gte */ lastBlock.OriginOffsetTo &&
currentBlock.OriginOffsetFrom-lastBlock.OriginOffsetFrom == currentBlock.BFileOffsetFrom-lastBlock.BFileOffsetFrom /* 两侧距离一致 */ {
if currentBlock.OriginOffsetTo > lastBlock.OriginOffsetTo {
lastBlock.OriginOffsetTo = currentBlock.OriginOffsetTo
lastBlock.BFileOffsetTo = currentBlock.BFileOffsetTo
newBlocks[newIndex-1] = lastBlock
}
} else {
newBlocks = append(newBlocks, currentBlock)
newIndex++
}
}
this.BodyBlocks = newBlocks
l = len(this.BodyBlocks)
}
// 检查是否已完成
var isCompleted = true
if this.BodyBlocks[0].OriginOffsetFrom != 0 || this.BodyBlocks[len(this.BodyBlocks)-1].OriginOffsetTo != this.BodySize {
isCompleted = false
} else {
for index, block := range this.BodyBlocks {
// 是否有不连续的
if index > 0 && block.OriginOffsetFrom > this.BodyBlocks[index-1].OriginOffsetTo {
isCompleted = false
break
}
}
}
this.IsCompleted = isCompleted
}
}
// Clone current header
func (this *FileHeader) Clone() *FileHeader {
return &FileHeader{
Version: this.Version,
ModifiedAt: this.ModifiedAt,
ExpiresAt: this.ExpiresAt,
Status: this.Status,
HeaderSize: this.HeaderSize,
BodySize: this.BodySize,
ExpiredBodySize: this.ExpiredBodySize,
HeaderBlocks: this.HeaderBlocks,
BodyBlocks: this.BodyBlocks,
IsCompleted: this.IsCompleted,
IsWriting: this.IsWriting,
}
}
func (this *FileHeader) Encode(hash string) ([]byte, error) {
headerJSON, err := json.Marshal(this)
if err != nil {
return nil, err
}
// we do not compress data which size is less than 100 bytes
if len(headerJSON) < 100 {
return EncodeMetaBlock(MetaActionNew, hash, append([]byte("json:"), headerJSON...))
}
var buf = utils.SharedBufferPool.Get()
defer utils.SharedBufferPool.Put(buf)
compressor, err := SharedCompressPool.Get(buf)
if err != nil {
return nil, err
}
_, err = compressor.Write(headerJSON)
if err != nil {
_ = compressor.Close()
SharedCompressPool.Put(compressor)
return nil, err
}
err = compressor.Close()
SharedCompressPool.Put(compressor)
if err != nil {
return nil, err
}
return EncodeMetaBlock(MetaActionNew, hash, buf.Bytes())
}

View File

@@ -0,0 +1,67 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs
import (
"bytes"
"encoding/json"
)
// LazyFileHeader load file header lazily to save memory
type LazyFileHeader struct {
rawData []byte
fileHeader *FileHeader
}
func NewLazyFileHeaderFromData(rawData []byte) *LazyFileHeader {
return &LazyFileHeader{
rawData: rawData,
}
}
func NewLazyFileHeader(fileHeader *FileHeader) *LazyFileHeader {
return &LazyFileHeader{
fileHeader: fileHeader,
}
}
func (this *LazyFileHeader) FileHeaderUnsafe() (*FileHeader, error) {
if this.fileHeader != nil {
return this.fileHeader, nil
}
var jsonPrefix = []byte("json:")
var header = &FileHeader{}
// json
if bytes.HasPrefix(this.rawData, jsonPrefix) {
err := json.Unmarshal(this.rawData[len(jsonPrefix):], header)
if err != nil {
return nil, err
}
return header, nil
}
decompressor, err := SharedDecompressPool.Get(bytes.NewBuffer(this.rawData))
if err != nil {
return nil, err
}
defer func() {
_ = decompressor.Close()
SharedDecompressPool.Put(decompressor)
}()
err = json.NewDecoder(decompressor).Decode(header)
if err != nil {
return nil, err
}
header.IsWriting = false
this.fileHeader = header
this.rawData = nil
return header, nil
}

View File

@@ -0,0 +1,87 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs_test
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/bfs"
"runtime"
"testing"
)
func TestNewLazyFileHeaderFromData(t *testing.T) {
var header = &bfs.FileHeader{
Version: 1,
Status: 200,
BodyBlocks: []bfs.BlockInfo{
{
BFileOffsetFrom: 0,
BFileOffsetTo: 1 << 20,
},
},
}
blockBytes, err := header.Encode(bfs.Hash("123456"))
if err != nil {
t.Fatal(err)
}
_, _, rawData, err := bfs.DecodeMetaBlock(blockBytes)
if err != nil {
t.Fatal(err)
}
var lazyHeader = bfs.NewLazyFileHeaderFromData(rawData)
newHeader, err := lazyHeader.FileHeaderUnsafe()
if err != nil {
t.Fatal(err)
}
t.Log(newHeader)
}
func BenchmarkLazyFileHeader_Decode(b *testing.B) {
runtime.GOMAXPROCS(12)
var header = &bfs.FileHeader{
Version: 1,
Status: 200,
BodyBlocks: []bfs.BlockInfo{},
}
var offset int64
for {
var end = offset + 16<<10
if end > 1<<20 {
break
}
header.BodyBlocks = append(header.BodyBlocks, bfs.BlockInfo{
BFileOffsetFrom: offset,
BFileOffsetTo: end,
})
offset = end
}
var hash = bfs.Hash("123456")
blockBytes, err := header.Encode(hash)
if err != nil {
b.Fatal(err)
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, _, rawData, decodeErr := bfs.DecodeMetaBlock(blockBytes)
if decodeErr != nil {
b.Fatal(decodeErr)
}
var lazyHeader = bfs.NewLazyFileHeaderFromData(rawData)
_, decodeErr = lazyHeader.FileHeaderUnsafe()
if decodeErr != nil {
b.Fatal(decodeErr)
}
}
})
}

View File

@@ -0,0 +1,432 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs_test
import (
"encoding/json"
"github.com/TeaOSLab/EdgeNode/internal/utils/bfs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"github.com/iwind/TeaGo/assert"
"github.com/iwind/TeaGo/logs"
"math/rand"
"runtime"
"testing"
)
func TestFileHeader_Compact(t *testing.T) {
var a = assert.NewAssertion(t)
{
var header = &bfs.FileHeader{
Version: 1,
Status: 200,
BodySize: 100,
BodyBlocks: []bfs.BlockInfo{
{
OriginOffsetFrom: 0,
OriginOffsetTo: 100,
},
},
}
header.Compact()
a.IsTrue(header.IsCompleted)
}
{
var header = &bfs.FileHeader{
Version: 1,
Status: 200,
BodySize: 200,
BodyBlocks: []bfs.BlockInfo{
{
OriginOffsetFrom: 100,
OriginOffsetTo: 200,
},
{
OriginOffsetFrom: 0,
OriginOffsetTo: 100,
},
},
}
header.Compact()
a.IsTrue(header.IsCompleted)
}
{
var header = &bfs.FileHeader{
Version: 1,
Status: 200,
BodySize: 200,
BodyBlocks: []bfs.BlockInfo{
{
OriginOffsetFrom: 10,
OriginOffsetTo: 99,
},
{
OriginOffsetFrom: 110,
OriginOffsetTo: 200,
},
{
OriginOffsetFrom: 88,
OriginOffsetTo: 120,
},
{
OriginOffsetFrom: 0,
OriginOffsetTo: 100,
},
},
}
header.Compact()
a.IsTrue(header.IsCompleted)
}
{
var header = &bfs.FileHeader{
Version: 1,
Status: 200,
BodySize: 100,
BodyBlocks: []bfs.BlockInfo{
{
OriginOffsetFrom: 10,
OriginOffsetTo: 100,
},
{
OriginOffsetFrom: 100,
OriginOffsetTo: 200,
},
},
}
header.Compact()
a.IsFalse(header.IsCompleted)
}
{
var header = &bfs.FileHeader{
Version: 1,
Status: 200,
BodySize: 200,
BodyBlocks: []bfs.BlockInfo{
{
OriginOffsetFrom: 0,
OriginOffsetTo: 100,
},
{
OriginOffsetFrom: 100,
OriginOffsetTo: 199,
},
},
}
header.Compact()
a.IsFalse(header.IsCompleted)
}
{
var header = &bfs.FileHeader{
Version: 1,
Status: 200,
BodySize: 200,
BodyBlocks: []bfs.BlockInfo{
{
OriginOffsetFrom: 0,
OriginOffsetTo: 100,
},
{
OriginOffsetFrom: 101,
OriginOffsetTo: 200,
},
},
}
header.Compact()
a.IsFalse(header.IsCompleted)
}
}
func TestFileHeader_Compact_Merge(t *testing.T) {
var a = assert.NewAssertion(t)
var header = &bfs.FileHeader{
Version: 1,
Status: 200,
HeaderBlocks: []bfs.BlockInfo{
{
BFileOffsetFrom: 1000,
BFileOffsetTo: 1100,
OriginOffsetFrom: 1200,
OriginOffsetTo: 1300,
},
{
BFileOffsetFrom: 1100,
BFileOffsetTo: 1200,
OriginOffsetFrom: 1300,
OriginOffsetTo: 1400,
},
},
BodyBlocks: []bfs.BlockInfo{
{
BFileOffsetFrom: 0,
BFileOffsetTo: 100,
OriginOffsetFrom: 200,
OriginOffsetTo: 300,
},
{
BFileOffsetFrom: 100,
BFileOffsetTo: 200,
OriginOffsetFrom: 300,
OriginOffsetTo: 400,
},
{
BFileOffsetFrom: 200,
BFileOffsetTo: 300,
OriginOffsetFrom: 400,
OriginOffsetTo: 500,
},
},
}
header.Compact()
logs.PrintAsJSON(header.HeaderBlocks)
logs.PrintAsJSON(header.BodyBlocks)
a.IsTrue(len(header.HeaderBlocks) == 1)
a.IsTrue(len(header.BodyBlocks) == 1)
}
func TestFileHeader_Compact_Merge2(t *testing.T) {
var header = &bfs.FileHeader{
Version: 1,
Status: 200,
BodyBlocks: []bfs.BlockInfo{
{
BFileOffsetFrom: 0,
BFileOffsetTo: 100,
OriginOffsetFrom: 200,
OriginOffsetTo: 300,
},
{
BFileOffsetFrom: 101,
BFileOffsetTo: 200,
OriginOffsetFrom: 301,
OriginOffsetTo: 400,
},
{
BFileOffsetFrom: 200,
BFileOffsetTo: 300,
OriginOffsetFrom: 400,
OriginOffsetTo: 500,
},
},
}
header.Compact()
logs.PrintAsJSON(header.BodyBlocks)
}
func TestFileHeader_Clone(t *testing.T) {
var a = assert.NewAssertion(t)
var header = &bfs.FileHeader{
Version: 1,
Status: 200,
BodyBlocks: []bfs.BlockInfo{
{
BFileOffsetFrom: 0,
BFileOffsetTo: 100,
},
},
}
var clonedHeader = header.Clone()
t.Log("=== cloned header ===")
logs.PrintAsJSON(clonedHeader, t)
a.IsTrue(len(clonedHeader.BodyBlocks) == 1)
header.BodyBlocks = append(header.BodyBlocks, bfs.BlockInfo{
BFileOffsetFrom: 100,
BFileOffsetTo: 200,
})
header.BodyBlocks = append(header.BodyBlocks, bfs.BlockInfo{
BFileOffsetFrom: 300,
BFileOffsetTo: 400,
})
clonedHeader.BodyBlocks[0].OriginOffsetFrom = 100000000
t.Log("=== after changed ===")
logs.PrintAsJSON(clonedHeader, t)
a.IsTrue(len(clonedHeader.BodyBlocks) == 1)
t.Log("=== original header ===")
logs.PrintAsJSON(header, t)
a.IsTrue(header.BodyBlocks[0].OriginOffsetFrom != clonedHeader.BodyBlocks[0].OriginOffsetFrom)
}
func TestFileHeader_Encode(t *testing.T) {
{
var header = &bfs.FileHeader{
Version: 1,
Status: 200,
ModifiedAt: fasttime.Now().Unix(),
ExpiresAt: fasttime.Now().Unix() + 3600,
BodySize: 1 << 20,
HeaderSize: 1 << 10,
BodyBlocks: []bfs.BlockInfo{
{
BFileOffsetFrom: 1 << 10,
BFileOffsetTo: 1 << 20,
},
},
}
data, err := header.Encode(bfs.Hash("123456"))
if err != nil {
t.Fatal(err)
}
jsonBytes, _ := json.Marshal(header)
t.Log(len(header.BodyBlocks), "blocks", len(data), "bytes", "json:", len(jsonBytes), "bytes")
_, _, _, err = bfs.DecodeMetaBlock(data)
if err != nil {
t.Fatal(err)
}
}
{
var header = &bfs.FileHeader{
Version: 1,
Status: 200,
BodyBlocks: []bfs.BlockInfo{},
}
var offset int64
for {
var end = offset + 16<<10
if end > 256<<10 {
break
}
header.BodyBlocks = append(header.BodyBlocks, bfs.BlockInfo{
BFileOffsetFrom: offset,
BFileOffsetTo: end,
})
offset = end
}
data, err := header.Encode(bfs.Hash("123456"))
if err != nil {
t.Fatal(err)
}
jsonBytes, _ := json.Marshal(header)
t.Log(len(header.BodyBlocks), "blocks", len(data), "bytes", "json:", len(jsonBytes), "bytes")
}
{
var header = &bfs.FileHeader{
Version: 1,
Status: 200,
BodyBlocks: []bfs.BlockInfo{},
}
var offset int64
for {
var end = offset + 16<<10
if end > 512<<10 {
break
}
header.BodyBlocks = append(header.BodyBlocks, bfs.BlockInfo{
BFileOffsetFrom: offset,
BFileOffsetTo: end,
})
offset = end
}
data, err := header.Encode(bfs.Hash("123456"))
if err != nil {
t.Fatal(err)
}
jsonBytes, _ := json.Marshal(header)
t.Log(len(header.BodyBlocks), "blocks", len(data), "bytes", "json:", len(jsonBytes), "bytes")
}
{
var header = &bfs.FileHeader{
Version: 1,
Status: 200,
BodyBlocks: []bfs.BlockInfo{},
}
var offset int64
for {
var end = offset + 16<<10
if end > 1<<20 {
break
}
header.BodyBlocks = append(header.BodyBlocks, bfs.BlockInfo{
BFileOffsetFrom: offset,
BFileOffsetTo: end,
})
offset = end
}
data, err := header.Encode(bfs.Hash("123456"))
if err != nil {
t.Fatal(err)
}
jsonBytes, _ := json.Marshal(header)
t.Log(len(header.BodyBlocks), "blocks", len(data), "bytes", "json:", len(jsonBytes), "bytes")
}
}
func BenchmarkFileHeader_Compact(b *testing.B) {
for i := 0; i < b.N; i++ {
var header = &bfs.FileHeader{
Version: 1,
Status: 200,
BodySize: 200,
BodyBlocks: nil,
}
for j := 0; j < 100; j++ {
header.BodyBlocks = append(header.BodyBlocks, bfs.BlockInfo{
OriginOffsetFrom: int64(j * 100),
OriginOffsetTo: int64(j * 200),
BFileOffsetFrom: 0,
BFileOffsetTo: 0,
})
}
header.Compact()
}
}
func BenchmarkFileHeader_Encode(b *testing.B) {
runtime.GOMAXPROCS(12)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var header = &bfs.FileHeader{
Version: 1,
Status: 200,
ModifiedAt: rand.Int63(),
BodySize: rand.Int63(),
BodyBlocks: []bfs.BlockInfo{},
}
var offset int64
for {
var end = offset + 16<<10
if end > 2<<20 {
break
}
header.BodyBlocks = append(header.BodyBlocks, bfs.BlockInfo{
BFileOffsetFrom: offset + int64(rand.Int()%1000000),
BFileOffsetTo: end + int64(rand.Int()%1000000),
})
offset = end
}
var hash = bfs.Hash("123456")
_, err := header.Encode(hash)
if err != nil {
b.Fatal(err)
}
}
})
}

View File

@@ -0,0 +1,88 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs
import (
"errors"
"github.com/iwind/TeaGo/types"
"io"
"os"
)
type FileReader struct {
bFile *BlocksFile
fp *os.File
fileHeader *FileHeader
pos int64
isClosed bool
}
func NewFileReader(bFile *BlocksFile, fp *os.File, fileHeader *FileHeader) *FileReader {
return &FileReader{
bFile: bFile,
fp: fp,
fileHeader: fileHeader,
}
}
func (this *FileReader) FileHeader() *FileHeader {
return this.fileHeader
}
func (this *FileReader) Read(b []byte) (n int, err error) {
n, err = this.ReadAt(b, this.pos)
this.pos += int64(n)
return
}
func (this *FileReader) ReadAt(b []byte, offset int64) (n int, err error) {
if offset >= this.fileHeader.MaxOffset() {
err = io.EOF
return
}
blockInfo, ok := this.fileHeader.BlockAt(offset)
if !ok {
err = errors.New("could not find block at '" + types.String(offset) + "'")
return
}
var delta = offset - blockInfo.OriginOffsetFrom
var bFrom = blockInfo.BFileOffsetFrom + delta
var bTo = blockInfo.BFileOffsetTo
if bFrom > bTo {
err = errors.New("invalid block information")
return
}
var bufLen = len(b)
if int64(bufLen) > bTo-bFrom {
bufLen = int(bTo - bFrom)
}
AckReadThread()
n, err = this.fp.ReadAt(b[:bufLen], bFrom)
ReleaseReadThread()
return
}
func (this *FileReader) Reset(fileHeader *FileHeader) {
this.fileHeader = fileHeader
this.pos = 0
}
func (this *FileReader) Close() error {
if this.isClosed {
return nil
}
this.isClosed = true
return this.bFile.CloseFileReader(this)
}
func (this *FileReader) Free() error {
return this.fp.Close()
}

View File

@@ -0,0 +1,237 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs_test
import (
"fmt"
"github.com/TeaOSLab/EdgeNode/internal/utils/bfs"
"io"
"os"
"testing"
"time"
)
func TestFileReader_Read_SmallBuf(t *testing.T) {
bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if err != nil {
t.Fatal(err)
}
reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false)
if err != nil {
if os.IsNotExist(err) {
t.Log(err)
return
}
t.Fatal(err)
}
defer func() {
_ = reader.Close()
}()
var buf = make([]byte, 3)
for {
n, readErr := reader.Read(buf)
if n > 0 {
t.Log(string(buf[:n]))
}
if readErr != nil {
if readErr == io.EOF {
break
}
t.Fatal(readErr)
}
}
}
func TestFileReader_Read_LargeBuff(t *testing.T) {
bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if err != nil {
if os.IsNotExist(err) {
t.Log(err)
return
}
t.Fatal(err)
}
reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false)
if err != nil {
if os.IsNotExist(err) {
t.Log(err)
return
}
t.Fatal(err)
}
defer func() {
_ = reader.Close()
}()
var buf = make([]byte, 128)
for {
n, readErr := reader.Read(buf)
if n > 0 {
t.Log(string(buf[:n]))
}
if readErr != nil {
if readErr == io.EOF {
break
}
t.Fatal(readErr)
}
}
}
func TestFileReader_Read_LargeFile(t *testing.T) {
bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if err != nil {
if os.IsNotExist(err) {
t.Log(err)
return
}
t.Fatal(err)
}
reader, err := bFile.OpenFileReader(bfs.Hash("123456@LARGE"), false)
if err != nil {
if os.IsNotExist(err) {
t.Log(err)
return
}
t.Fatal(err)
}
defer func() {
_ = reader.Close()
}()
var buf = make([]byte, 16<<10)
var totalSize int64
var before = time.Now()
for {
n, readErr := reader.Read(buf)
if n > 0 {
totalSize += int64(n)
}
if readErr != nil {
if readErr == io.EOF {
break
}
t.Fatal(readErr)
}
}
t.Log("totalSize:", totalSize>>20, "MiB", "cost:", fmt.Sprintf("%.4fms", time.Since(before).Seconds()*1000))
}
func TestFileReader_ReadAt(t *testing.T) {
bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if err != nil {
if os.IsNotExist(err) {
t.Log(err)
return
}
t.Fatal(err)
}
reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false)
if err != nil {
if os.IsNotExist(err) {
t.Log(err)
return
}
t.Fatal(err)
}
defer func() {
_ = reader.Close()
}()
{
var buf = make([]byte, 3)
n, readErr := reader.ReadAt(buf, 0)
if n > 0 {
t.Log(string(buf[:n]))
}
if readErr != nil && readErr != io.EOF {
t.Fatal(readErr)
}
}
{
var buf = make([]byte, 3)
n, readErr := reader.ReadAt(buf, 3)
if n > 0 {
t.Log(string(buf[:n]))
}
if readErr != nil && readErr != io.EOF {
t.Fatal(readErr)
}
}
{
var buf = make([]byte, 11)
n, readErr := reader.ReadAt(buf, 3)
if n > 0 {
t.Log(string(buf[:n]))
}
if readErr != nil && readErr != io.EOF {
t.Fatal(readErr)
}
}
{
var buf = make([]byte, 3)
n, readErr := reader.ReadAt(buf, 11)
if n > 0 {
t.Log(string(buf[:n]))
}
if readErr != nil && readErr != io.EOF {
t.Fatal(readErr)
}
}
{
var buf = make([]byte, 3)
n, readErr := reader.ReadAt(buf, 1000)
if n > 0 {
t.Log(string(buf[:n]))
} else {
t.Log("EOF")
}
if readErr != nil && readErr != io.EOF {
t.Fatal(readErr)
}
}
}
func TestFileReader_Pool(t *testing.T) {
bFile, openErr := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if openErr != nil {
if os.IsNotExist(openErr) {
t.Log(openErr)
return
}
t.Fatal(openErr)
}
for i := 0; i < 10; i++ {
reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false)
if err != nil {
if os.IsNotExist(err) {
continue
}
t.Fatal(err)
}
go func() {
err = reader.Close()
if err != nil {
t.Log(err)
}
}()
}
time.Sleep(100 * time.Millisecond)
t.Log(len(bFile.TestReaderPool()))
}

View File

@@ -0,0 +1,112 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs
import "errors"
// FileWriter file writer
// not thread-safe
type FileWriter struct {
bFile *BlocksFile
hasMeta bool
hash string
bodySize int64
originOffset int64
realHeaderSize int64
realBodySize int64
isPartial bool
}
func NewFileWriter(bFile *BlocksFile, hash string, bodySize int64, isPartial bool) (*FileWriter, error) {
if isPartial && bodySize <= 0 {
return nil, errors.New("invalid body size for partial content")
}
return &FileWriter{
bFile: bFile,
hash: hash,
bodySize: bodySize,
isPartial: isPartial,
}, nil
}
func (this *FileWriter) WriteMeta(status int, expiresAt int64, expectedFileSize int64) error {
this.hasMeta = true
return this.bFile.mFile.WriteMeta(this.hash, status, expiresAt, expectedFileSize)
}
func (this *FileWriter) WriteHeader(b []byte) (n int, err error) {
if !this.isPartial && !this.hasMeta {
err = errors.New("no meta found")
return
}
n, err = this.bFile.Write(this.hash, BlockTypeHeader, b, -1)
this.realHeaderSize += int64(n)
return
}
func (this *FileWriter) WriteBody(b []byte) (n int, err error) {
if !this.isPartial && !this.hasMeta {
err = errors.New("no meta found")
return
}
n, err = this.bFile.Write(this.hash, BlockTypeBody, b, this.originOffset)
this.originOffset += int64(n)
this.realBodySize += int64(n)
return
}
func (this *FileWriter) WriteBodyAt(b []byte, offset int64) (n int, err error) {
if !this.hasMeta {
err = errors.New("no meta found")
return
}
if !this.isPartial {
err = errors.New("can not write body at specified offset: it is not a partial file")
return
}
// still 'Write()' NOT 'WriteAt()'
this.originOffset = offset
n, err = this.bFile.Write(this.hash, BlockTypeBody, b, offset)
this.originOffset += int64(n)
return
}
func (this *FileWriter) Close() error {
defer func() {
this.bFile.removeWritingFile(this.hash)
}()
if !this.isPartial && !this.hasMeta {
return errors.New("no meta found")
}
if this.isPartial {
if this.originOffset > this.bodySize {
return errors.New("unexpected body size")
}
this.realBodySize = this.bodySize
} else {
if this.bodySize > 0 && this.bodySize != this.realBodySize {
return errors.New("unexpected body size")
}
}
err := this.bFile.mFile.WriteClose(this.hash, this.realHeaderSize, this.realBodySize)
if err != nil {
return err
}
return this.bFile.Sync()
}
func (this *FileWriter) Discard() error {
// TODO 需要测试
return this.bFile.mFile.RemoveFile(this.hash)
}

View File

@@ -0,0 +1,134 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs_test
import (
"bytes"
"github.com/TeaOSLab/EdgeNode/internal/utils/bfs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
"github.com/iwind/TeaGo/logs"
"net/http"
"testing"
"time"
)
func TestNewFileWriter(t *testing.T) {
bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if err != nil {
t.Fatal(err)
}
defer func() {
if !testutils.IsSingleTesting() {
_ = bFile.RemoveAll()
} else {
_ = bFile.Close()
}
}()
writer, err := bFile.OpenFileWriter(bfs.Hash("123456"), -1, false)
if err != nil {
t.Fatal(err)
}
err = writer.WriteMeta(http.StatusOK, fasttime.Now().Unix()+3600, -1)
if err != nil {
t.Fatal(err)
}
_, err = writer.WriteHeader([]byte("Content-Type: text/html; charset=utf-8"))
if err != nil {
t.Fatal(err)
}
for i := 0; i < 3; i++ {
n, writeErr := writer.WriteBody([]byte("Hello,World"))
if writeErr != nil {
t.Fatal(writeErr)
}
t.Log("wrote:", n, "bytes")
}
err = writer.Close()
if err != nil {
t.Fatal(err)
}
}
func TestNewFileWriter_LargeFile(t *testing.T) {
bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if err != nil {
t.Fatal(err)
}
defer func() {
if !testutils.IsSingleTesting() {
_ = bFile.RemoveAll()
} else {
_ = bFile.Close()
}
}()
writer, err := bFile.OpenFileWriter(bfs.Hash("123456@LARGE"), -1, false)
if err != nil {
t.Fatal(err)
}
err = writer.WriteMeta(http.StatusOK, fasttime.Now().Unix()+86400, -1)
if err != nil {
t.Fatal(err)
}
var countBlocks = 1 << 10
if !testutils.IsSingleTesting() {
countBlocks = 2
}
var data = bytes.Repeat([]byte{'A'}, 16<<10)
var before = time.Now()
for i := 0; i < countBlocks; i++ {
_, err = writer.WriteBody(data)
if err != nil {
t.Fatal(err)
}
}
err = writer.Close()
if err != nil {
t.Fatal(err)
}
logs.Println("cost:", time.Since(before).Seconds()*1000, "ms")
}
func TestFileWriter_WriteBodyAt(t *testing.T) {
bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if err != nil {
t.Fatal(err)
}
defer func() {
if !testutils.IsSingleTesting() {
_ = bFile.RemoveAll()
} else {
_ = bFile.Close()
}
}()
writer, err := bFile.OpenFileWriter(bfs.Hash("123456"), 1<<20, true)
if err != nil {
t.Fatal(err)
}
{
n, writeErr := writer.WriteBodyAt([]byte("Hello,World"), 1024)
if writeErr != nil {
t.Fatal(writeErr)
}
t.Log("wrote:", n, "bytes")
}
}

442
internal/utils/bfs/fs.go Normal file
View File

@@ -0,0 +1,442 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs
import (
"errors"
"github.com/TeaOSLab/EdgeNode/internal/goman"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/TeaOSLab/EdgeNode/internal/utils/linkedlist"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"log"
"runtime"
"sync"
"time"
)
func IsEnabled() bool {
return runtime.GOARCH == "amd64" || runtime.GOARCH == "arm64"
}
// FS 文件系统对象
type FS struct {
dir string
opt *FSOptions
bMap map[string]*BlocksFile // name => *BlocksFile
bList *linkedlist.List[string] // [bName]
bItemMap map[string]*linkedlist.Item[string]
closingBMap map[string]zero.Zero // filename => Zero
closingBChan chan *BlocksFile
mu *sync.RWMutex
isClosed bool
syncTicker *time.Ticker
locker *fsutils.Locker
}
// OpenFS 打开文件系统
func OpenFS(dir string, options *FSOptions) (*FS, error) {
if !IsEnabled() {
return nil, errors.New("the fs only works under 64 bit system")
}
if options == nil {
options = DefaultFSOptions
} else {
options.EnsureDefaults()
}
var locker = fsutils.NewLocker(dir + "/fs")
err := locker.Lock()
if err != nil {
return nil, err
}
var fs = &FS{
dir: dir,
bMap: map[string]*BlocksFile{},
bList: linkedlist.NewList[string](),
bItemMap: map[string]*linkedlist.Item[string]{},
closingBMap: map[string]zero.Zero{},
closingBChan: make(chan *BlocksFile, 32),
mu: &sync.RWMutex{},
opt: options,
syncTicker: time.NewTicker(1 * time.Second),
locker: locker,
}
go fs.init()
return fs, nil
}
func (this *FS) init() {
go func() {
// sync in background
for range this.syncTicker.C {
this.syncLoop()
}
}()
go func() {
for {
this.processClosingBFiles()
}
}()
}
// OpenFileWriter 打开文件写入器
func (this *FS) OpenFileWriter(hash string, bodySize int64, isPartial bool) (*FileWriter, error) {
if this.isClosed {
return nil, errors.New("the fs closed")
}
if isPartial && bodySize <= 0 {
return nil, errors.New("invalid body size for partial content")
}
bFile, err := this.openBFileForHashWriting(hash)
if err != nil {
return nil, err
}
return bFile.OpenFileWriter(hash, bodySize, isPartial)
}
// OpenFileReader 打开文件读取器
func (this *FS) OpenFileReader(hash string, isPartial bool) (*FileReader, error) {
if this.isClosed {
return nil, errors.New("the fs closed")
}
bFile, err := this.openBFileForHashReading(hash)
if err != nil {
return nil, err
}
return bFile.OpenFileReader(hash, isPartial)
}
func (this *FS) ExistFile(hash string) (bool, error) {
if this.isClosed {
return false, errors.New("the fs closed")
}
bFile, err := this.openBFileForHashReading(hash)
if err != nil {
return false, err
}
return bFile.ExistFile(hash), nil
}
func (this *FS) RemoveFile(hash string) error {
if this.isClosed {
return errors.New("the fs closed")
}
bFile, err := this.openBFileForHashWriting(hash)
if err != nil {
return err
}
return bFile.RemoveFile(hash)
}
func (this *FS) Close() error {
if this.isClosed {
return nil
}
this.isClosed = true
close(this.closingBChan)
this.syncTicker.Stop()
var lastErr error
this.mu.Lock()
if len(this.bMap) > 0 {
var g = goman.NewTaskGroup()
for _, bFile := range this.bMap {
var bFileCopy = bFile
g.Run(func() {
err := bFileCopy.Close()
if err != nil {
lastErr = err
}
})
}
g.Wait()
}
this.mu.Unlock()
err := this.locker.Release()
if err != nil {
lastErr = err
}
return lastErr
}
func (this *FS) TestBMap() map[string]*BlocksFile {
return this.bMap
}
func (this *FS) TestBList() *linkedlist.List[string] {
return this.bList
}
func (this *FS) bPathForHash(hash string) (path string, bName string, err error) {
err = CheckHashErr(hash)
if err != nil {
return "", "", err
}
return this.dir + "/" + hash[:2] + "/" + hash[2:4] + BFileExt, hash[:4], nil
}
func (this *FS) syncLoop() {
if this.isClosed {
return
}
if this.opt.SyncTimeout <= 0 {
return
}
var maxSyncFiles = this.opt.MaxSyncFiles
if maxSyncFiles <= 0 {
maxSyncFiles = 32
}
var bFiles []*BlocksFile
this.mu.RLock()
for _, bFile := range this.bMap {
if time.Since(bFile.SyncAt()) > this.opt.SyncTimeout {
bFiles = append(bFiles, bFile)
maxSyncFiles--
if maxSyncFiles <= 0 {
break
}
}
}
this.mu.RUnlock()
for _, bFile := range bFiles {
if bFile.IsClosing() {
continue
}
err := bFile.ForceSync()
if err != nil {
// check again
if bFile.IsClosing() {
continue
}
// TODO 可以在options自定义一个logger
log.Println("BFS", "sync failed: "+err.Error())
}
}
}
func (this *FS) openBFileForHashWriting(hash string) (*BlocksFile, error) {
err := CheckHashErr(hash)
if err != nil {
return nil, err
}
bPath, bName, err := this.bPathForHash(hash)
if err != nil {
return nil, err
}
this.mu.RLock()
bFile, ok := this.bMap[bName]
this.mu.RUnlock()
if ok {
// 调整当前BFile所在位置
this.mu.Lock()
if bFile.IsClosing() {
// TODO 需要重新等待打开
}
item, itemOk := this.bItemMap[bName]
if itemOk {
this.bList.Remove(item)
this.bList.Push(item)
}
this.mu.Unlock()
return bFile, nil
}
return this.openBFile(bPath, bName)
}
func (this *FS) openBFileForHashReading(hash string) (*BlocksFile, error) {
err := CheckHashErr(hash)
if err != nil {
return nil, err
}
bPath, bName, err := this.bPathForHash(hash)
if err != nil {
return nil, err
}
err = this.waitBFile(bPath)
if err != nil {
return nil, err
}
this.mu.Lock()
bFile, ok := this.bMap[bName]
if ok {
// 调整当前BFile所在位置
item, itemOk := this.bItemMap[bName]
if itemOk {
this.bList.Remove(item)
this.bList.Push(item)
}
this.mu.Unlock()
return bFile, nil
}
this.mu.Unlock()
return this.openBFile(bPath, bName)
}
func (this *FS) openBFile(bPath string, bName string) (*BlocksFile, error) {
// check closing queue
err := this.waitBFile(bPath)
if err != nil {
return nil, err
}
this.mu.Lock()
defer this.mu.Unlock()
// lookup again
bFile, ok := this.bMap[bName]
if ok {
return bFile, nil
}
// TODO 不要把 OpenBlocksFile 放入到 mu 中?
bFile, err = OpenBlocksFile(bPath, &BlockFileOptions{
BytesPerSync: this.opt.BytesPerSync,
})
if err != nil {
return nil, err
}
// 防止被关闭
bFile.IncrRef()
defer bFile.DecrRef()
this.bMap[bName] = bFile
// 加入到列表中
var item = linkedlist.NewItem(bName)
this.bList.Push(item)
this.bItemMap[bName] = item
// 检查是否超出maxOpenFiles
if this.bList.Len() > this.opt.MaxOpenFiles {
this.shiftOpenFiles()
}
return bFile, nil
}
// 处理关闭中的 BFile 们
func (this *FS) processClosingBFiles() {
if this.isClosed {
return
}
var bFile = <-this.closingBChan
if bFile == nil {
return
}
_ = bFile.Close()
this.mu.Lock()
delete(this.closingBMap, bFile.Filename())
this.mu.Unlock()
}
// 弹出超出BFile数量限制的BFile
func (this *FS) shiftOpenFiles() {
var l = this.bList.Len()
var count = l - this.opt.MaxOpenFiles
if count <= 0 {
return
}
var bNames []string
var searchCount int
this.bList.Range(func(item *linkedlist.Item[string]) (goNext bool) {
searchCount++
var bName = item.Value
var bFile = this.bMap[bName]
if bFile.CanClose() {
bNames = append(bNames, bName)
count--
}
return count > 0 && searchCount < 8 && searchCount < l-8
})
for _, bName := range bNames {
var bFile = this.bMap[bName]
var item = this.bItemMap[bName]
// clean
delete(this.bMap, bName)
delete(this.bItemMap, bName)
this.bList.Remove(item)
// add to closing queue
this.closingBMap[bFile.Filename()] = zero.Zero{}
// MUST run in goroutine
go func(bFile *BlocksFile) {
// 因为 closingBChan 可能已经关闭
defer func() {
recover()
}()
this.closingBChan <- bFile
}(bFile)
}
}
func (this *FS) waitBFile(bPath string) error {
this.mu.RLock()
_, isClosing := this.closingBMap[bPath]
this.mu.RUnlock()
if !isClosing {
return nil
}
var maxWaits = 30_000
for {
this.mu.RLock()
_, isClosing = this.closingBMap[bPath]
this.mu.RUnlock()
if !isClosing {
break
}
time.Sleep(1 * time.Millisecond)
maxWaits--
if maxWaits < 0 {
return errors.New("open blocks file timeout")
}
}
return nil
}

View File

@@ -0,0 +1,47 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs
import (
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
"time"
)
type FSOptions struct {
MaxOpenFiles int
BytesPerSync int64
SyncTimeout time.Duration
MaxSyncFiles int
}
func (this *FSOptions) EnsureDefaults() {
if this.MaxOpenFiles <= 0 {
// 根据内存计算最大打开文件数
var maxOpenFiles = memutils.SystemMemoryGB() * 128
if maxOpenFiles > (8 << 10) {
maxOpenFiles = 8 << 10
}
this.MaxOpenFiles = maxOpenFiles
}
if this.BytesPerSync <= 0 {
if fsutils.DiskIsFast() {
this.BytesPerSync = 1 << 20 // TODO 根据硬盘实际写入速度进行调整
} else {
this.BytesPerSync = 512 << 10
}
}
if this.SyncTimeout <= 0 {
this.SyncTimeout = 1 * time.Second
}
if this.MaxSyncFiles <= 0 {
this.MaxSyncFiles = 32
}
}
var DefaultFSOptions = &FSOptions{
MaxOpenFiles: 1 << 10,
BytesPerSync: 512 << 10,
SyncTimeout: 1 * time.Second,
MaxSyncFiles: 32,
}

View File

@@ -0,0 +1,197 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs_test
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/bfs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"github.com/TeaOSLab/EdgeNode/internal/utils/linkedlist"
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
"github.com/iwind/TeaGo/Tea"
_ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/types"
"io"
"os"
"testing"
)
func TestFS_OpenFileWriter(t *testing.T) {
fs, openErr := bfs.OpenFS(Tea.Root+"/data/bfs/test", bfs.DefaultFSOptions)
if openErr != nil {
t.Fatal(openErr)
}
defer func() {
_ = fs.Close()
}()
{
writer, err := fs.OpenFileWriter(bfs.Hash("123456"), -1, false)
if err != nil {
t.Fatal(err)
}
err = writer.WriteMeta(200, fasttime.Now().Unix()+3600, -1)
if err != nil {
t.Fatal(err)
}
_, err = writer.WriteBody([]byte("Hello, World"))
if err != nil {
t.Fatal(err)
}
err = writer.Close()
if err != nil {
t.Fatal(err)
}
}
{
writer, err := fs.OpenFileWriter(bfs.Hash("654321"), 100, true)
if err != nil {
t.Fatal(err)
}
_, err = writer.WriteBody([]byte("Hello, World"))
if err != nil {
t.Fatal(err)
}
}
}
func TestFS_OpenFileReader(t *testing.T) {
fs, openErr := bfs.OpenFS(Tea.Root+"/data/bfs/test", bfs.DefaultFSOptions)
if openErr != nil {
t.Fatal(openErr)
}
defer func() {
_ = fs.Close()
}()
reader, err := fs.OpenFileReader(bfs.Hash("123456"), false)
if err != nil {
if bfs.IsNotExist(err) {
t.Log(err)
return
}
t.Fatal(err)
}
data, err := io.ReadAll(reader)
if err != nil {
t.Fatal(err)
}
t.Log(string(data))
logs.PrintAsJSON(reader.FileHeader(), t)
}
func TestFS_ExistFile(t *testing.T) {
fs, openErr := bfs.OpenFS(Tea.Root+"/data/bfs/test", bfs.DefaultFSOptions)
if openErr != nil {
t.Fatal(openErr)
}
defer func() {
_ = fs.Close()
}()
exist, err := fs.ExistFile(bfs.Hash("123456"))
if err != nil {
t.Fatal(err)
}
t.Log("exist:", exist)
}
func TestFS_RemoveFile(t *testing.T) {
fs, openErr := bfs.OpenFS(Tea.Root+"/data/bfs/test", bfs.DefaultFSOptions)
if openErr != nil {
t.Fatal(openErr)
}
defer func() {
_ = fs.Close()
}()
var hash = bfs.Hash("123456")
err := fs.RemoveFile(hash)
if err != nil {
t.Fatal(err)
}
exist, err := fs.ExistFile(bfs.Hash("123456"))
if err != nil {
t.Fatal(err)
}
t.Log("exist:", exist)
}
func TestFS_OpenFileWriter_Close(t *testing.T) {
if !testutils.IsSingleTesting() {
return
}
fs, openErr := bfs.OpenFS(Tea.Root+"/data/bfs/test", &bfs.FSOptions{
MaxOpenFiles: 99,
})
if openErr != nil {
t.Fatal(openErr)
}
defer func() {
_ = fs.Close()
}()
var count = 2
if testutils.IsSingleTesting() {
count = 100
}
for i := 0; i < count; i++ {
//t.Log("open", i)
writer, err := fs.OpenFileWriter(bfs.Hash(types.String(i)), -1, false)
if err != nil {
t.Fatal(err)
}
_ = writer.Close()
}
t.Log(len(fs.TestBMap()), "block files, pid:", os.Getpid())
var p = func() {
var bNames []string
fs.TestBList().Range(func(item *linkedlist.Item[string]) (goNext bool) {
bNames = append(bNames, item.Value)
return true
})
if len(bNames) != len(fs.TestBMap()) {
t.Fatal("len(bNames)!=len(bMap)")
}
if len(bNames) < 10 {
t.Log("["+types.String(len(bNames))+"]", bNames)
} else {
t.Log("["+types.String(len(bNames))+"]", bNames[:10], "...")
}
}
p()
{
writer, err := fs.OpenFileWriter(bfs.Hash(types.String(10)), -1, false)
if err != nil {
t.Fatal(err)
}
_ = writer.Close()
}
p()
// testing closing
for i := 0; i < 3; i++ {
writer, err := fs.OpenFileWriter(bfs.Hash(types.String(0)), -1, false)
if err != nil {
t.Fatal(err)
}
_ = writer.Close()
}
p()
}

View File

@@ -0,0 +1,66 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/percpu"
"github.com/klauspost/compress/gzip"
"io"
"runtime"
)
var SharedDecompressPool = NewGzipReaderPool()
type GzipReaderPool struct {
c chan *gzip.Reader
cList []chan *gzip.Reader
}
func NewGzipReaderPool() *GzipReaderPool {
const poolSize = 16
var countProcs = runtime.GOMAXPROCS(0)
if countProcs <= 0 {
countProcs = runtime.NumCPU()
}
countProcs *= 4
var cList []chan *gzip.Reader
for i := 0; i < countProcs; i++ {
cList = append(cList, make(chan *gzip.Reader, poolSize))
}
return &GzipReaderPool{
c: make(chan *gzip.Reader, poolSize),
cList: cList,
}
}
func (this *GzipReaderPool) Get(rawReader io.Reader) (*gzip.Reader, error) {
select {
case w := <-this.getC():
err := w.Reset(rawReader)
if err != nil {
return nil, err
}
return w, nil
default:
return gzip.NewReader(rawReader)
}
}
func (this *GzipReaderPool) Put(reader *gzip.Reader) {
select {
case this.getC() <- reader:
default:
// 不需要close因为已经在使用的时候调用了
}
}
func (this *GzipReaderPool) getC() chan *gzip.Reader {
var procId = percpu.GetProcId()
if procId < len(this.cList) {
return this.cList[procId]
}
return this.c
}

View File

@@ -0,0 +1,63 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/percpu"
"github.com/klauspost/compress/gzip"
"io"
"runtime"
)
var SharedCompressPool = NewGzipWriterPool()
type GzipWriterPool struct {
c chan *gzip.Writer
cList []chan *gzip.Writer
}
func NewGzipWriterPool() *GzipWriterPool {
const poolSize = 16
var countProcs = runtime.GOMAXPROCS(0)
if countProcs <= 0 {
countProcs = runtime.NumCPU()
}
countProcs *= 4
var cList []chan *gzip.Writer
for i := 0; i < countProcs; i++ {
cList = append(cList, make(chan *gzip.Writer, poolSize))
}
return &GzipWriterPool{
c: make(chan *gzip.Writer, poolSize),
cList: cList,
}
}
func (this *GzipWriterPool) Get(rawWriter io.Writer) (*gzip.Writer, error) {
select {
case w := <-this.getC():
w.Reset(rawWriter)
return w, nil
default:
return gzip.NewWriterLevel(rawWriter, gzip.BestSpeed)
}
}
func (this *GzipWriterPool) Put(writer *gzip.Writer) {
select {
case this.getC() <- writer:
default:
// 不需要close因为已经在使用的时候调用了
}
}
func (this *GzipWriterPool) getC() chan *gzip.Writer {
var procId = percpu.GetProcId()
if procId < len(this.cList) {
return this.cList[procId]
}
return this.c
}

View File

@@ -0,0 +1,36 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs
import (
"fmt"
stringutil "github.com/iwind/TeaGo/utils/string"
)
var HashLen = 32
// CheckHash check hash string format
func CheckHash(hash string) bool {
if len(hash) != HashLen {
return false
}
for _, b := range hash {
if !((b >= '0' && b <= '9') || (b >= 'a' && b <= 'f')) {
return false
}
}
return true
}
func CheckHashErr(hash string) error {
if CheckHash(hash) {
return nil
}
return fmt.Errorf("check hash '%s' failed: %w", hash, ErrInvalidHash)
}
func Hash(s string) string {
return stringutil.Md5(s)
}

View File

@@ -0,0 +1,27 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs_test
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/bfs"
"github.com/iwind/TeaGo/assert"
"math/rand"
"strconv"
"strings"
"testing"
)
func TestCheckHash(t *testing.T) {
var a = assert.NewAssertion(t)
a.IsFalse(bfs.CheckHash("123456"))
a.IsFalse(bfs.CheckHash(strings.Repeat("A", 32)))
a.IsTrue(bfs.CheckHash(strings.Repeat("a", 32)))
a.IsTrue(bfs.CheckHash(bfs.Hash("123456")))
}
func BenchmarkCheckHashErr(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = bfs.CheckHash(bfs.Hash(strconv.Itoa(rand.Int())))
}
}

View File

@@ -0,0 +1,52 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs
import (
"encoding/binary"
"errors"
)
type MetaAction = byte
const (
MetaActionNew MetaAction = '+'
MetaActionRemove MetaAction = '-'
)
func EncodeMetaBlock(action MetaAction, hash string, data []byte) ([]byte, error) {
var hl = len(hash)
if hl != HashLen {
return nil, errors.New("invalid hash length")
}
var l = 1 /** Action **/ + hl /** Hash **/ + len(data)
var b = make([]byte, 4 /** Len **/ +l)
binary.BigEndian.PutUint32(b, uint32(l))
b[4] = action
copy(b[5:], hash)
copy(b[5+hl:], data)
return b, nil
}
func DecodeMetaBlock(blockBytes []byte) (action MetaAction, hash string, data []byte, err error) {
var dataOffset = 4 /** Len **/ + HashLen + 1 /** Action **/
if len(blockBytes) < dataOffset {
err = errors.New("decode failed: invalid block data")
return
}
action = blockBytes[4]
hash = string(blockBytes[5 : 5+HashLen])
if action == MetaActionNew {
var rawData = blockBytes[dataOffset:]
if len(rawData) > 0 {
data = make([]byte, len(rawData))
copy(data, rawData)
}
}
return
}

View File

@@ -0,0 +1,52 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs_test
import (
"bytes"
"github.com/TeaOSLab/EdgeNode/internal/utils/bfs"
"github.com/iwind/TeaGo/assert"
"testing"
)
func TestMetaBlock(t *testing.T) {
var a = assert.NewAssertion(t)
{
var srcHash = bfs.Hash("a")
b, err := bfs.EncodeMetaBlock(bfs.MetaActionNew, srcHash, []byte{1, 2, 3})
if err != nil {
t.Fatal(err)
}
t.Log(b)
{
action, hash, data, decodeErr := bfs.DecodeMetaBlock(b)
if decodeErr != nil {
t.Fatal(err)
}
a.IsTrue(action == bfs.MetaActionNew)
a.IsTrue(hash == srcHash)
a.IsTrue(bytes.Equal(data, []byte{1, 2, 3}))
}
}
{
var srcHash = bfs.Hash("bcd")
b, err := bfs.EncodeMetaBlock(bfs.MetaActionRemove, srcHash, []byte{1, 2, 3})
if err != nil {
t.Fatal(err)
}
t.Log(b)
{
action, hash, data, decodeErr := bfs.DecodeMetaBlock(b)
if decodeErr != nil {
t.Fatal(err)
}
a.IsTrue(action == bfs.MetaActionRemove)
a.IsTrue(hash == srcHash)
a.IsTrue(len(data) == 0)
}
}
}

View File

@@ -0,0 +1,380 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs
import (
"bytes"
"encoding/binary"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"io"
"os"
"sync"
)
const MFileExt = ".m"
const Version1 = 1
type MetaFile struct {
fp *os.File
filename string
headerMap map[string]*LazyFileHeader // hash => *LazyFileHeader
mu *sync.RWMutex // TODO 考虑单独一个不要和bFile共享
isModified bool
modifiedHashMap map[string]zero.Zero // hash => Zero
}
func OpenMetaFile(filename string, mu *sync.RWMutex) (*MetaFile, error) {
fp, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return nil, err
}
var mFile = &MetaFile{
filename: filename,
fp: fp,
headerMap: map[string]*LazyFileHeader{},
mu: mu,
modifiedHashMap: map[string]zero.Zero{},
}
// 从文件中加载已有的文件头信息
err = mFile.load()
if err != nil {
return nil, err
}
return mFile, nil
}
func (this *MetaFile) load() error {
AckReadThread()
_, err := this.fp.Seek(0, io.SeekStart)
ReleaseReadThread()
if err != nil {
return err
}
// TODO 检查文件是否完整
var buf = make([]byte, 4<<10)
var blockBytes []byte
for {
AckReadThread()
n, readErr := this.fp.Read(buf)
ReleaseReadThread()
if n > 0 {
blockBytes = append(blockBytes, buf[:n]...)
for len(blockBytes) > 4 {
var l = int(binary.BigEndian.Uint32(blockBytes[:4])) + 4 /* Len **/
if len(blockBytes) < l {
break
}
action, hash, data, decodeErr := DecodeMetaBlock(blockBytes[:l])
if decodeErr != nil {
return decodeErr
}
switch action {
case MetaActionNew:
this.headerMap[hash] = NewLazyFileHeaderFromData(data)
case MetaActionRemove:
delete(this.headerMap, hash)
}
blockBytes = blockBytes[l:]
}
}
if readErr != nil {
if readErr == io.EOF {
break
}
return readErr
}
}
return nil
}
func (this *MetaFile) WriteMeta(hash string, status int, expiresAt int64, expectedFileSize int64) error {
this.mu.Lock()
defer this.mu.Unlock()
this.headerMap[hash] = NewLazyFileHeader(&FileHeader{
Version: Version1,
ExpiresAt: expiresAt,
Status: status,
ExpiredBodySize: expectedFileSize,
IsWriting: true,
})
this.modifiedHashMap[hash] = zero.Zero{}
return nil
}
func (this *MetaFile) WriteHeaderBlockUnsafe(hash string, bOffsetFrom int64, bOffsetTo int64) error {
lazyHeader, ok := this.headerMap[hash]
if !ok {
return nil
}
header, err := lazyHeader.FileHeaderUnsafe()
if err != nil {
return err
}
// TODO 合并相邻block
header.HeaderBlocks = append(header.HeaderBlocks, BlockInfo{
BFileOffsetFrom: bOffsetFrom,
BFileOffsetTo: bOffsetTo,
})
this.modifiedHashMap[hash] = zero.Zero{}
return nil
}
func (this *MetaFile) WriteBodyBlockUnsafe(hash string, bOffsetFrom int64, bOffsetTo int64, originOffsetFrom int64, originOffsetTo int64) error {
lazyHeader, ok := this.headerMap[hash]
if !ok {
return nil
}
header, err := lazyHeader.FileHeaderUnsafe()
if err != nil {
return err
}
// TODO 合并相邻block
header.BodyBlocks = append(header.BodyBlocks, BlockInfo{
OriginOffsetFrom: originOffsetFrom,
OriginOffsetTo: originOffsetTo,
BFileOffsetFrom: bOffsetFrom,
BFileOffsetTo: bOffsetTo,
})
this.modifiedHashMap[hash] = zero.Zero{}
return nil
}
func (this *MetaFile) WriteClose(hash string, headerSize int64, bodySize int64) error {
// TODO 考虑单个hash多次重复调用的情况
this.mu.Lock()
lazyHeader, ok := this.headerMap[hash]
if !ok {
this.mu.Unlock()
return nil
}
header, err := lazyHeader.FileHeaderUnsafe()
if err != nil {
return err
}
this.mu.Unlock()
// TODO 检查bodySize和expectedBodySize是否一致如果不一致则从headerMap中删除
header.ModifiedAt = fasttime.Now().Unix()
header.HeaderSize = headerSize
header.BodySize = bodySize
header.Compact()
blockBytes, err := header.Encode(hash)
if err != nil {
return err
}
this.mu.Lock()
defer this.mu.Unlock()
AckReadThread()
_, err = this.fp.Seek(0, io.SeekEnd)
ReleaseReadThread()
if err != nil {
return err
}
AckWriteThread()
_, err = this.fp.Write(blockBytes)
ReleaseWriteThread()
this.isModified = true
return err
}
func (this *MetaFile) RemoveFile(hash string) error {
this.mu.Lock()
defer this.mu.Unlock()
_, ok := this.headerMap[hash]
if ok {
delete(this.headerMap, hash)
}
if ok {
blockBytes, err := EncodeMetaBlock(MetaActionRemove, hash, nil)
if err != nil {
return err
}
AckWriteThread()
_, err = this.fp.Write(blockBytes)
ReleaseWriteThread()
if err != nil {
return err
}
this.isModified = true
}
return nil
}
func (this *MetaFile) FileHeader(hash string) (header *FileHeader, ok bool) {
this.mu.RLock()
defer this.mu.RUnlock()
lazyHeader, ok := this.headerMap[hash]
if ok {
var err error
header, err = lazyHeader.FileHeaderUnsafe()
if err != nil {
ok = false
}
}
return
}
func (this *MetaFile) FileHeaderUnsafe(hash string) (header *FileHeader, ok bool) {
lazyHeader, ok := this.headerMap[hash]
if ok {
var err error
header, err = lazyHeader.FileHeaderUnsafe()
if err != nil {
ok = false
}
}
return
}
func (this *MetaFile) CloneFileHeader(hash string) (header *FileHeader, ok bool) {
this.mu.RLock()
defer this.mu.RUnlock()
lazyHeader, ok := this.headerMap[hash]
if !ok {
return
}
var err error
header, err = lazyHeader.FileHeaderUnsafe()
if err != nil {
ok = false
return
}
header = header.Clone()
return
}
func (this *MetaFile) FileHeaders() map[string]*LazyFileHeader {
this.mu.RLock()
defer this.mu.RUnlock()
return this.headerMap
}
func (this *MetaFile) ExistFile(hash string) bool {
this.mu.RLock()
defer this.mu.RUnlock()
_, ok := this.headerMap[hash]
return ok
}
// Compact the meta file
// TODO 考虑自动Compact的时机脏数据比例
func (this *MetaFile) Compact() error {
this.mu.Lock()
defer this.mu.Unlock()
var buf = bytes.NewBuffer(nil)
for hash, lazyHeader := range this.headerMap {
header, err := lazyHeader.FileHeaderUnsafe()
if err != nil {
return err
}
blockBytes, err := header.Encode(hash)
if err != nil {
return err
}
buf.Write(blockBytes)
}
AckWriteThread()
err := this.fp.Truncate(int64(buf.Len()))
ReleaseWriteThread()
if err != nil {
return err
}
AckReadThread()
_, err = this.fp.Seek(0, io.SeekStart)
ReleaseReadThread()
if err != nil {
return err
}
AckWriteThread()
_, err = this.fp.Write(buf.Bytes())
ReleaseWriteThread()
this.isModified = true
return err
}
func (this *MetaFile) SyncUnsafe() error {
if !this.isModified {
return nil
}
AckWriteThread()
err := this.fp.Sync()
ReleaseWriteThread()
if err != nil {
return err
}
for hash := range this.modifiedHashMap {
lazyHeader, ok := this.headerMap[hash]
if ok {
header, decodeErr := lazyHeader.FileHeaderUnsafe()
if decodeErr != nil {
return decodeErr
}
header.IsWriting = false
}
}
this.isModified = false
this.modifiedHashMap = map[string]zero.Zero{}
return nil
}
// Close 关闭当前文件
func (this *MetaFile) Close() error {
return this.fp.Close()
}
// RemoveAll 删除所有数据
func (this *MetaFile) RemoveAll() error {
_ = this.fp.Close()
return os.Remove(this.fp.Name())
}

View File

@@ -0,0 +1,196 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs_test
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/bfs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
"github.com/iwind/TeaGo/logs"
"sync"
"testing"
"time"
)
func TestNewMetaFile(t *testing.T) {
mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{})
if err != nil {
t.Fatal(err)
}
defer func() {
_ = mFile.Close()
}()
var header, _ = mFile.FileHeader(bfs.Hash("123456"))
logs.PrintAsJSON(header, t)
//logs.PrintAsJSON(mFile.Headers(), t)
}
func TestNewMetaFile_Large(t *testing.T) {
var count = 2
if testutils.IsSingleTesting() {
count = 100
}
var before = time.Now()
for i := 0; i < count; i++ {
mFile, err := bfs.OpenMetaFile("testdata/test2.m", &sync.RWMutex{})
if err != nil {
if bfs.IsNotExist(err) {
continue
}
t.Fatal(err)
}
_ = mFile.Close()
}
var costMs = time.Since(before).Seconds() * 1000
t.Logf("cost: %.2fms, qps: %.2fms/file", costMs, costMs/float64(count))
}
func TestNewMetaFile_Memory(t *testing.T) {
var count = 2
if testutils.IsSingleTesting() {
count = 100
}
var stat1 = testutils.ReadMemoryStat()
var mFiles []*bfs.MetaFile
for i := 0; i < count; i++ {
mFile, err := bfs.OpenMetaFile("testdata/test2.m", &sync.RWMutex{})
if err != nil {
if bfs.IsNotExist(err) {
continue
}
t.Fatal(err)
}
_ = mFile.Close()
mFiles = append(mFiles, mFile)
}
var stat2 = testutils.ReadMemoryStat()
t.Log((stat2.HeapInuse-stat1.HeapInuse)>>20, "MiB")
}
func TestMetaFile_FileHeaders(t *testing.T) {
mFile, openErr := bfs.OpenMetaFile("testdata/test2.m", &sync.RWMutex{})
if openErr != nil {
if bfs.IsNotExist(openErr) {
return
}
t.Fatal(openErr)
}
_ = mFile.Close()
for hash, lazyHeader := range mFile.FileHeaders() {
header, err := lazyHeader.FileHeaderUnsafe()
if err != nil {
t.Fatal(err)
}
t.Log(hash, header.ModifiedAt, header.BodySize)
}
}
func TestMetaFile_WriteMeta(t *testing.T) {
mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{})
if err != nil {
t.Fatal(err)
}
defer func() {
_ = mFile.Close()
}()
var hash = bfs.Hash("123456")
err = mFile.WriteMeta(hash, 200, fasttime.Now().Unix()+3600, -1)
if err != nil {
t.Fatal(err)
}
err = mFile.WriteHeaderBlockUnsafe(hash, 123, 223)
if err != nil {
t.Fatal(err)
}
err = mFile.WriteBodyBlockUnsafe(hash, 223, 323, 0, 100)
if err != nil {
t.Fatal(err)
}
err = mFile.WriteBodyBlockUnsafe(hash, 323, 423, 100, 200)
if err != nil {
t.Fatal(err)
}
err = mFile.WriteClose(hash, 100, 200)
if err != nil {
t.Fatal(err)
}
//logs.PrintAsJSON(mFile.Header(hash), t)
}
func TestMetaFile_Write(t *testing.T) {
mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{})
if err != nil {
t.Fatal(err)
}
defer func() {
_ = mFile.Close()
}()
var hash = bfs.Hash("123456")
err = mFile.WriteBodyBlockUnsafe(hash, 0, 100, 0, 100)
if err != nil {
t.Fatal(err)
}
err = mFile.WriteClose(hash, 0, 100)
if err != nil {
t.Fatal(err)
}
}
func TestMetaFile_RemoveFile(t *testing.T) {
mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{})
if err != nil {
t.Fatal(err)
}
defer func() {
_ = mFile.Close()
}()
err = mFile.RemoveFile(bfs.Hash("123456"))
if err != nil {
t.Fatal(err)
}
}
func TestMetaFile_Compact(t *testing.T) {
mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{})
if err != nil {
t.Fatal(err)
}
defer func() {
_ = mFile.Close()
}()
err = mFile.Compact()
if err != nil {
t.Fatal(err)
}
}
func TestMetaFile_RemoveAll(t *testing.T) {
mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{})
if err != nil {
t.Fatal(err)
}
err = mFile.RemoveAll()
if err != nil {
t.Fatal(err)
}
}

View File

@@ -0,0 +1,25 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs
import "github.com/TeaOSLab/EdgeNode/internal/zero"
// TODO 线程数可以根据硬盘数量动态调整?
var readThreadsLimiter = make(chan zero.Zero, 8)
var writeThreadsLimiter = make(chan zero.Zero, 8)
func AckReadThread() {
readThreadsLimiter <- zero.Zero{}
}
func ReleaseReadThread() {
<-readThreadsLimiter
}
func AckWriteThread() {
writeThreadsLimiter <- zero.Zero{}
}
func ReleaseWriteThread() {
<-writeThreadsLimiter
}

View File

@@ -39,9 +39,9 @@ func (this *Stmt) ExecContext(ctx context.Context, args ...any) (result sql.Resu
if this.enableStat {
defer SharedQueryStatManager.AddQuery(this.query).End()
}
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
result, err = this.rawStmt.ExecContext(ctx, args...)
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
return
}
@@ -57,9 +57,9 @@ func (this *Stmt) Exec(args ...any) (result sql.Result, err error) {
defer SharedQueryStatManager.AddQuery(this.query).End()
}
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
result, err = this.rawStmt.Exec(args...)
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
return
}

View File

@@ -91,7 +91,6 @@ func CheckDiskIsFast() (speedMB float64, isFast bool, err error) {
} else {
DiskSpeed = SpeedExtremelySlow
}
calculateDiskMaxWrites()
DiskSpeedMB = speedMB

94
internal/utils/fs/file.go Normal file
View File

@@ -0,0 +1,94 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package fsutils
import "os"
const FlagRead = 0x1
const FlagWrite = 0x2
type File struct {
rawFile *os.File
readonly bool
}
func NewFile(rawFile *os.File, flags int) *File {
return &File{
rawFile: rawFile,
readonly: flags&FlagRead == FlagRead,
}
}
func (this *File) Name() string {
return this.rawFile.Name()
}
func (this *File) Fd() uintptr {
return this.rawFile.Fd()
}
func (this *File) Raw() *os.File {
return this.rawFile
}
func (this *File) Stat() (os.FileInfo, error) {
return this.rawFile.Stat()
}
func (this *File) Seek(offset int64, whence int) (ret int64, err error) {
ret, err = this.rawFile.Seek(offset, whence)
return
}
func (this *File) Read(b []byte) (n int, err error) {
ReaderLimiter.Ack()
n, err = this.rawFile.Read(b)
ReaderLimiter.Release()
return
}
func (this *File) ReadAt(b []byte, off int64) (n int, err error) {
ReaderLimiter.Ack()
n, err = this.rawFile.ReadAt(b, off)
ReaderLimiter.Release()
return
}
func (this *File) Write(b []byte) (n int, err error) {
WriterLimiter.Ack()
n, err = this.rawFile.Write(b)
WriterLimiter.Release()
return
}
func (this *File) WriteAt(b []byte, off int64) (n int, err error) {
WriterLimiter.Ack()
n, err = this.rawFile.WriteAt(b, off)
WriterLimiter.Release()
return
}
func (this *File) Sync() (err error) {
WriterLimiter.Ack()
err = this.rawFile.Sync()
WriterLimiter.Release()
return
}
func (this *File) Truncate(size int64) (err error) {
WriterLimiter.Ack()
err = this.rawFile.Truncate(size)
WriterLimiter.Release()
return
}
func (this *File) Close() (err error) {
if !this.readonly {
WriterLimiter.Ack()
}
err = this.rawFile.Close()
if !this.readonly {
WriterLimiter.Release()
}
return
}

View File

@@ -0,0 +1,16 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package fsutils_test
import (
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/iwind/TeaGo/assert"
"testing"
)
func TestFileFlags(t *testing.T) {
var a = assert.NewAssertion(t)
a.IsTrue(fsutils.FlagRead&fsutils.FlagRead == fsutils.FlagRead)
a.IsTrue(fsutils.FlagWrite&fsutils.FlagWrite != fsutils.FlagRead)
a.IsTrue((fsutils.FlagWrite|fsutils.FlagRead)&fsutils.FlagRead == fsutils.FlagRead)
}

View File

@@ -0,0 +1,100 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package fsutils
import (
"runtime"
"time"
)
var maxThreads = runtime.NumCPU()
var WriterLimiter = NewLimiter(max(maxThreads, 8))
var ReaderLimiter = NewLimiter(max(maxThreads, 8))
type Limiter struct {
threads chan struct{}
count int
countDefault int
timers chan *time.Timer
}
func NewLimiter(threads int) *Limiter {
if threads < 4 {
threads = 4
}
if threads > 64 {
threads = 64
}
var threadsChan = make(chan struct{}, threads)
for i := 0; i < threads; i++ {
threadsChan <- struct{}{}
}
return &Limiter{
countDefault: threads,
count: threads,
threads: threadsChan,
timers: make(chan *time.Timer, 4096),
}
}
func (this *Limiter) SetThreads(newThreads int) {
if newThreads <= 0 {
newThreads = this.countDefault
}
if newThreads != this.count {
var threadsChan = make(chan struct{}, newThreads)
for i := 0; i < newThreads; i++ {
threadsChan <- struct{}{}
}
this.threads = threadsChan
this.count = newThreads
}
}
func (this *Limiter) Ack() {
<-this.threads
}
func (this *Limiter) TryAck() bool {
const timeoutDuration = 500 * time.Millisecond
var timeout *time.Timer
select {
case timeout = <-this.timers:
timeout.Reset(timeoutDuration)
default:
timeout = time.NewTimer(timeoutDuration)
}
defer func() {
timeout.Stop()
select {
case this.timers <- timeout:
default:
}
}()
select {
case <-this.threads:
return true
case <-timeout.C:
return false
}
}
func (this *Limiter) Release() {
select {
case this.threads <- struct{}{}:
default:
// 由于容量可能有变化这里忽略多余的thread
}
}
func (this *Limiter) FreeThreads() int {
return len(this.threads)
}

View File

@@ -0,0 +1,123 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package fsutils_test
import (
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
"github.com/iwind/TeaGo/assert"
"math/rand"
"sync"
"testing"
"time"
)
func TestLimiter_SetThreads(t *testing.T) {
var limiter = fsutils.NewLimiter(4)
var concurrent = 1024
var wg = sync.WaitGroup{}
wg.Add(concurrent)
for i := 0; i < concurrent; i++ {
go func() {
defer wg.Done()
limiter.SetThreads(rand.Int() % 128)
limiter.TryAck()
}()
}
wg.Wait()
}
func TestLimiter_Ack(t *testing.T) {
var a = assert.NewAssertion(t)
{
var limiter = fsutils.NewLimiter(4)
a.IsTrue(limiter.FreeThreads() == 4)
limiter.Ack()
a.IsTrue(limiter.FreeThreads() == 3)
limiter.Ack()
a.IsTrue(limiter.FreeThreads() == 2)
limiter.Release()
a.IsTrue(limiter.FreeThreads() == 3)
limiter.Release()
a.IsTrue(limiter.FreeThreads() == 4)
}
}
func TestLimiter_TryAck(t *testing.T) {
var a = assert.NewAssertion(t)
{
var limiter = fsutils.NewLimiter(4)
var count = limiter.FreeThreads()
a.IsTrue(count == 4)
for i := 0; i < count; i++ {
limiter.Ack()
}
a.IsTrue(limiter.FreeThreads() == 0)
a.IsFalse(limiter.TryAck())
a.IsTrue(limiter.FreeThreads() == 0)
}
{
var limiter = fsutils.NewLimiter(4)
var count = limiter.FreeThreads()
a.IsTrue(count == 4)
for i := 0; i < count-1; i++ {
limiter.Ack()
}
a.IsTrue(limiter.FreeThreads() == 1)
a.IsTrue(limiter.TryAck())
a.IsTrue(limiter.FreeThreads() == 0)
}
}
func TestLimiter_TryAck2(t *testing.T) {
if !testutils.IsSingleTesting() {
return
}
var a = assert.NewAssertion(t)
{
var limiter = fsutils.NewLimiter(4)
var count = limiter.FreeThreads()
a.IsTrue(count == 4)
for i := 0; i < count-1; i++ {
limiter.Ack()
}
a.IsTrue(limiter.FreeThreads() == 1)
a.IsTrue(limiter.TryAck())
a.IsFalse(limiter.TryAck())
a.IsFalse(limiter.TryAck())
limiter.Release()
a.IsTrue(limiter.TryAck())
}
}
func TestLimiter_Timout(t *testing.T) {
var timeout = time.NewTimer(100 * time.Millisecond)
var r = make(chan bool, 1)
r <- true
var before = time.Now()
select {
case <-r:
case <-timeout.C:
}
t.Log(time.Since(before).Seconds()*1000, "ms")
timeout.Stop()
before = time.Now()
timeout.Reset(100 * time.Millisecond)
<-timeout.C
t.Log(time.Since(before).Seconds()*1000, "ms")
}

56
internal/utils/fs/os.go Normal file
View File

@@ -0,0 +1,56 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package fsutils
import (
"os"
)
func Remove(filename string) (err error) {
WriterLimiter.Ack()
err = os.Remove(filename)
WriterLimiter.Release()
return
}
func Rename(oldPath string, newPath string) (err error) {
WriterLimiter.Ack()
err = os.Rename(oldPath, newPath)
WriterLimiter.Release()
return
}
func ReadFile(filename string) (data []byte, err error) {
ReaderLimiter.Ack()
data, err = os.ReadFile(filename)
ReaderLimiter.Release()
return
}
func WriteFile(filename string, data []byte, perm os.FileMode) (err error) {
WriterLimiter.Ack()
err = os.WriteFile(filename, data, perm)
WriterLimiter.Release()
return
}
func OpenFile(name string, flag int, perm os.FileMode) (f *os.File, err error) {
if flag&os.O_RDONLY == os.O_RDONLY {
ReaderLimiter.Ack()
}
f, err = os.OpenFile(name, flag, perm)
if flag&os.O_RDONLY == os.O_RDONLY {
ReaderLimiter.Release()
}
return
}
func Open(name string) (f *os.File, err error) {
ReaderLimiter.Ack()
f, err = os.Open(name)
ReaderLimiter.Release()
return
}

View File

@@ -0,0 +1,17 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package fsutils_test
import (
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"os"
"testing"
)
func TestOpenFile(t *testing.T) {
f, err := fsutils.OpenFile("./os_test.go", os.O_RDONLY, 0444)
if err != nil {
t.Fatal(err)
}
_ = f.Close()
}

View File

@@ -9,7 +9,6 @@ import (
"github.com/iwind/TeaGo/Tea"
"github.com/shirou/gopsutil/v3/load"
"os"
"sync/atomic"
"time"
)
@@ -37,9 +36,8 @@ const (
)
var (
DiskSpeed = SpeedLow
DiskMaxWrites int32 = 32
DiskSpeedMB float64
DiskSpeed = SpeedLow
DiskSpeedMB float64
)
var IsInHighLoad = false
@@ -65,7 +63,6 @@ func init() {
if err == nil && cache.SpeedMB > 0 {
DiskSpeedMB = cache.SpeedMB
DiskSpeed = cache.Speed
calculateDiskMaxWrites()
}
}
@@ -109,39 +106,6 @@ func DiskIsExtremelyFast() bool {
return DiskSpeed == SpeedExtremelyFast
}
var countWrites int32 = 0
func WriteReady() bool {
if IsInExtremelyHighLoad {
return false
}
return atomic.LoadInt32(&countWrites) < DiskMaxWrites
}
func WriteBegin() {
atomic.AddInt32(&countWrites, 1)
}
func WriteEnd() {
atomic.AddInt32(&countWrites, -1)
}
func calculateDiskMaxWrites() {
switch DiskSpeed {
case SpeedExtremelyFast:
DiskMaxWrites = 32
case SpeedFast:
DiskMaxWrites = 16
case SpeedLow:
DiskMaxWrites = 8
case SpeedExtremelySlow:
DiskMaxWrites = 4
default:
DiskMaxWrites = 4
}
}
// WaitLoad wait system load to downgrade
func WaitLoad(maxLoad float64, maxLoops int, delay time.Duration) {
for i := 0; i < maxLoops; i++ {

View File

@@ -4,33 +4,10 @@ package fsutils_test
import (
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/iwind/TeaGo/assert"
"testing"
"time"
)
func TestWrites(t *testing.T) {
var a = assert.NewAssertion(t)
for i := 0; i < int(fsutils.DiskMaxWrites); i++ {
fsutils.WriteBegin()
}
a.IsFalse(fsutils.WriteReady())
fsutils.WriteEnd()
a.IsTrue(fsutils.WriteReady())
}
func TestWaitLoad(t *testing.T) {
fsutils.WaitLoad(100, 5, 1*time.Minute)
}
func BenchmarkWrites(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
fsutils.WriteReady()
fsutils.WriteBegin()
fsutils.WriteEnd()
}
})
}

View File

@@ -2,6 +2,11 @@
package kvstore
import (
"fmt"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
)
type Logger struct {
}
@@ -10,8 +15,13 @@ func NewLogger() *Logger {
}
func (this *Logger) Infof(format string, args ...any) {
// stub
}
func (this *Logger) Errorf(format string, args ...any) {
remotelogs.Error("KV", fmt.Sprintf(format, args...))
}
func (this *Logger) Fatalf(format string, args ...any) {
remotelogs.Error("KV", fmt.Sprintf(format, args...))
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/iwind/TeaGo/Tea"
"io"
"os"
"path/filepath"
"strings"
"sync"
)
@@ -52,6 +53,31 @@ func NewStore(storeName string) (*Store, error) {
}, nil
}
// NewStoreWithPath create store with path
func NewStoreWithPath(path string) (*Store, error) {
if !strings.HasSuffix(path, ".store") {
return nil, errors.New("store path must contains a '.store' suffix")
}
_, err := os.Stat(path)
if err != nil && os.IsNotExist(err) {
_ = os.MkdirAll(path, 0777)
}
var storeName = filepath.Base(path)
storeName = strings.TrimSuffix(storeName, ".store")
if !IsValidName(storeName) {
return nil, errors.New("invalid store name '" + storeName + "'")
}
return &Store{
name: storeName,
path: path,
locker: fsutils.NewLocker(path + "/.fs"),
}, nil
}
func OpenStore(storeName string) (*Store, error) {
store, err := NewStore(storeName)
if err != nil {
@@ -117,6 +143,10 @@ func DefaultStore() (*Store, error) {
return defaultSore, resultErr
}
func (this *Store) Path() string {
return this.path
}
func (this *Store) Open() error {
err := this.locker.Lock()
if err != nil {
@@ -127,7 +157,11 @@ func (this *Store) Open() error {
Logger: NewLogger(),
}
var memoryMB = memutils.SystemMemoryGB() * 1
if fsutils.DiskIsFast() {
opt.BytesPerSync = 1 << 20
}
var memoryMB = memutils.SystemMemoryGB() * 2
if memoryMB > 256 {
memoryMB = 256
}

View File

@@ -196,12 +196,15 @@ func (this *Table[T]) ReadTx(fn func(tx *Tx[T]) error) error {
return NewTableClosedErr(this.name)
}
var tx = NewTx[T](this, true)
tx, err := NewTx[T](this, true)
if err != nil {
return err
}
defer func() {
_ = tx.Close()
}()
err := fn(tx)
err = fn(tx)
if err != nil {
return err
}
@@ -214,12 +217,15 @@ func (this *Table[T]) WriteTx(fn func(tx *Tx[T]) error) error {
return NewTableClosedErr(this.name)
}
var tx = NewTx[T](this, false)
tx, err := NewTx[T](this, false)
if err != nil {
return err
}
defer func() {
_ = tx.Close()
}()
err := fn(tx)
err = fn(tx)
if err != nil {
return err
}
@@ -232,12 +238,15 @@ func (this *Table[T]) WriteTxSync(fn func(tx *Tx[T]) error) error {
return NewTableClosedErr(this.name)
}
var tx = NewTx[T](this, false)
tx, err := NewTx[T](this, false)
if err != nil {
return err
}
defer func() {
_ = tx.Close()
}()
err := fn(tx)
err = fn(tx)
if err != nil {
return err
}

View File

@@ -15,12 +15,22 @@ type Tx[T any] struct {
batch *pebble.Batch
}
func NewTx[T any](table *Table[T], readOnly bool) *Tx[T] {
func NewTx[T any](table *Table[T], readOnly bool) (*Tx[T], error) {
if table.db == nil {
return nil, errors.New("the table has not been added to a db")
}
if table.db.store == nil {
return nil, errors.New("the db has not been added to a store")
}
if table.db.store.rawDB == nil {
return nil, errors.New("the store has not been opened")
}
return &Tx[T]{
table: table,
readOnly: readOnly,
batch: table.db.store.rawDB.NewIndexedBatch(),
}
}, nil
}
func (this *Tx[T]) Set(key string, value T) error {

View File

@@ -10,7 +10,13 @@ func NewBytesValueEncoder[T []byte]() *BytesValueEncoder[T] {
}
func (this *BytesValueEncoder[T]) Encode(value T) ([]byte, error) {
return value, nil
if len(value) == 0 {
return nil, nil
}
var resultValue = make([]byte, len(value))
copy(resultValue, value)
return resultValue, nil
}
func (this *BytesValueEncoder[T]) EncodeField(value T, fieldName string) ([]byte, error) {
@@ -19,6 +25,11 @@ func (this *BytesValueEncoder[T]) EncodeField(value T, fieldName string) ([]byte
}
func (this *BytesValueEncoder[T]) Decode(valueData []byte) (value T, err error) {
value = valueData
if len(valueData) == 0 {
return
}
value = make([]byte, len(valueData))
copy(value, valueData)
return
}

View File

@@ -0,0 +1,36 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package percpu
import (
"runtime"
)
type Chan[T any] struct {
c chan T
count int
cList []chan T
}
func NewChan[T any](size int) *Chan[T] {
var count = max(runtime.NumCPU(), runtime.GOMAXPROCS(0))
var cList []chan T
for i := 0; i < count; i++ {
cList = append(cList, make(chan T, size))
}
return &Chan[T]{
c: make(chan T, size),
count: count,
cList: cList,
}
}
func (this *Chan[T]) C() chan T {
var procId = GetProcId()
if procId < this.count {
return this.cList[procId]
}
return this.c
}

View File

@@ -0,0 +1,23 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package percpu_test
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/percpu"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"testing"
)
func TestChan_C(t *testing.T) {
var c = percpu.NewChan[zero.Zero](10)
c.C() <- zero.Zero{}
t.Log(<-c.C())
select {
case <-c.C():
t.Fatal("should not return from here")
default:
t.Log("ok")
}
}

View File

@@ -0,0 +1,19 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package percpu
import (
_ "unsafe"
)
//go:linkname runtime_procPin runtime.procPin
func runtime_procPin() int
//go:linkname runtime_procUnpin runtime.procUnpin
func runtime_procUnpin() int
func GetProcId() int {
var pid = runtime_procPin()
runtime_procUnpin()
return pid
}