Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e42c8452ff | ||
|
|
fe6e5ba5f9 | ||
|
|
406d5de482 | ||
|
|
f66e672bb4 | ||
|
|
7252d00c8e | ||
|
|
7ec4656aea | ||
|
|
33156fed2a | ||
|
|
e8538d4e34 |
@@ -11,7 +11,7 @@ import (
|
||||
)
|
||||
|
||||
type FileReader struct {
|
||||
fp *os.File
|
||||
fp *fsutils.File
|
||||
|
||||
openFile *OpenFile
|
||||
openFileCache *OpenFileCache
|
||||
@@ -29,7 +29,7 @@ type FileReader struct {
|
||||
isClosed bool
|
||||
}
|
||||
|
||||
func NewFileReader(fp *os.File) *FileReader {
|
||||
func NewFileReader(fp *fsutils.File) *FileReader {
|
||||
return &FileReader{fp: fp}
|
||||
}
|
||||
|
||||
@@ -175,9 +175,7 @@ func (this *FileReader) ReadHeader(buf []byte, callback ReaderFunc) error {
|
||||
var headerSize = this.headerSize
|
||||
|
||||
for {
|
||||
fsutils.ReaderLimiter.Ack()
|
||||
n, err := this.fp.Read(buf)
|
||||
fsutils.ReaderLimiter.Release()
|
||||
if n > 0 {
|
||||
if n < headerSize {
|
||||
goNext, e := callback(n)
|
||||
@@ -239,9 +237,7 @@ func (this *FileReader) ReadBody(buf []byte, callback ReaderFunc) error {
|
||||
}
|
||||
|
||||
for {
|
||||
fsutils.ReaderLimiter.Ack()
|
||||
n, err := this.fp.Read(buf)
|
||||
fsutils.ReaderLimiter.Release()
|
||||
if n > 0 {
|
||||
goNext, e := callback(n)
|
||||
if e != nil {
|
||||
@@ -272,9 +268,7 @@ func (this *FileReader) Read(buf []byte) (n int, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
fsutils.ReaderLimiter.Ack()
|
||||
n, err = this.fp.Read(buf)
|
||||
fsutils.ReaderLimiter.Release()
|
||||
if err != nil && err != io.EOF {
|
||||
_ = this.discard()
|
||||
}
|
||||
@@ -306,17 +300,14 @@ func (this *FileReader) ReadBodyRange(buf []byte, start int64, end int64, callba
|
||||
isOk = true
|
||||
return ErrInvalidRange
|
||||
}
|
||||
fsutils.ReaderLimiter.Ack()
|
||||
_, err := this.fp.Seek(offset, io.SeekStart)
|
||||
fsutils.ReaderLimiter.Release()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
fsutils.ReaderLimiter.Ack()
|
||||
n, err := this.fp.Read(buf)
|
||||
fsutils.ReaderLimiter.Release()
|
||||
var n int
|
||||
n, err = this.fp.Read(buf)
|
||||
if n > 0 {
|
||||
var n2 = int(end-offset) + 1
|
||||
if n2 <= n {
|
||||
@@ -362,7 +353,7 @@ func (this *FileReader) ContainsRange(r rangeutils.Range) (r2 rangeutils.Range,
|
||||
|
||||
// FP 原始的文件句柄
|
||||
func (this *FileReader) FP() *os.File {
|
||||
return this.fp
|
||||
return this.fp.Raw()
|
||||
}
|
||||
|
||||
func (this *FileReader) Close() error {
|
||||
@@ -377,7 +368,7 @@ func (this *FileReader) Close() error {
|
||||
} else {
|
||||
var cacheMeta = make([]byte, len(this.meta))
|
||||
copy(cacheMeta, this.meta)
|
||||
this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp, cacheMeta, this.header, this.LastModified(), this.bodySize))
|
||||
this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp.Raw(), cacheMeta, this.header, this.LastModified(), this.bodySize))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -385,10 +376,8 @@ func (this *FileReader) Close() error {
|
||||
return this.fp.Close()
|
||||
}
|
||||
|
||||
func (this *FileReader) readToBuff(fp *os.File, buf []byte) (ok bool, err error) {
|
||||
fsutils.ReaderLimiter.Ack()
|
||||
func (this *FileReader) readToBuff(fp *fsutils.File, buf []byte) (ok bool, err error) {
|
||||
n, err := fp.Read(buf)
|
||||
fsutils.ReaderLimiter.Release()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
type PartialFileReader struct {
|
||||
@@ -18,7 +17,7 @@ type PartialFileReader struct {
|
||||
rangePath string
|
||||
}
|
||||
|
||||
func NewPartialFileReader(fp *os.File) *PartialFileReader {
|
||||
func NewPartialFileReader(fp *fsutils.File) *PartialFileReader {
|
||||
return &PartialFileReader{
|
||||
FileReader: NewFileReader(fp),
|
||||
rangePath: PartialRangesFilePath(fp.Name()),
|
||||
|
||||
@@ -380,6 +380,7 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
|
||||
|
||||
// 检查文件记录是否已过期
|
||||
var estimatedSize int64
|
||||
var existInList bool
|
||||
if !useStale {
|
||||
exists, filesize, err := this.list.Exist(hash)
|
||||
if err != nil {
|
||||
@@ -389,15 +390,12 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
estimatedSize = filesize
|
||||
existInList = true
|
||||
}
|
||||
|
||||
// 尝试通过MMAP读取
|
||||
if estimatedSize > 0 {
|
||||
if !fsutils.ReaderLimiter.TryAck() {
|
||||
return nil, ErrServerIsBusy
|
||||
}
|
||||
reader, err := this.tryMMAPReader(isPartial, estimatedSize, path)
|
||||
fsutils.ReaderLimiter.Release()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -416,11 +414,13 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
|
||||
|
||||
var err error
|
||||
if openFile == nil {
|
||||
if !fsutils.ReaderLimiter.TryAck() {
|
||||
return nil, ErrServerIsBusy
|
||||
if existInList {
|
||||
fsutils.ReaderLimiter.Ack()
|
||||
}
|
||||
fp, err = os.OpenFile(path, os.O_RDONLY, 0444)
|
||||
fsutils.ReaderLimiter.Release()
|
||||
if existInList {
|
||||
fsutils.ReaderLimiter.Release()
|
||||
}
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
@@ -439,12 +439,12 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
|
||||
|
||||
var reader Reader
|
||||
if isPartial {
|
||||
var partialFileReader = NewPartialFileReader(fp)
|
||||
var partialFileReader = NewPartialFileReader(fsutils.NewFile(fp, fsutils.FlagRead))
|
||||
partialFileReader.openFile = openFile
|
||||
partialFileReader.openFileCache = openFileCache
|
||||
reader = partialFileReader
|
||||
} else {
|
||||
var fileReader = NewFileReader(fp)
|
||||
var fileReader = NewFileReader(fsutils.NewFile(fp, fsutils.FlagRead))
|
||||
fileReader.openFile = openFile
|
||||
fileReader.openFileCache = openFileCache
|
||||
reader = fileReader
|
||||
@@ -591,13 +591,9 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
|
||||
// 数据库中是否存在
|
||||
existsCacheItem, _, _ := this.list.Exist(hash)
|
||||
if existsCacheItem {
|
||||
if !fsutils.ReaderLimiter.TryAck() {
|
||||
return nil, ErrServerIsBusy
|
||||
}
|
||||
readerFp, err := os.OpenFile(tmpPath, os.O_RDONLY, 0444)
|
||||
fsutils.ReaderLimiter.Release()
|
||||
readerFp, err := fsutils.OpenFile(tmpPath, os.O_RDONLY, 0444)
|
||||
if err == nil {
|
||||
var partialReader = NewPartialFileReader(readerFp)
|
||||
var partialReader = NewPartialFileReader(fsutils.NewFile(readerFp, fsutils.FlagRead))
|
||||
err = partialReader.Init()
|
||||
_ = partialReader.Close()
|
||||
if err == nil && partialReader.bodyOffset > 0 {
|
||||
@@ -628,18 +624,22 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
|
||||
if isNewCreated && existsFile {
|
||||
flags |= os.O_TRUNC
|
||||
}
|
||||
if !fsutils.WriterLimiter.TryAck() {
|
||||
return nil, ErrServerIsBusy
|
||||
if !isFlushing {
|
||||
if !fsutils.WriterLimiter.TryAck() {
|
||||
return nil, ErrServerIsBusy
|
||||
}
|
||||
}
|
||||
fp, err := os.OpenFile(tmpPath, flags, 0666)
|
||||
if !isFlushing {
|
||||
fsutils.WriterLimiter.Release()
|
||||
}
|
||||
writer, err := os.OpenFile(tmpPath, flags, 0666)
|
||||
fsutils.WriterLimiter.Release()
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
_ = os.MkdirAll(dir, 0777)
|
||||
|
||||
// open file again
|
||||
fsutils.WriterLimiter.Ack()
|
||||
writer, err = os.OpenFile(tmpPath, flags, 0666)
|
||||
fp, err = os.OpenFile(tmpPath, flags, 0666)
|
||||
fsutils.WriterLimiter.Release()
|
||||
}
|
||||
if err != nil {
|
||||
@@ -647,6 +647,8 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
|
||||
}
|
||||
}
|
||||
|
||||
var writer = fsutils.NewFile(fp, fsutils.FlagWrite)
|
||||
|
||||
var removeOnFailure = true
|
||||
defer func() {
|
||||
if err != nil {
|
||||
@@ -663,7 +665,9 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
|
||||
}()
|
||||
|
||||
// 尝试锁定,如果锁定失败,则直接返回
|
||||
fsutils.WriterLimiter.Ack()
|
||||
err = syscall.Flock(int(writer.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
|
||||
fsutils.WriterLimiter.Release()
|
||||
if err != nil {
|
||||
removeOnFailure = false
|
||||
return nil, fmt.Errorf("%w (003)", ErrFileIsWriting)
|
||||
@@ -700,9 +704,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
|
||||
metaBodySize = bodySize
|
||||
}
|
||||
|
||||
fsutils.WriterLimiter.Ack()
|
||||
_, err = writer.Write(metaBytes)
|
||||
fsutils.WriterLimiter.Release()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -6,14 +6,13 @@ import (
|
||||
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type FileWriter struct {
|
||||
storage StorageInterface
|
||||
rawWriter *os.File
|
||||
rawWriter *fsutils.File
|
||||
key string
|
||||
|
||||
metaHeaderSize int
|
||||
@@ -26,9 +25,11 @@ type FileWriter struct {
|
||||
maxSize int64
|
||||
endFunc func()
|
||||
once sync.Once
|
||||
|
||||
modifiedBytes int
|
||||
}
|
||||
|
||||
func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, maxSize int64, endFunc func()) *FileWriter {
|
||||
func NewFileWriter(storage StorageInterface, rawWriter *fsutils.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, maxSize int64, endFunc func()) *FileWriter {
|
||||
return &FileWriter{
|
||||
storage: storage,
|
||||
key: key,
|
||||
@@ -43,9 +44,7 @@ func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, exp
|
||||
|
||||
// WriteHeader 写入数据
|
||||
func (this *FileWriter) WriteHeader(data []byte) (n int, err error) {
|
||||
fsutils.WriterLimiter.Ack()
|
||||
n, err = this.rawWriter.Write(data)
|
||||
fsutils.WriterLimiter.Release()
|
||||
this.headerSize += int64(n)
|
||||
if err != nil {
|
||||
_ = this.Discard()
|
||||
@@ -79,7 +78,7 @@ func (this *FileWriter) Write(data []byte) (n int, err error) {
|
||||
var l = len(data)
|
||||
if l > (2 << 20) {
|
||||
var offset = 0
|
||||
const bufferSize = 256 << 10
|
||||
const bufferSize = 64 << 10
|
||||
for {
|
||||
var end = offset + bufferSize
|
||||
if end > l {
|
||||
@@ -145,24 +144,18 @@ func (this *FileWriter) Close() error {
|
||||
|
||||
err := this.WriteHeaderLength(types.Int(this.headerSize))
|
||||
if err != nil {
|
||||
fsutils.WriterLimiter.Ack()
|
||||
_ = this.rawWriter.Close()
|
||||
fsutils.WriterLimiter.Release()
|
||||
_ = fsutils.Remove(path)
|
||||
return err
|
||||
}
|
||||
err = this.WriteBodyLength(this.bodySize)
|
||||
if err != nil {
|
||||
fsutils.WriterLimiter.Ack()
|
||||
_ = this.rawWriter.Close()
|
||||
fsutils.WriterLimiter.Release()
|
||||
_ = fsutils.Remove(path)
|
||||
return err
|
||||
}
|
||||
|
||||
fsutils.WriterLimiter.Ack()
|
||||
err = this.rawWriter.Close()
|
||||
fsutils.WriterLimiter.Release()
|
||||
if err != nil {
|
||||
_ = fsutils.Remove(path)
|
||||
} else if strings.HasSuffix(path, FileTmpSuffix) {
|
||||
@@ -181,9 +174,7 @@ func (this *FileWriter) Discard() error {
|
||||
this.endFunc()
|
||||
})
|
||||
|
||||
fsutils.WriterLimiter.Ack()
|
||||
_ = this.rawWriter.Close()
|
||||
fsutils.WriterLimiter.Release()
|
||||
|
||||
err := fsutils.Remove(this.rawWriter.Name())
|
||||
return err
|
||||
@@ -211,9 +202,7 @@ func (this *FileWriter) ItemType() ItemType {
|
||||
}
|
||||
|
||||
func (this *FileWriter) write(data []byte) (n int, err error) {
|
||||
fsutils.WriterLimiter.Ack()
|
||||
n, err = this.rawWriter.Write(data)
|
||||
fsutils.WriterLimiter.Release()
|
||||
this.bodySize += int64(n)
|
||||
|
||||
if this.maxSize > 0 && this.bodySize > this.maxSize {
|
||||
@@ -227,5 +216,6 @@ func (this *FileWriter) write(data []byte) (n int, err error) {
|
||||
if err != nil {
|
||||
_ = this.Discard()
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -7,12 +7,11 @@ import (
|
||||
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type PartialFileWriter struct {
|
||||
rawWriter *os.File
|
||||
rawWriter *fsutils.File
|
||||
key string
|
||||
|
||||
metaHeaderSize int
|
||||
@@ -33,7 +32,7 @@ type PartialFileWriter struct {
|
||||
rangePath string
|
||||
}
|
||||
|
||||
func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, isNew bool, isPartial bool, bodyOffset int64, ranges *PartialRanges, endFunc func()) *PartialFileWriter {
|
||||
func NewPartialFileWriter(rawWriter *fsutils.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, isNew bool, isPartial bool, bodyOffset int64, ranges *PartialRanges, endFunc func()) *PartialFileWriter {
|
||||
return &PartialFileWriter{
|
||||
key: key,
|
||||
rawWriter: rawWriter,
|
||||
@@ -54,9 +53,7 @@ func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) {
|
||||
if !this.isNew {
|
||||
return
|
||||
}
|
||||
fsutils.WriterLimiter.Ack()
|
||||
n, err = this.rawWriter.Write(data)
|
||||
fsutils.WriterLimiter.Release()
|
||||
this.headerSize += int64(n)
|
||||
if err != nil {
|
||||
_ = this.Discard()
|
||||
@@ -65,9 +62,7 @@ func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) {
|
||||
}
|
||||
|
||||
func (this *PartialFileWriter) AppendHeader(data []byte) error {
|
||||
fsutils.WriterLimiter.Ack()
|
||||
_, err := this.rawWriter.Write(data)
|
||||
fsutils.WriterLimiter.Release()
|
||||
if err != nil {
|
||||
_ = this.Discard()
|
||||
} else {
|
||||
@@ -94,9 +89,7 @@ func (this *PartialFileWriter) WriteHeaderLength(headerLength int) error {
|
||||
_ = this.Discard()
|
||||
return err
|
||||
}
|
||||
fsutils.WriterLimiter.Ack()
|
||||
_, err = this.rawWriter.Write(bytes4)
|
||||
fsutils.WriterLimiter.Release()
|
||||
if err != nil {
|
||||
_ = this.Discard()
|
||||
return err
|
||||
@@ -106,9 +99,7 @@ func (this *PartialFileWriter) WriteHeaderLength(headerLength int) error {
|
||||
|
||||
// Write 写入数据
|
||||
func (this *PartialFileWriter) Write(data []byte) (n int, err error) {
|
||||
fsutils.WriterLimiter.Ack()
|
||||
n, err = this.rawWriter.Write(data)
|
||||
fsutils.WriterLimiter.Release()
|
||||
this.bodySize += int64(n)
|
||||
if err != nil {
|
||||
_ = this.Discard()
|
||||
@@ -132,11 +123,14 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error {
|
||||
// prevent extending too much space in a single writing
|
||||
var maxOffset = this.ranges.Max()
|
||||
if offset-maxOffset > 16<<20 {
|
||||
var extendSizePerStep int64 = 1 << 20
|
||||
var maxExtendSize int64 = 32 << 20
|
||||
if fsutils.DiskIsExtremelyFast() {
|
||||
maxExtendSize = 128 << 20
|
||||
extendSizePerStep = 4 << 20
|
||||
} else if fsutils.DiskIsFast() {
|
||||
maxExtendSize = 64 << 20
|
||||
extendSizePerStep = 2 << 20
|
||||
}
|
||||
if offset-maxOffset > maxExtendSize {
|
||||
stat, err := this.rawWriter.Stat()
|
||||
@@ -145,11 +139,8 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error {
|
||||
}
|
||||
|
||||
// extend min size to prepare for file tail
|
||||
const extendSizePerStep = 8 << 20
|
||||
if stat.Size()+extendSizePerStep <= this.bodyOffset+offset+int64(len(data)) {
|
||||
fsutils.WriterLimiter.Ack()
|
||||
_ = this.rawWriter.Truncate(stat.Size() + extendSizePerStep)
|
||||
fsutils.WriterLimiter.Release()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -163,9 +154,7 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error {
|
||||
this.bodyOffset = SizeMeta + int64(keyLength) + this.headerSize
|
||||
}
|
||||
|
||||
fsutils.WriterLimiter.Ack()
|
||||
_, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset)
|
||||
fsutils.WriterLimiter.Release()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -192,9 +181,7 @@ func (this *PartialFileWriter) WriteBodyLength(bodyLength int64) error {
|
||||
_ = this.Discard()
|
||||
return err
|
||||
}
|
||||
fsutils.WriterLimiter.Ack()
|
||||
_, err = this.rawWriter.Write(bytes8)
|
||||
fsutils.WriterLimiter.Release()
|
||||
if err != nil {
|
||||
_ = this.Discard()
|
||||
return err
|
||||
@@ -211,9 +198,7 @@ func (this *PartialFileWriter) Close() error {
|
||||
this.ranges.BodySize = this.bodySize
|
||||
err := this.ranges.WriteToFile(this.rangePath)
|
||||
if err != nil {
|
||||
fsutils.WriterLimiter.Ack()
|
||||
_ = this.rawWriter.Close()
|
||||
fsutils.WriterLimiter.Release()
|
||||
this.remove()
|
||||
return err
|
||||
}
|
||||
@@ -222,25 +207,19 @@ func (this *PartialFileWriter) Close() error {
|
||||
if this.isNew {
|
||||
err = this.WriteHeaderLength(types.Int(this.headerSize))
|
||||
if err != nil {
|
||||
fsutils.WriterLimiter.Ack()
|
||||
_ = this.rawWriter.Close()
|
||||
fsutils.WriterLimiter.Release()
|
||||
this.remove()
|
||||
return err
|
||||
}
|
||||
err = this.WriteBodyLength(this.bodySize)
|
||||
if err != nil {
|
||||
fsutils.WriterLimiter.Ack()
|
||||
_ = this.rawWriter.Close()
|
||||
fsutils.WriterLimiter.Release()
|
||||
this.remove()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
fsutils.WriterLimiter.Ack()
|
||||
err = this.rawWriter.Close()
|
||||
fsutils.WriterLimiter.Release()
|
||||
if err != nil {
|
||||
this.remove()
|
||||
}
|
||||
@@ -254,9 +233,7 @@ func (this *PartialFileWriter) Discard() error {
|
||||
this.endFunc()
|
||||
})
|
||||
|
||||
fsutils.WriterLimiter.Ack()
|
||||
_ = this.rawWriter.Close()
|
||||
fsutils.WriterLimiter.Release()
|
||||
|
||||
SharedPartialRangesQueue.Delete(this.rangePath)
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package teaconst
|
||||
|
||||
const (
|
||||
Version = "1.3.7"
|
||||
Version = "1.3.8.1"
|
||||
|
||||
ProductName = "Edge Node"
|
||||
ProcessName = "edge-node"
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
_ "github.com/TeaOSLab/EdgeNode/internal/utils/agents" // 引入Agent管理器
|
||||
_ "github.com/TeaOSLab/EdgeNode/internal/utils/clock" // 触发时钟更新
|
||||
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/jsonutils"
|
||||
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/waf"
|
||||
@@ -880,6 +881,10 @@ func (this *Node) onReload(config *nodeconfigs.NodeConfig, reloadAll bool) {
|
||||
nodeconfigs.ResetNodeConfig(config)
|
||||
sharedNodeConfig = config
|
||||
|
||||
// 并发读写数
|
||||
fsutils.ReaderLimiter.SetThreads(config.MaxConcurrentReads)
|
||||
fsutils.WriterLimiter.SetThreads(config.MaxConcurrentWrites)
|
||||
|
||||
if reloadAll {
|
||||
// 缓存策略
|
||||
var subDirs = config.CacheDiskSubDirs
|
||||
|
||||
@@ -316,7 +316,8 @@ func (this *BlocksFile) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
_ = this.sync(true)
|
||||
// TODO 决定是否同步
|
||||
//_ = this.sync(true)
|
||||
|
||||
this.isClosed = true
|
||||
|
||||
|
||||
@@ -256,6 +256,11 @@ func (this *FS) openBFileForHashWriting(hash string) (*BlocksFile, error) {
|
||||
if ok {
|
||||
// 调整当前BFile所在位置
|
||||
this.mu.Lock()
|
||||
|
||||
if bFile.IsClosing() {
|
||||
// TODO 需要重新等待打开
|
||||
}
|
||||
|
||||
item, itemOk := this.bItemMap[bName]
|
||||
if itemOk {
|
||||
this.bList.Remove(item)
|
||||
|
||||
@@ -18,9 +18,9 @@ type FSOptions struct {
|
||||
func (this *FSOptions) EnsureDefaults() {
|
||||
if this.MaxOpenFiles <= 0 {
|
||||
// 根据内存计算最大打开文件数
|
||||
var maxOpenFiles = memutils.SystemMemoryGB() * 64
|
||||
if maxOpenFiles > (4 << 10) {
|
||||
maxOpenFiles = 4 << 10
|
||||
var maxOpenFiles = memutils.SystemMemoryGB() * 128
|
||||
if maxOpenFiles > (8 << 10) {
|
||||
maxOpenFiles = 8 << 10
|
||||
}
|
||||
this.MaxOpenFiles = maxOpenFiles
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ package bfs
|
||||
|
||||
import "github.com/TeaOSLab/EdgeNode/internal/zero"
|
||||
|
||||
// TODO 使用atomic代替channel?需要使用基准测试对比性能
|
||||
// TODO 线程数可以根据硬盘数量动态调整?
|
||||
var readThreadsLimiter = make(chan zero.Zero, 8)
|
||||
var writeThreadsLimiter = make(chan zero.Zero, 8)
|
||||
|
||||
94
internal/utils/fs/file.go
Normal file
94
internal/utils/fs/file.go
Normal file
@@ -0,0 +1,94 @@
|
||||
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package fsutils
|
||||
|
||||
import "os"
|
||||
|
||||
const FlagRead = 0x1
|
||||
const FlagWrite = 0x2
|
||||
|
||||
type File struct {
|
||||
rawFile *os.File
|
||||
readonly bool
|
||||
}
|
||||
|
||||
func NewFile(rawFile *os.File, flags int) *File {
|
||||
return &File{
|
||||
rawFile: rawFile,
|
||||
readonly: flags&FlagRead == FlagRead,
|
||||
}
|
||||
}
|
||||
|
||||
func (this *File) Name() string {
|
||||
return this.rawFile.Name()
|
||||
}
|
||||
|
||||
func (this *File) Fd() uintptr {
|
||||
return this.rawFile.Fd()
|
||||
}
|
||||
|
||||
func (this *File) Raw() *os.File {
|
||||
return this.rawFile
|
||||
}
|
||||
|
||||
func (this *File) Stat() (os.FileInfo, error) {
|
||||
return this.rawFile.Stat()
|
||||
}
|
||||
|
||||
func (this *File) Seek(offset int64, whence int) (ret int64, err error) {
|
||||
ret, err = this.rawFile.Seek(offset, whence)
|
||||
return
|
||||
}
|
||||
|
||||
func (this *File) Read(b []byte) (n int, err error) {
|
||||
ReaderLimiter.Ack()
|
||||
n, err = this.rawFile.Read(b)
|
||||
ReaderLimiter.Release()
|
||||
return
|
||||
}
|
||||
|
||||
func (this *File) ReadAt(b []byte, off int64) (n int, err error) {
|
||||
ReaderLimiter.Ack()
|
||||
n, err = this.rawFile.ReadAt(b, off)
|
||||
ReaderLimiter.Release()
|
||||
return
|
||||
}
|
||||
|
||||
func (this *File) Write(b []byte) (n int, err error) {
|
||||
WriterLimiter.Ack()
|
||||
n, err = this.rawFile.Write(b)
|
||||
WriterLimiter.Release()
|
||||
return
|
||||
}
|
||||
|
||||
func (this *File) WriteAt(b []byte, off int64) (n int, err error) {
|
||||
WriterLimiter.Ack()
|
||||
n, err = this.rawFile.WriteAt(b, off)
|
||||
WriterLimiter.Release()
|
||||
return
|
||||
}
|
||||
|
||||
func (this *File) Sync() (err error) {
|
||||
WriterLimiter.Ack()
|
||||
err = this.rawFile.Sync()
|
||||
WriterLimiter.Release()
|
||||
return
|
||||
}
|
||||
|
||||
func (this *File) Truncate(size int64) (err error) {
|
||||
WriterLimiter.Ack()
|
||||
err = this.rawFile.Truncate(size)
|
||||
WriterLimiter.Release()
|
||||
return
|
||||
}
|
||||
|
||||
func (this *File) Close() (err error) {
|
||||
if !this.readonly {
|
||||
WriterLimiter.Ack()
|
||||
}
|
||||
err = this.rawFile.Close()
|
||||
if !this.readonly {
|
||||
WriterLimiter.Release()
|
||||
}
|
||||
return
|
||||
}
|
||||
16
internal/utils/fs/file_test.go
Normal file
16
internal/utils/fs/file_test.go
Normal file
@@ -0,0 +1,16 @@
|
||||
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package fsutils_test
|
||||
|
||||
import (
|
||||
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
||||
"github.com/iwind/TeaGo/assert"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestFileFlags(t *testing.T) {
|
||||
var a = assert.NewAssertion(t)
|
||||
a.IsTrue(fsutils.FlagRead&fsutils.FlagRead == fsutils.FlagRead)
|
||||
a.IsTrue(fsutils.FlagWrite&fsutils.FlagWrite != fsutils.FlagRead)
|
||||
a.IsTrue((fsutils.FlagWrite|fsutils.FlagRead)&fsutils.FlagRead == fsutils.FlagRead)
|
||||
}
|
||||
@@ -3,26 +3,27 @@
|
||||
package fsutils
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"runtime"
|
||||
"time"
|
||||
)
|
||||
|
||||
var maxThreads = runtime.NumCPU()
|
||||
var WriterLimiter = NewLimiter(maxThreads)
|
||||
var ReaderLimiter = NewLimiter(maxThreads)
|
||||
var WriterLimiter = NewLimiter(max(maxThreads, 8))
|
||||
var ReaderLimiter = NewLimiter(max(maxThreads, 8))
|
||||
|
||||
type Limiter struct {
|
||||
threads chan struct{}
|
||||
timers chan *time.Timer
|
||||
threads chan struct{}
|
||||
count int
|
||||
countDefault int
|
||||
timers chan *time.Timer
|
||||
}
|
||||
|
||||
func NewLimiter(threads int) *Limiter {
|
||||
if threads < 4 {
|
||||
threads = 4
|
||||
}
|
||||
if threads > 32 {
|
||||
threads = 32
|
||||
if threads > 64 {
|
||||
threads = 64
|
||||
}
|
||||
|
||||
var threadsChan = make(chan struct{}, threads)
|
||||
@@ -31,8 +32,26 @@ func NewLimiter(threads int) *Limiter {
|
||||
}
|
||||
|
||||
return &Limiter{
|
||||
threads: threadsChan,
|
||||
timers: make(chan *time.Timer, 2048),
|
||||
countDefault: threads,
|
||||
count: threads,
|
||||
threads: threadsChan,
|
||||
timers: make(chan *time.Timer, 4096),
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Limiter) SetThreads(newThreads int) {
|
||||
if newThreads <= 0 {
|
||||
newThreads = this.countDefault
|
||||
}
|
||||
|
||||
if newThreads != this.count {
|
||||
var threadsChan = make(chan struct{}, newThreads)
|
||||
for i := 0; i < newThreads; i++ {
|
||||
threadsChan <- struct{}{}
|
||||
}
|
||||
|
||||
this.threads = threadsChan
|
||||
this.count = newThreads
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,7 +91,7 @@ func (this *Limiter) Release() {
|
||||
select {
|
||||
case this.threads <- struct{}{}:
|
||||
default:
|
||||
remotelogs.Error("FS_LIMITER", "Limiter Ack()/Release() should appeared as a pair")
|
||||
// 由于容量可能有变化,这里忽略多余的thread
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,10 +4,34 @@ package fsutils_test
|
||||
|
||||
import (
|
||||
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
|
||||
"github.com/iwind/TeaGo/assert"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestLimiter_SetThreads(t *testing.T) {
|
||||
var limiter = fsutils.NewLimiter(4)
|
||||
|
||||
var concurrent = 1024
|
||||
|
||||
var wg = sync.WaitGroup{}
|
||||
wg.Add(concurrent)
|
||||
|
||||
for i := 0; i < concurrent; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
limiter.SetThreads(rand.Int() % 128)
|
||||
limiter.TryAck()
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestLimiter_Ack(t *testing.T) {
|
||||
var a = assert.NewAssertion(t)
|
||||
|
||||
@@ -52,3 +76,48 @@ func TestLimiter_TryAck(t *testing.T) {
|
||||
a.IsTrue(limiter.FreeThreads() == 0)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLimiter_TryAck2(t *testing.T) {
|
||||
if !testutils.IsSingleTesting() {
|
||||
return
|
||||
}
|
||||
|
||||
var a = assert.NewAssertion(t)
|
||||
|
||||
{
|
||||
var limiter = fsutils.NewLimiter(4)
|
||||
var count = limiter.FreeThreads()
|
||||
a.IsTrue(count == 4)
|
||||
for i := 0; i < count-1; i++ {
|
||||
limiter.Ack()
|
||||
}
|
||||
a.IsTrue(limiter.FreeThreads() == 1)
|
||||
a.IsTrue(limiter.TryAck())
|
||||
a.IsFalse(limiter.TryAck())
|
||||
a.IsFalse(limiter.TryAck())
|
||||
|
||||
limiter.Release()
|
||||
a.IsTrue(limiter.TryAck())
|
||||
}
|
||||
}
|
||||
|
||||
func TestLimiter_Timout(t *testing.T) {
|
||||
var timeout = time.NewTimer(100 * time.Millisecond)
|
||||
|
||||
var r = make(chan bool, 1)
|
||||
r <- true
|
||||
|
||||
var before = time.Now()
|
||||
select {
|
||||
case <-r:
|
||||
case <-timeout.C:
|
||||
}
|
||||
t.Log(time.Since(before).Seconds()*1000, "ms")
|
||||
|
||||
timeout.Stop()
|
||||
|
||||
before = time.Now()
|
||||
timeout.Reset(100 * time.Millisecond)
|
||||
<-timeout.C
|
||||
t.Log(time.Since(before).Seconds()*1000, "ms")
|
||||
}
|
||||
|
||||
@@ -2,7 +2,9 @@
|
||||
|
||||
package fsutils
|
||||
|
||||
import "os"
|
||||
import (
|
||||
"os"
|
||||
)
|
||||
|
||||
func Remove(filename string) (err error) {
|
||||
WriterLimiter.Ack()
|
||||
@@ -26,6 +28,29 @@ func ReadFile(filename string) (data []byte, err error) {
|
||||
}
|
||||
|
||||
func WriteFile(filename string, data []byte, perm os.FileMode) (err error) {
|
||||
WriterLimiter.Ack()
|
||||
err = os.WriteFile(filename, data, perm)
|
||||
WriterLimiter.Release()
|
||||
return
|
||||
}
|
||||
|
||||
func OpenFile(name string, flag int, perm os.FileMode) (f *os.File, err error) {
|
||||
if flag&os.O_RDONLY == os.O_RDONLY {
|
||||
ReaderLimiter.Ack()
|
||||
}
|
||||
|
||||
f, err = os.OpenFile(name, flag, perm)
|
||||
|
||||
if flag&os.O_RDONLY == os.O_RDONLY {
|
||||
ReaderLimiter.Release()
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func Open(name string) (f *os.File, err error) {
|
||||
ReaderLimiter.Ack()
|
||||
f, err = os.Open(name)
|
||||
ReaderLimiter.Release()
|
||||
return
|
||||
}
|
||||
|
||||
17
internal/utils/fs/os_test.go
Normal file
17
internal/utils/fs/os_test.go
Normal file
@@ -0,0 +1,17 @@
|
||||
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package fsutils_test
|
||||
|
||||
import (
|
||||
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestOpenFile(t *testing.T) {
|
||||
f, err := fsutils.OpenFile("./os_test.go", os.O_RDONLY, 0444)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_ = f.Close()
|
||||
}
|
||||
36
internal/utils/percpu/chan.go
Normal file
36
internal/utils/percpu/chan.go
Normal file
@@ -0,0 +1,36 @@
|
||||
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package percpu
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
)
|
||||
|
||||
type Chan[T any] struct {
|
||||
c chan T
|
||||
|
||||
count int
|
||||
cList []chan T
|
||||
}
|
||||
|
||||
func NewChan[T any](size int) *Chan[T] {
|
||||
var count = max(runtime.NumCPU(), runtime.GOMAXPROCS(0))
|
||||
var cList []chan T
|
||||
for i := 0; i < count; i++ {
|
||||
cList = append(cList, make(chan T, size))
|
||||
}
|
||||
|
||||
return &Chan[T]{
|
||||
c: make(chan T, size),
|
||||
count: count,
|
||||
cList: cList,
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Chan[T]) C() chan T {
|
||||
var procId = GetProcId()
|
||||
if procId < this.count {
|
||||
return this.cList[procId]
|
||||
}
|
||||
return this.c
|
||||
}
|
||||
23
internal/utils/percpu/chan_test.go
Normal file
23
internal/utils/percpu/chan_test.go
Normal file
@@ -0,0 +1,23 @@
|
||||
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package percpu_test
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/percpu"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/zero"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestChan_C(t *testing.T) {
|
||||
var c = percpu.NewChan[zero.Zero](10)
|
||||
c.C() <- zero.Zero{}
|
||||
|
||||
t.Log(<-c.C())
|
||||
|
||||
select {
|
||||
case <-c.C():
|
||||
t.Fatal("should not return from here")
|
||||
default:
|
||||
t.Log("ok")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user