Compare commits

...

14 Commits

Author SHA1 Message Date
刘祥超
51dd778ca7 更改为v0.4.2 2022-02-21 17:52:02 +08:00
刘祥超
b5f706686c 修复热点数据从文件系统转移到内存时可能不完整的Bug/实现部分Partial Content功能 2022-02-21 17:33:58 +08:00
刘祥超
b67c2ec39c 缓存关闭X-Cache显示时从Header中删除X-Cache 2022-02-21 16:55:25 +08:00
刘祥超
d40bc4e72b URL跳转可以设置是否保留参数 2022-02-20 09:17:50 +08:00
刘祥超
94d0fc7e88 当压缩格式不在Accept-Encoding中时自动解压 2022-02-18 11:05:09 +08:00
刘祥超
ceaeba7089 修复文件句柄缓存可能重复加入的Bug 2022-02-17 17:38:56 +08:00
刘祥超
a1e868bf29 读取缓存错误更详细 2022-02-17 17:24:35 +08:00
刘祥超
e60af85819 修复从缓存文件中读取压缩内容时可能失败的Bug 2022-02-17 16:56:13 +08:00
刘祥超
7bd24fcc81 检查是否压缩的时候,如果content-type为空,则默认为text/html 2022-02-15 18:31:37 +08:00
刘祥超
4331223916 优化代码 2022-02-15 16:44:39 +08:00
刘祥超
f50113517a 重构对HTTP请求的处理方法:缓存、压缩、WebP、限速 2022-02-15 14:55:49 +08:00
刘祥超
6d6e25f298 WAF规则提示错误时增加分组ID、规则集ID、规则描述 2022-01-29 21:43:42 +08:00
刘祥超
69c89fda48 支持单个服务更新配置 2022-01-19 22:16:46 +08:00
刘祥超
9a56671457 修改版本为v0.4.1 2022-01-17 10:53:23 +08:00
42 changed files with 1844 additions and 887 deletions

View File

@@ -11,7 +11,7 @@ func TestOpenFilePool_Get(t *testing.T) {
var pool = caches.NewOpenFilePool("a")
t.Log(pool.Filename())
t.Log(pool.Get())
t.Log(pool.Put(caches.NewOpenFile(nil, nil)))
t.Log(pool.Put(caches.NewOpenFile(nil, nil, []byte{})))
t.Log(pool.Get())
t.Log(pool.Get())
}

View File

@@ -0,0 +1,143 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import (
"encoding/json"
)
// PartialRanges 内容分区范围定义
type PartialRanges struct {
ranges [][2]int64
}
// NewPartialRanges 获取新对象
func NewPartialRanges() *PartialRanges {
return &PartialRanges{ranges: [][2]int64{}}
}
// NewPartialRangesFromJSON 从JSON中解析范围
func NewPartialRangesFromJSON(data []byte) (*PartialRanges, error) {
var rs = [][2]int64{}
err := json.Unmarshal(data, &rs)
if err != nil {
return nil, err
}
var r = NewPartialRanges()
r.ranges = rs
return r, nil
}
// Add 添加新范围
func (this *PartialRanges) Add(begin int64, end int64) {
if begin > end {
begin, end = end, begin
}
var nr = [2]int64{begin, end}
var count = len(this.ranges)
if count == 0 {
this.ranges = [][2]int64{nr}
return
}
// insert
// TODO 将来使用二分法改进
var index = -1
for i, r := range this.ranges {
if r[0] > begin || (r[0] == begin && r[1] >= end) {
index = i
this.ranges = append(this.ranges, [2]int64{})
copy(this.ranges[index+1:], this.ranges[index:])
this.ranges[index] = nr
break
}
}
if index == -1 {
index = count
this.ranges = append(this.ranges, nr)
}
this.merge(index)
}
// Ranges 获取所有范围
func (this *PartialRanges) Ranges() [][2]int64 {
return this.ranges
}
// Contains 检查是否包含某个范围
func (this *PartialRanges) Contains(begin int64, end int64) bool {
if len(this.ranges) == 0 {
return true
}
// TODO 使用二分法查找改进性能
for _, r2 := range this.ranges {
if r2[0] <= begin && r2[1] >= end {
return true
}
}
return false
}
// AsJSON 转换为JSON
func (this *PartialRanges) AsJSON() ([]byte, error) {
return json.Marshal(this.ranges)
}
func (this *PartialRanges) merge(index int) {
// forward
var lastIndex = index
for i := index; i >= 1; i-- {
var curr = this.ranges[i]
var prev = this.ranges[i-1]
var w1 = this.w(curr)
var w2 = this.w(prev)
if w1+w2 >= this.max(curr[1], prev[1])-this.min(curr[0], prev[0])-1 {
prev = [2]int64{this.min(curr[0], prev[0]), this.max(curr[1], prev[1])}
this.ranges[i-1] = prev
this.ranges = append(this.ranges[:i], this.ranges[i+1:]...)
lastIndex = i - 1
} else {
break
}
}
// backward
index = lastIndex
for index < len(this.ranges)-1 {
var curr = this.ranges[index]
var next = this.ranges[index+1]
var w1 = this.w(curr)
var w2 = this.w(next)
if w1+w2 >= this.max(curr[1], next[1])-this.min(curr[0], next[0])-1 {
curr = [2]int64{this.min(curr[0], next[0]), this.max(curr[1], next[1])}
this.ranges = append(this.ranges[:index], this.ranges[index+1:]...)
this.ranges[index] = curr
} else {
break
}
}
}
func (this *PartialRanges) w(r [2]int64) int64 {
return r[1] - r[0]
}
func (this *PartialRanges) min(n1 int64, n2 int64) int64 {
if n1 <= n2 {
return n1
}
return n2
}
func (this *PartialRanges) max(n1 int64, n2 int64) int64 {
if n1 >= n2 {
return n1
}
return n2
}

View File

@@ -0,0 +1,124 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches_test
import (
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/iwind/TeaGo/assert"
"github.com/iwind/TeaGo/logs"
"testing"
)
func TestNewPartialRanges(t *testing.T) {
var r = caches.NewPartialRanges()
r.Add(1, 100)
r.Add(50, 300)
r.Add(30, 80)
r.Add(30, 100)
r.Add(30, 400)
r.Add(1000, 10000)
r.Add(200, 1000)
r.Add(200, 10040)
logs.PrintAsJSON(r.Ranges())
}
func TestNewPartialRanges1(t *testing.T) {
var a = assert.NewAssertion(t)
var r = caches.NewPartialRanges()
r.Add(1, 100)
r.Add(1, 101)
r.Add(1, 102)
r.Add(2, 103)
r.Add(200, 300)
r.Add(1, 1000)
var rs = r.Ranges()
logs.PrintAsJSON(rs, t)
a.IsTrue(len(rs) == 1)
if len(rs) == 1 {
a.IsTrue(rs[0][0] == 1)
a.IsTrue(rs[0][1] == 1000)
}
}
func TestNewPartialRanges2(t *testing.T) {
// low -> high
var r = caches.NewPartialRanges()
r.Add(1, 100)
r.Add(1, 101)
r.Add(1, 102)
r.Add(2, 103)
r.Add(200, 300)
r.Add(301, 302)
r.Add(303, 304)
r.Add(250, 400)
var rs = r.Ranges()
logs.PrintAsJSON(rs, t)
}
func TestNewPartialRanges3(t *testing.T) {
// high -> low
var r = caches.NewPartialRanges()
r.Add(301, 302)
r.Add(303, 304)
r.Add(200, 300)
r.Add(250, 400)
var rs = r.Ranges()
logs.PrintAsJSON(rs, t)
}
func TestNewPartialRanges4(t *testing.T) {
// nearby
var r = caches.NewPartialRanges()
r.Add(301, 302)
r.Add(303, 304)
r.Add(305, 306)
r.Add(417, 417)
r.Add(410, 415)
r.Add(400, 409)
var rs = r.Ranges()
logs.PrintAsJSON(rs, t)
t.Log(r.Contains(400, 416))
}
func TestNewPartialRanges5(t *testing.T) {
var r = caches.NewPartialRanges()
for j := 0; j < 1000; j++ {
r.Add(int64(j), int64(j+100))
}
logs.PrintAsJSON(r.Ranges(), t)
}
func TestNewPartialRanges_AsJSON(t *testing.T) {
var r = caches.NewPartialRanges()
for j := 0; j < 1000; j++ {
r.Add(int64(j), int64(j+100))
}
data, err := r.AsJSON()
if err != nil {
t.Fatal(err)
}
t.Log(string(data))
r2, err := caches.NewPartialRangesFromJSON(data)
if err != nil {
t.Fatal(err)
}
t.Log(r2.Ranges())
}
func BenchmarkNewPartialRanges(b *testing.B) {
for i := 0; i < b.N; i++ {
var r = caches.NewPartialRanges()
for j := 0; j < 1000; j++ {
r.Add(int64(j), int64(j+100))
}
}
}

View File

@@ -24,8 +24,7 @@ type FileReader struct {
bodySize int64
bodyOffset int64
bodyBufLen int
bodyBuf []byte
isClosed bool
}
func NewFileReader(fp *os.File) *FileReader {
@@ -33,6 +32,10 @@ func NewFileReader(fp *os.File) *FileReader {
}
func (this *FileReader) Init() error {
return this.InitAutoDiscard(true)
}
func (this *FileReader) InitAutoDiscard(autoDiscard bool) error {
if this.openFile != nil {
this.meta = this.openFile.meta
this.header = this.openFile.header
@@ -40,11 +43,13 @@ func (this *FileReader) Init() error {
isOk := false
defer func() {
if !isOk {
_ = this.discard()
}
}()
if autoDiscard {
defer func() {
if !isOk {
_ = this.discard()
}
}()
}
var buf = this.meta
if len(buf) == 0 {
@@ -79,13 +84,13 @@ func (this *FileReader) Init() error {
this.headerOffset = int64(SizeMeta) + int64(urlLength)
// body
this.bodyOffset = this.headerOffset + int64(headerSize)
bodySize := int(binary.BigEndian.Uint64(buf[SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength : SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength+SizeBodyLength]))
if bodySize == 0 {
isOk = true
return nil
}
this.bodySize = int64(bodySize)
this.bodyOffset = this.headerOffset + int64(headerSize)
// read header
if this.openFileCache != nil && len(this.header) == 0 {
@@ -181,10 +186,6 @@ func (this *FileReader) ReadHeader(buf []byte, callback ReaderFunc) error {
}
headerSize -= n
} else {
if n > headerSize {
this.bodyBuf = buf[headerSize:]
this.bodyBufLen = n - headerSize
}
_, e := callback(headerSize)
if e != nil {
isOk = true
@@ -203,6 +204,12 @@ func (this *FileReader) ReadHeader(buf []byte, callback ReaderFunc) error {
isOk = true
// 移动到Body位置
_, err = this.fp.Seek(this.bodyOffset, io.SeekStart)
if err != nil {
return err
}
return nil
}
@@ -215,27 +222,7 @@ func (this *FileReader) ReadBody(buf []byte, callback ReaderFunc) error {
}
}()
offset := this.bodyOffset
// 直接返回从Header中剩余的
if this.bodyBufLen > 0 && len(buf) >= this.bodyBufLen {
offset += int64(this.bodyBufLen)
copy(buf, this.bodyBuf)
isOk = true
goNext, err := callback(this.bodyBufLen)
if err != nil {
return err
}
if !goNext {
return nil
}
if this.bodySize <= int64(this.bodyBufLen) {
return nil
}
}
var offset = this.bodyOffset
// 开始读Body部分
_, err := this.fp.Seek(offset, io.SeekStart)
@@ -269,32 +256,9 @@ func (this *FileReader) ReadBody(buf []byte, callback ReaderFunc) error {
}
func (this *FileReader) Read(buf []byte) (n int, err error) {
var isOk = false
defer func() {
if !isOk {
_ = this.discard()
}
}()
// 直接返回从Header中剩余的
if this.bodyBufLen > 0 && len(buf) >= this.bodyBufLen {
copy(buf, this.bodyBuf)
isOk = true
n = this.bodyBufLen
if this.bodySize <= int64(this.bodyBufLen) {
err = io.EOF
return
}
this.bodyBufLen = 0
return
}
n, err = this.fp.Read(buf)
if err == nil || err == io.EOF {
isOk = true
if err != nil && err != io.EOF {
_ = this.discard()
}
return
}
@@ -370,6 +334,11 @@ func (this *FileReader) ReadBodyRange(buf []byte, start int64, end int64, callba
func (this *FileReader) Close() error {
if this.openFileCache != nil {
if this.isClosed {
return nil
}
this.isClosed = true
if this.openFile != nil {
this.openFileCache.Put(this.fp.Name(), this.openFile)
} else {
@@ -391,5 +360,6 @@ func (this *FileReader) readToBuff(fp *os.File, buf []byte) (ok bool, err error)
func (this *FileReader) discard() error {
_ = this.fp.Close()
this.isClosed = true
return os.Remove(this.fp.Name())
}

View File

@@ -54,6 +54,30 @@ func TestFileReader(t *testing.T) {
}
}
func TestFileReader_ReadHeader(t *testing.T) {
var path = "/Users/WorkSpace/EdgeProject/EdgeCache/p43/12/6b/126bbed90fc80f2bdfb19558948b0d49.cache"
fp, err := os.Open(path)
if err != nil {
t.Fatal(err)
}
defer func() {
_ = fp.Close()
}()
var reader = NewFileReader(fp)
err = reader.Init()
if err != nil {
t.Fatal(err)
}
var buf = make([]byte, 16*1024)
err = reader.ReadHeader(buf, func(n int) (goNext bool, err error) {
t.Log("header:", string(buf[:n]))
return
})
if err != nil {
t.Fatal(err)
}
}
func TestFileReader_Range(t *testing.T) {
storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{
Id: 1,

View File

@@ -325,10 +325,11 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool)
}
// OpenWriter 打开缓存文件等待写入
func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, size int64, isPartial bool) (Writer, error) {
// 先尝试内存缓存
if this.memoryStorage != nil {
writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status)
// 我们限定仅小文件优先存在内存中
if !isPartial && this.memoryStorage != nil && size > 0 && size < 32*1024*1024 {
writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status, size, false)
if err == nil {
return writer, nil
}
@@ -360,13 +361,13 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
if this.policy.MaxKeys > 0 && count > this.policy.MaxKeys {
return nil, NewCapacityError("write file cache failed: too many keys in cache storage")
}
capacityBytes := this.diskCapacityBytes()
var capacityBytes = this.diskCapacityBytes()
if capacityBytes > 0 && capacityBytes <= this.totalSize {
return nil, NewCapacityError("write file cache failed: over disk size, current total size: " + strconv.FormatInt(this.totalSize, 10) + " bytes, capacity: " + strconv.FormatInt(capacityBytes, 10))
}
hash := stringutil.Md5(key)
dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
var hash = stringutil.Md5(key)
var dir = this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
_, err = os.Stat(dir)
if err != nil {
if !os.IsNotExist(err) {
@@ -386,6 +387,9 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
return nil, ErrFileIsWriting
}
var tmpPath = cachePath + ".tmp"
if isPartial {
tmpPath = cachePath
}
// 先删除
err = this.list.Remove(hash)
@@ -420,70 +424,96 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
return nil, ErrFileIsWriting
}
err = writer.Truncate(0)
if err != nil {
return nil, err
}
// 写入过期时间
bytes4 := make([]byte, 4)
{
binary.BigEndian.PutUint32(bytes4, uint32(expiredAt))
_, err = writer.Write(bytes4)
if err != nil {
return nil, err
// 是否已经有内容
var isNewCreated = true
var partialBodyOffset int64
if isPartial {
partialFP, err := os.OpenFile(tmpPath, os.O_RDONLY, 0444)
if err == nil {
var partialReader = NewFileReader(partialFP)
err = partialReader.InitAutoDiscard(false)
if err == nil && partialReader.bodyOffset > 0 {
isNewCreated = false
partialBodyOffset = partialReader.bodyOffset
}
_ = partialReader.Close()
}
}
// 写入状态码
if status > 999 || status < 100 {
status = 200
}
_, err = writer.WriteString(strconv.Itoa(status))
if err != nil {
return nil, err
}
// 写入URL长度
{
binary.BigEndian.PutUint32(bytes4, uint32(len(key)))
_, err = writer.Write(bytes4)
if isNewCreated {
err = writer.Truncate(0)
if err != nil {
return nil, err
}
}
// 写入Header Length
{
binary.BigEndian.PutUint32(bytes4, uint32(0))
_, err = writer.Write(bytes4)
// 写入过期时间
bytes4 := make([]byte, 4)
{
binary.BigEndian.PutUint32(bytes4, uint32(expiredAt))
_, err = writer.Write(bytes4)
if err != nil {
return nil, err
}
}
// 写入状态码
if status > 999 || status < 100 {
status = 200
}
_, err = writer.WriteString(strconv.Itoa(status))
if err != nil {
return nil, err
}
}
// 写入Body Length
{
b := make([]byte, SizeBodyLength)
binary.BigEndian.PutUint64(b, uint64(0))
_, err = writer.Write(b)
// 写入URL长度
{
binary.BigEndian.PutUint32(bytes4, uint32(len(key)))
_, err = writer.Write(bytes4)
if err != nil {
return nil, err
}
}
// 写入Header Length
{
binary.BigEndian.PutUint32(bytes4, uint32(0))
_, err = writer.Write(bytes4)
if err != nil {
return nil, err
}
}
// 写入Body Length
{
b := make([]byte, SizeBodyLength)
binary.BigEndian.PutUint64(b, uint64(0))
_, err = writer.Write(b)
if err != nil {
return nil, err
}
}
// 写入URL
_, err = writer.WriteString(key)
if err != nil {
return nil, err
}
}
// 写入URL
_, err = writer.WriteString(key)
if err != nil {
return nil, err
}
isOk = true
return NewFileWriter(writer, key, expiredAt, func() {
sharedWritingFileKeyLocker.Lock()
delete(sharedWritingFileKeyMap, key)
sharedWritingFileKeyLocker.Unlock()
}), nil
if isPartial {
return NewPartialFileWriter(writer, key, expiredAt, isNewCreated, isPartial, partialBodyOffset, func() {
sharedWritingFileKeyLocker.Lock()
delete(sharedWritingFileKeyMap, key)
sharedWritingFileKeyLocker.Unlock()
}), nil
} else {
return NewFileWriter(writer, key, expiredAt, func() {
sharedWritingFileKeyLocker.Lock()
delete(sharedWritingFileKeyMap, key)
sharedWritingFileKeyLocker.Unlock()
}), nil
}
}
// AddToList 添加到List
@@ -904,7 +934,7 @@ func (this *FileStorage) hotLoop() {
continue
}
writer, err := this.memoryStorage.openWriter(item.Key, item.ExpiresAt, item.Status, false)
writer, err := this.memoryStorage.openWriter(item.Key, item.ExpiresAt, item.Status, reader.BodySize(), false)
if err != nil {
if !CanIgnoreErr(err) {
remotelogs.Error("CACHE", "transfer hot item failed: "+err.Error())
@@ -929,6 +959,9 @@ func (this *FileStorage) hotLoop() {
err = reader.ReadBody(buf, func(n int) (goNext bool, err error) {
_, err = writer.Write(buf[:n])
if err == nil {
goNext = true
}
return
})
if err != nil {

View File

@@ -62,7 +62,7 @@ func TestFileStorage_OpenWriter(t *testing.T) {
header := []byte("Header")
body := []byte("This is Body")
writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200)
writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1, false)
if err != nil {
t.Fatal(err)
}
@@ -87,6 +87,41 @@ func TestFileStorage_OpenWriter(t *testing.T) {
t.Log("ok")
}
func TestFileStorage_OpenWriter_Partial(t *testing.T) {
var storage = NewFileStorage(&serverconfigs.HTTPCachePolicy{
Id: 2,
IsOn: true,
Options: map[string]interface{}{
"dir": Tea.Root + "/caches",
},
})
err := storage.Init()
if err != nil {
t.Fatal(err)
}
writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1, true)
if err != nil {
t.Fatal(err)
}
_, err = writer.WriteHeader([]byte("Content-Type:text/html; charset=utf-8"))
if err != nil {
t.Fatal(err)
}
err = writer.WriteAt([]byte("Hello, World"), 0)
if err != nil {
t.Fatal(err)
}
err = writer.Close()
if err != nil {
t.Fatal(err)
}
t.Log(writer)
}
func TestFileStorage_OpenWriter_HTTP(t *testing.T) {
storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{
Id: 1,
@@ -104,7 +139,7 @@ func TestFileStorage_OpenWriter_HTTP(t *testing.T) {
t.Log(time.Since(now).Seconds()*1000, "ms")
}()
writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200)
writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200, -1, false)
if err != nil {
t.Fatal(err)
}
@@ -177,10 +212,11 @@ func TestFileStorage_Concurrent_Open_DifferentFile(t *testing.T) {
go func(i int) {
defer wg.Done()
writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200)
writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200, -1, false)
if err != nil {
if err != ErrFileIsWriting {
t.Fatal(err)
t.Error(err)
return
}
return
}
@@ -188,7 +224,8 @@ func TestFileStorage_Concurrent_Open_DifferentFile(t *testing.T) {
_, err = writer.Write([]byte("Hello,World"))
if err != nil {
t.Fatal(err)
t.Error(err)
return
}
// 故意造成慢速写入
@@ -196,7 +233,8 @@ func TestFileStorage_Concurrent_Open_DifferentFile(t *testing.T) {
err = writer.Close()
if err != nil {
t.Fatal(err)
t.Error(err)
return
}
}(i)
}
@@ -229,10 +267,11 @@ func TestFileStorage_Concurrent_Open_SameFile(t *testing.T) {
go func(i int) {
defer wg.Done()
writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200)
writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200, -1, false)
if err != nil {
if err != ErrFileIsWriting {
t.Fatal(err)
t.Error(err)
return
}
return
}
@@ -241,7 +280,8 @@ func TestFileStorage_Concurrent_Open_SameFile(t *testing.T) {
t.Log("writing")
_, err = writer.Write([]byte("Hello,World"))
if err != nil {
t.Fatal(err)
t.Error(err)
return
}
// 故意造成慢速写入
@@ -249,7 +289,8 @@ func TestFileStorage_Concurrent_Open_SameFile(t *testing.T) {
err = writer.Close()
if err != nil {
t.Fatal(err)
t.Error(err)
return
}
}(i)
}

View File

@@ -13,7 +13,7 @@ type StorageInterface interface {
OpenReader(key string, useStale bool) (reader Reader, err error)
// OpenWriter 打开缓存写入器等待写入
OpenWriter(key string, expiredAt int64, status int) (Writer, error)
OpenWriter(key string, expiredAt int64, status int, size int64, isPartial bool) (Writer, error)
// Delete 删除某个键值对应的缓存
Delete(key string) error

View File

@@ -145,11 +145,15 @@ func (this *MemoryStorage) OpenReader(key string, useStale bool) (Reader, error)
}
// OpenWriter 打开缓存写入器等待写入
func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
return this.openWriter(key, expiredAt, status, true)
func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, size int64, isPartial bool) (Writer, error) {
// TODO 内存缓存暂时不支持分块内容存储
if isPartial {
return nil, ErrFileIsWriting
}
return this.openWriter(key, expiredAt, status, size, true)
}
func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, isDirty bool) (Writer, error) {
func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, size int64, isDirty bool) (Writer, error) {
this.locker.Lock()
defer this.locker.Unlock()
@@ -182,7 +186,10 @@ func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, i
return nil, NewCapacityError("write memory cache failed: too many keys in cache storage")
}
capacityBytes := this.memoryCapacityBytes()
if capacityBytes > 0 && capacityBytes <= this.totalSize {
if size < 0 {
size = 0
}
if capacityBytes > 0 && capacityBytes <= this.totalSize+size {
return nil, NewCapacityError("write memory cache failed: over memory size: " + strconv.FormatInt(capacityBytes, 10) + ", current size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
}
@@ -384,7 +391,7 @@ func (this *MemoryStorage) flushItem(key string) {
return
}
writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status)
writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status, -1, false)
if err != nil {
if !CanIgnoreErr(err) {
remotelogs.Error("CACHE", "flush items failed: open writer failed: "+err.Error())

View File

@@ -15,7 +15,7 @@ import (
func TestMemoryStorage_OpenWriter(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200)
writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, false)
if err != nil {
t.Fatal(err)
}
@@ -62,7 +62,7 @@ func TestMemoryStorage_OpenWriter(t *testing.T) {
}
}
writer, err = storage.OpenWriter("abc", time.Now().Unix()+60, 200)
writer, err = storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, false)
if err != nil {
t.Fatal(err)
}
@@ -103,7 +103,7 @@ func TestMemoryStorage_OpenReaderLock(t *testing.T) {
func TestMemoryStorage_Delete(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
{
writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200)
writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, false)
if err != nil {
t.Fatal(err)
}
@@ -111,7 +111,7 @@ func TestMemoryStorage_Delete(t *testing.T) {
t.Log(len(storage.valuesMap))
}
{
writer, err := storage.OpenWriter("abc1", time.Now().Unix()+60, 200)
writer, err := storage.OpenWriter("abc1", time.Now().Unix()+60, 200, -1, false)
if err != nil {
t.Fatal(err)
}
@@ -126,7 +126,7 @@ func TestMemoryStorage_Stat(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
expiredAt := time.Now().Unix() + 60
{
writer, err := storage.OpenWriter("abc", expiredAt, 200)
writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, false)
if err != nil {
t.Fatal(err)
}
@@ -139,7 +139,7 @@ func TestMemoryStorage_Stat(t *testing.T) {
})
}
{
writer, err := storage.OpenWriter("abc1", expiredAt, 200)
writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, false)
if err != nil {
t.Fatal(err)
}
@@ -163,7 +163,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
expiredAt := time.Now().Unix() + 60
{
writer, err := storage.OpenWriter("abc", expiredAt, 200)
writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, false)
if err != nil {
t.Fatal(err)
}
@@ -175,7 +175,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) {
})
}
{
writer, err := storage.OpenWriter("abc1", expiredAt, 200)
writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, false)
if err != nil {
t.Fatal(err)
}
@@ -198,7 +198,7 @@ func TestMemoryStorage_Purge(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
expiredAt := time.Now().Unix() + 60
{
writer, err := storage.OpenWriter("abc", expiredAt, 200)
writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, false)
if err != nil {
t.Fatal(err)
}
@@ -210,7 +210,7 @@ func TestMemoryStorage_Purge(t *testing.T) {
})
}
{
writer, err := storage.OpenWriter("abc1", expiredAt, 200)
writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, false)
if err != nil {
t.Fatal(err)
}
@@ -241,7 +241,7 @@ func TestMemoryStorage_Expire(t *testing.T) {
for i := 0; i < 1000; i++ {
expiredAt := time.Now().Unix() + int64(rands.Int(0, 60))
key := "abc" + strconv.Itoa(i)
writer, err := storage.OpenWriter(key, expiredAt, 200)
writer, err := storage.OpenWriter(key, expiredAt, 200, -1, false)
if err != nil {
t.Fatal(err)
}

View File

@@ -8,6 +8,9 @@ type Writer interface {
// Write 写入Body数据
Write(data []byte) (n int, err error)
// WriteAt 在指定位置写入数据
WriteAt(data []byte, offset int64) error
// HeaderSize 写入的Header数据大小
HeaderSize() int64

View File

@@ -1,76 +0,0 @@
package caches
import (
"github.com/TeaOSLab/EdgeNode/internal/compressions"
)
type compressionWriter struct {
rawWriter Writer
writer compressions.Writer
key string
expiredAt int64
}
func NewCompressionWriter(gw Writer, cpWriter compressions.Writer, key string, expiredAt int64) Writer {
return &compressionWriter{
rawWriter: gw,
writer: cpWriter,
key: key,
expiredAt: expiredAt,
}
}
func (this *compressionWriter) WriteHeader(data []byte) (n int, err error) {
return this.writer.Write(data)
}
// WriteHeaderLength 写入Header长度数据
func (this *compressionWriter) WriteHeaderLength(headerLength int) error {
return nil
}
// WriteBodyLength 写入Body长度数据
func (this *compressionWriter) WriteBodyLength(bodyLength int64) error {
return nil
}
func (this *compressionWriter) Write(data []byte) (n int, err error) {
return this.writer.Write(data)
}
func (this *compressionWriter) Close() error {
err := this.writer.Close()
if err != nil {
return err
}
return this.rawWriter.Close()
}
func (this *compressionWriter) Discard() error {
err := this.writer.Close()
if err != nil {
return err
}
return this.rawWriter.Discard()
}
func (this *compressionWriter) Key() string {
return this.key
}
func (this *compressionWriter) ExpiredAt() int64 {
return this.expiredAt
}
func (this *compressionWriter) HeaderSize() int64 {
return this.rawWriter.HeaderSize()
}
func (this *compressionWriter) BodySize() int64 {
return this.rawWriter.BodySize()
}
// ItemType 内容类型
func (this *compressionWriter) ItemType() ItemType {
return this.rawWriter.ItemType()
}

View File

@@ -2,6 +2,7 @@ package caches
import (
"encoding/binary"
"errors"
"github.com/iwind/TeaGo/types"
"io"
"os"
@@ -65,6 +66,13 @@ func (this *FileWriter) Write(data []byte) (n int, err error) {
return
}
// WriteAt 在指定位置写入数据
func (this *FileWriter) WriteAt(data []byte, offset int64) error {
_ = data
_ = offset
return errors.New("not supported")
}
// WriteBodyLength 写入Body长度数据
func (this *FileWriter) WriteBodyLength(bodyLength int64) error {
bytes8 := make([]byte, 8)

View File

@@ -1,6 +1,7 @@
package caches
import (
"errors"
"github.com/cespare/xxhash"
"sync"
"time"
@@ -55,6 +56,13 @@ func (this *MemoryWriter) Write(data []byte) (n int, err error) {
return len(data), nil
}
// WriteAt 在指定位置写入数据
func (this *MemoryWriter) WriteAt(b []byte, offset int64) error {
_ = b
_ = offset
return errors.New("not supported")
}
// HeaderSize 数据尺寸
func (this *MemoryWriter) HeaderSize() int64 {
return this.headerSize

View File

@@ -0,0 +1,173 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import (
"encoding/binary"
"github.com/iwind/TeaGo/types"
"io"
"os"
"strings"
"sync"
)
type PartialFileWriter struct {
rawWriter *os.File
key string
headerSize int64
bodySize int64
expiredAt int64
endFunc func()
once sync.Once
isNew bool
isPartial bool
bodyOffset int64
}
func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, isNew bool, isPartial bool, bodyOffset int64, endFunc func()) *PartialFileWriter {
return &PartialFileWriter{
key: key,
rawWriter: rawWriter,
expiredAt: expiredAt,
endFunc: endFunc,
isNew: isNew,
isPartial: isPartial,
bodyOffset: bodyOffset,
}
}
// WriteHeader 写入数据
func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) {
if !this.isNew {
return
}
n, err = this.rawWriter.Write(data)
this.headerSize += int64(n)
if err != nil {
_ = this.Discard()
}
return
}
// WriteHeaderLength 写入Header长度数据
func (this *PartialFileWriter) WriteHeaderLength(headerLength int) error {
bytes4 := make([]byte, 4)
binary.BigEndian.PutUint32(bytes4, uint32(headerLength))
_, err := this.rawWriter.Seek(SizeExpiresAt+SizeStatus+SizeURLLength, io.SeekStart)
if err != nil {
_ = this.Discard()
return err
}
_, err = this.rawWriter.Write(bytes4)
if err != nil {
_ = this.Discard()
return err
}
return nil
}
// Write 写入数据
func (this *PartialFileWriter) Write(data []byte) (n int, err error) {
n, err = this.rawWriter.Write(data)
this.bodySize += int64(n)
if err != nil {
_ = this.Discard()
}
return
}
// WriteAt 在指定位置写入数据
func (this *PartialFileWriter) WriteAt(data []byte, offset int64) error {
if this.bodyOffset == 0 {
this.bodyOffset = SizeMeta + int64(len(this.key)) + this.headerSize
}
_, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset)
return err
}
// WriteBodyLength 写入Body长度数据
func (this *PartialFileWriter) WriteBodyLength(bodyLength int64) error {
bytes8 := make([]byte, 8)
binary.BigEndian.PutUint64(bytes8, uint64(bodyLength))
_, err := this.rawWriter.Seek(SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength, io.SeekStart)
if err != nil {
_ = this.Discard()
return err
}
_, err = this.rawWriter.Write(bytes8)
if err != nil {
_ = this.Discard()
return err
}
return nil
}
// Close 关闭
func (this *PartialFileWriter) Close() error {
defer this.once.Do(func() {
this.endFunc()
})
var path = this.rawWriter.Name()
if this.isNew {
err := this.WriteHeaderLength(types.Int(this.headerSize))
if err != nil {
_ = this.rawWriter.Close()
_ = os.Remove(path)
return err
}
err = this.WriteBodyLength(this.bodySize)
if err != nil {
_ = this.rawWriter.Close()
_ = os.Remove(path)
return err
}
}
err := this.rawWriter.Close()
if err != nil {
_ = os.Remove(path)
} else if !this.isPartial {
err = os.Rename(path, strings.Replace(path, ".tmp", "", 1))
if err != nil {
_ = os.Remove(path)
}
}
return err
}
// Discard 丢弃
func (this *PartialFileWriter) Discard() error {
defer this.once.Do(func() {
this.endFunc()
})
_ = this.rawWriter.Close()
err := os.Remove(this.rawWriter.Name())
return err
}
func (this *PartialFileWriter) HeaderSize() int64 {
return this.headerSize
}
func (this *PartialFileWriter) BodySize() int64 {
return this.bodySize
}
func (this *PartialFileWriter) ExpiredAt() int64 {
return this.expiredAt
}
func (this *PartialFileWriter) Key() string {
return this.key
}
// ItemType 获取内容类型
func (this *PartialFileWriter) ItemType() ItemType {
return ItemTypeFile
}

View File

@@ -0,0 +1,50 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches_test
import (
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/iwind/TeaGo/types"
"io/ioutil"
"os"
"testing"
"time"
)
func TestPartialFileWriter_SeekOffset(t *testing.T) {
var path = "/tmp/test@partial.cache"
_ = os.Remove(path)
var reader = func() {
data, err := ioutil.ReadFile(path)
if err != nil {
t.Fatal(err)
}
t.Log("["+types.String(len(data))+"]", string(data))
}
fp, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
t.Fatal(err)
}
var writer = caches.NewPartialFileWriter(fp, "test", time.Now().Unix()+86500, true, true, 0, func() {
t.Log("end")
})
_, err = writer.WriteHeader([]byte("header"))
if err != nil {
t.Fatal(err)
}
// 移动位置
err = writer.WriteAt([]byte("HELLO"), 100)
if err != nil {
t.Fatal(err)
}
err = writer.Close()
if err != nil {
t.Fatal(err)
}
reader()
}

View File

@@ -0,0 +1,68 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"bytes"
"errors"
"github.com/TeaOSLab/EdgeNode/internal/caches"
"io"
"os"
"testing"
)
func TestGzipReader(t *testing.T) {
fp, err := os.Open("/Users/WorkSpace/EdgeProject/EdgeCache/p43/36/7e/367e02720713fe05b66573a1d69b4f0a.cache")
if err != nil {
// not fatal
t.Log(err)
return
}
defer func() {
_ = fp.Close()
}()
var buf = make([]byte, 32*1024)
cacheReader := caches.NewFileReader(fp)
err = cacheReader.Init()
if err != nil {
t.Fatal(err)
}
var headerBuf = []byte{}
err = cacheReader.ReadHeader(buf, func(n int) (goNext bool, err error) {
headerBuf = append(headerBuf, buf[:n]...)
for {
nIndex := bytes.Index(headerBuf, []byte{'\n'})
if nIndex >= 0 {
row := headerBuf[:nIndex]
spaceIndex := bytes.Index(row, []byte{':'})
if spaceIndex <= 0 {
return false, errors.New("invalid header '" + string(row) + "'")
}
headerBuf = headerBuf[nIndex+1:]
} else {
break
}
}
return true, nil
})
reader, err := NewGzipReader(cacheReader)
if err != nil {
t.Fatal(err)
}
for {
n, err := reader.Read(buf)
if err != nil {
if err != io.EOF {
t.Fatal(err)
} else {
break
}
}
t.Log(string(buf[:n]))
_ = n
}
}

View File

@@ -1,7 +1,7 @@
package teaconst
const (
Version = "0.4.0"
Version = "0.4.2"
ProductName = "Edge Node"
ProcessName = "edge-node"

View File

@@ -182,7 +182,7 @@ func (this *APIStream) handleWriteCache(message *pb.NodeStreamMessage) error {
}
expiredAt := time.Now().Unix() + msg.LifeSeconds
writer, err := storage.OpenWriter(msg.Key, expiredAt, 200)
writer, err := storage.OpenWriter(msg.Key, expiredAt, 200, int64(len(msg.Value)), false)
if err != nil {
this.replyFail(message.RequestId, "prepare writing failed: "+err.Error())
return err
@@ -462,7 +462,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error {
}
expiredAt := time.Now().Unix() + 8600
writer, err := storage.OpenWriter(key, expiredAt, 200) // TODO 可以设置缓存过期时间
writer, err := storage.OpenWriter(key, expiredAt, 200, resp.ContentLength, false) // TODO 可以设置缓存过期时间
if err != nil {
locker.Lock()
errorMessages = append(errorMessages, "open cache writer failed: "+key+": "+err.Error())

View File

@@ -315,12 +315,12 @@ func (this *HTTPRequest) doEnd() {
// TODO 增加Header统计考虑从Conn中读取
if this.ReqServer != nil {
if this.isCached {
stats.SharedTrafficStatManager.Add(this.ReqServer.Id, this.ReqHost, this.writer.sentBodyBytes, this.writer.sentBodyBytes, 1, 1, 0, 0, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId())
stats.SharedTrafficStatManager.Add(this.ReqServer.Id, this.ReqHost, this.writer.SentBodyBytes(), this.writer.SentBodyBytes(), 1, 1, 0, 0, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId())
} else {
if this.isAttack {
stats.SharedTrafficStatManager.Add(this.ReqServer.Id, this.ReqHost, this.writer.sentBodyBytes, 0, 1, 0, 1, this.writer.sentBodyBytes, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId())
stats.SharedTrafficStatManager.Add(this.ReqServer.Id, this.ReqHost, this.writer.SentBodyBytes(), 0, 1, 0, 1, this.writer.SentBodyBytes(), this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId())
} else {
stats.SharedTrafficStatManager.Add(this.ReqServer.Id, this.ReqHost, this.writer.sentBodyBytes, 0, 1, 0, 0, 0, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId())
stats.SharedTrafficStatManager.Add(this.ReqServer.Id, this.ReqHost, this.writer.SentBodyBytes(), 0, 1, 0, 0, 0, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId())
}
}
}

View File

@@ -5,7 +5,6 @@ import (
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/compressions"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
@@ -22,7 +21,7 @@ import (
func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
this.cacheCanTryStale = false
cachePolicy := this.ReqServer.HTTPCachePolicy
var cachePolicy = this.ReqServer.HTTPCachePolicy
if cachePolicy == nil || !cachePolicy.IsOn {
return
}
@@ -162,11 +161,15 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
var err error
// 是否优先检查WebP
var isWebP = false
if this.web.WebP != nil &&
this.web.WebP.IsOn &&
this.web.WebP.MatchRequest(filepath.Ext(this.Path()), this.Format) &&
this.web.WebP.MatchAccept(this.requestHeader("Accept")) {
reader, _ = storage.OpenReader(key+webpSuffix, useStale)
if reader != nil {
isWebP = true
}
}
// 检查正常的文件
@@ -184,13 +187,16 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
}
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: open cache failed: "+err.Error())
}
return
}
}
defer func() {
_ = reader.Close()
if !this.writer.DelayRead() {
_ = reader.Close()
}
}()
if useStale {
@@ -231,7 +237,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
})
if err != nil {
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: read header failed: "+err.Error())
}
return
}
@@ -246,6 +252,8 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
} else {
this.writer.Header().Set("X-Cache", "HIT, "+refType+", "+reader.TypeName())
}
} else {
this.writer.Header().Del("X-Cache")
}
if this.web.Cache.AddAgeHeader {
this.writer.Header().Set("Age", age)
@@ -257,7 +265,11 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
var eTag = ""
var lastModifiedAt = reader.LastModified()
if lastModifiedAt > 0 {
eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "\""
if isWebP {
eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "_webp" + "\""
} else {
eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "\""
}
respHeader.Del("Etag")
respHeader["ETag"] = []string{eTag}
}
@@ -357,7 +369,6 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
}
}
respHeader := this.writer.Header()
if len(rangeSet) == 1 {
respHeader.Set("Content-Range", "bytes "+strconv.FormatInt(rangeSet[0][0], 10)+"-"+strconv.FormatInt(rangeSet[0][1], 10)+"/"+strconv.FormatInt(reader.BodySize(), 10))
respHeader.Set("Content-Length", strconv.FormatInt(rangeSet[0][1]-rangeSet[0][0]+1, 10))
@@ -379,7 +390,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
return true
}
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error())
}
return
}
@@ -425,7 +436,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
})
if err != nil {
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error())
}
return true
}
@@ -439,25 +450,11 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
return true
}
} else { // 没有Range
var body io.Reader = reader
var contentEncoding = this.writer.Header().Get("Content-Encoding")
if len(contentEncoding) > 0 && !httpAcceptEncoding(this.RawReq.Header.Get("Accept-Encoding"), contentEncoding) {
decompressReader, err := compressions.NewReader(body, contentEncoding)
if err == nil {
body = decompressReader
defer func() {
_ = decompressReader.Close()
}()
this.writer.Header().Del("Content-Encoding")
this.writer.Header().Del("Content-Length")
}
}
this.writer.PrepareCompression(reader.BodySize())
var resp = &http.Response{Body: reader}
this.writer.Prepare(resp, reader.BodySize(), reader.Status(), false)
this.writer.WriteHeader(reader.Status())
_, err = io.CopyBuffer(this.writer, body, buf)
_, err = io.CopyBuffer(this.writer, resp.Body, buf)
if err == io.EOF {
err = nil
}
@@ -465,7 +462,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
this.varMapping["cache.status"] = "MISS"
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: read body failed: "+err.Error())
}
return
}

View File

@@ -190,7 +190,7 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
this.processResponseHeaders(resp.StatusCode)
// 准备
this.writer.Prepare(resp.ContentLength, resp.StatusCode)
this.writer.Prepare(resp, resp.ContentLength, resp.StatusCode, true)
// 设置响应代码
this.writer.WriteHeader(resp.StatusCode)

View File

@@ -73,6 +73,13 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
return false
}
if u.KeepArgs {
var qIndex = strings.Index(this.uri, "?")
if qIndex >= 0 {
afterURL += this.uri[qIndex:]
}
}
if u.Status <= 0 {
this.processResponseHeaders(http.StatusTemporaryRedirect)
http.Redirect(this.RawWriter, this.RawReq, afterURL, http.StatusTemporaryRedirect)
@@ -88,12 +95,20 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
return false
}
var afterURL = u.AfterURL
if u.KeepArgs {
var qIndex = strings.Index(this.uri, "?")
if qIndex >= 0 {
afterURL += this.uri[qIndex:]
}
}
if u.Status <= 0 {
this.processResponseHeaders(http.StatusTemporaryRedirect)
http.Redirect(this.RawWriter, this.RawReq, u.AfterURL, http.StatusTemporaryRedirect)
http.Redirect(this.RawWriter, this.RawReq, afterURL, http.StatusTemporaryRedirect)
} else {
this.processResponseHeaders(u.Status)
http.Redirect(this.RawWriter, this.RawReq, u.AfterURL, u.Status)
http.Redirect(this.RawWriter, this.RawReq, afterURL, u.Status)
}
return true
}

View File

@@ -61,11 +61,11 @@ func (this *HTTPRequest) doPage(status int) (shouldStop bool) {
if page.NewStatus > 0 {
// 自定义响应Headers
this.processResponseHeaders(page.NewStatus)
this.writer.Prepare(stat.Size(), page.NewStatus)
this.writer.Prepare(nil, stat.Size(), page.NewStatus, true)
this.writer.WriteHeader(page.NewStatus)
} else {
this.processResponseHeaders(status)
this.writer.Prepare(stat.Size(), status)
this.writer.Prepare(nil, stat.Size(), status, true)
this.writer.WriteHeader(status)
}
buf := utils.BytePool1k.Get()
@@ -100,11 +100,11 @@ func (this *HTTPRequest) doPage(status int) (shouldStop bool) {
if page.NewStatus > 0 {
// 自定义响应Headers
this.processResponseHeaders(page.NewStatus)
this.writer.Prepare(int64(len(content)), page.NewStatus)
this.writer.Prepare(nil, int64(len(content)), page.NewStatus, true)
this.writer.WriteHeader(page.NewStatus)
} else {
this.processResponseHeaders(status)
this.writer.Prepare(int64(len(content)), status)
this.writer.Prepare(nil, int64(len(content)), status, true)
this.writer.WriteHeader(status)
}

View File

@@ -5,7 +5,6 @@ import (
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
"github.com/TeaOSLab/EdgeNode/internal/compressions"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"io"
@@ -262,33 +261,15 @@ func (this *HTTPRequest) doReverseProxy() {
}
}
// 解压
if !resp.Uncompressed {
var contentEncoding = resp.Header.Get("Content-Encoding")
if len(contentEncoding) > 0 && !httpAcceptEncoding(this.RawReq.Header.Get("Accept-Encoding"), contentEncoding) {
reader, err := compressions.NewReader(resp.Body, contentEncoding)
if err == nil {
var body = resp.Body
defer func() {
_ = body.Close()
}()
resp.Body = reader
resp.Header.Del("Content-Encoding")
resp.Header.Del("Content-Length")
}
}
}
// 响应Header
this.writer.AddHeaders(resp.Header)
this.processResponseHeaders(resp.StatusCode)
// 是否需要刷新
shouldAutoFlush := this.reverseProxy.AutoFlush || this.RawReq.Header.Get("Accept") == "text/event-stream"
var shouldAutoFlush = this.reverseProxy.AutoFlush || this.RawReq.Header.Get("Accept") == "text/event-stream"
// 准备
delayHeaders := this.writer.Prepare(resp.ContentLength, resp.StatusCode)
var delayHeaders = this.writer.Prepare(resp, resp.ContentLength, resp.StatusCode, true)
// 设置响应代码
if !delayHeaders {

View File

@@ -302,7 +302,7 @@ func (this *HTTPRequest) doRoot() (isBreak bool) {
this.cacheRef = nil // 不支持缓存
}
this.writer.Prepare(fileSize, http.StatusOK)
this.writer.Prepare(nil, fileSize, http.StatusOK, true)
pool := this.bytePool(fileSize)
buf := pool.Get()

View File

@@ -54,9 +54,9 @@ func (this *HTTPRequest) doURL(method string, url string, host string, statusCod
}
this.writer.AddHeaders(resp.Header)
if statusCode <= 0 {
this.writer.Prepare(resp.ContentLength, resp.StatusCode)
this.writer.Prepare(resp, resp.ContentLength, resp.StatusCode, true)
} else {
this.writer.Prepare(resp.ContentLength, statusCode)
this.writer.Prepare(resp, resp.ContentLength, statusCode, true)
}
// 设置响应代码

File diff suppressed because it is too large Load Diff

View File

@@ -1,102 +0,0 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nodes
import (
"bufio"
"github.com/iwind/TeaGo/types"
"net"
"net/http"
"time"
)
// HTTPRateWriter 限速写入
type HTTPRateWriter struct {
parentWriter http.ResponseWriter
rateBytes int
lastBytes int
timeCost time.Duration
}
func NewHTTPRateWriter(writer http.ResponseWriter, rateBytes int64) http.ResponseWriter {
return &HTTPRateWriter{
parentWriter: writer,
rateBytes: types.Int(rateBytes),
}
}
func (this *HTTPRateWriter) Header() http.Header {
return this.parentWriter.Header()
}
func (this *HTTPRateWriter) Write(data []byte) (int, error) {
if len(data) == 0 {
return 0, nil
}
var left = this.rateBytes - this.lastBytes
if left <= 0 {
if this.timeCost > 0 && this.timeCost < 1*time.Second {
time.Sleep(1*time.Second - this.timeCost)
}
this.lastBytes = 0
this.timeCost = 0
return this.Write(data)
}
var n = len(data)
// n <= left
if n <= left {
this.lastBytes += n
var before = time.Now()
defer func() {
this.timeCost += time.Since(before)
}()
return this.parentWriter.Write(data)
}
// n > left
var before = time.Now()
result, err := this.parentWriter.Write(data[:left])
this.timeCost += time.Since(before)
if err != nil {
return result, err
}
this.lastBytes += left
return this.Write(data[left:])
}
func (this *HTTPRateWriter) WriteHeader(statusCode int) {
this.parentWriter.WriteHeader(statusCode)
}
// Hijack Hijack
func (this *HTTPRateWriter) Hijack() (conn net.Conn, buf *bufio.ReadWriter, err error) {
if this.parentWriter == nil {
return
}
hijack, ok := this.parentWriter.(http.Hijacker)
if ok {
return hijack.Hijack()
}
return
}
// Flush Flush
func (this *HTTPRateWriter) Flush() {
if this.parentWriter == nil {
return
}
flusher, ok := this.parentWriter.(http.Flusher)
if ok {
flusher.Flush()
return
}
}

View File

@@ -56,13 +56,16 @@ type Node struct {
maxCPU int32
maxThreads int
timezone string
updatingServerMap map[int64]*serverconfigs.ServerConfig
}
func NewNode() *Node {
return &Node{
sock: gosock.NewTmpSock(teaconst.ProcessName),
maxThreads: -1,
maxCPU: -1,
sock: gosock.NewTmpSock(teaconst.ProcessName),
maxThreads: -1,
maxCPU: -1,
updatingServerMap: map[int64]*serverconfigs.ServerConfig{},
}
}
@@ -264,7 +267,7 @@ func (this *Node) loop() error {
defer tr.End()
// 检查api.yaml是否存在
apiConfigFile := Tea.ConfigFile("api.yaml")
var apiConfigFile = Tea.ConfigFile("api.yaml")
_, err := os.Stat(apiConfigFile)
if err != nil {
return nil
@@ -275,7 +278,7 @@ func (this *Node) loop() error {
return errors.New("create rpc client failed: " + err.Error())
}
nodeCtx := rpcClient.Context()
var nodeCtx = rpcClient.Context()
tasksResp, err := rpcClient.NodeTaskRPC().FindNodeTasks(nodeCtx, &pb.FindNodeTasksRequest{})
if err != nil {
return errors.New("read node tasks failed: " + err.Error())
@@ -295,11 +298,15 @@ func (this *Node) loop() error {
return err
}
case "configChanged":
if !task.IsPrimary {
// 我们等等主节点配置准备完毕
time.Sleep(2 * time.Second)
if task.ServerId > 0 {
err = this.syncServerConfig(task.ServerId)
} else {
if !task.IsPrimary {
// 我们等等主节点配置准备完毕
time.Sleep(2 * time.Second)
}
err = this.syncConfig(task.Version)
}
err := this.syncConfig(task.Version)
if err != nil {
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
NodeTaskId: task.Id,
@@ -316,6 +323,7 @@ func (this *Node) loop() error {
if err != nil {
return err
}
case "nodeVersionChanged":
goman.New(func() {
sharedUpgradeManager.Start()
@@ -419,22 +427,8 @@ func (this *Node) syncConfig(taskVersion int64) error {
remotelogs.Println("NODE", "loading config ...")
}
nodeconfigs.ResetNodeConfig(nodeConfig)
caches.SharedManager.MaxDiskCapacity = nodeConfig.MaxCacheDiskCapacity
caches.SharedManager.MaxMemoryCapacity = nodeConfig.MaxCacheMemoryCapacity
if len(nodeConfig.HTTPCachePolicies) > 0 {
caches.SharedManager.UpdatePolicies(nodeConfig.HTTPCachePolicies)
} else {
caches.SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{})
}
sharedWAFManager.UpdatePolicies(nodeConfig.FindAllFirewallPolicies())
iplibrary.SharedActionManager.UpdateActions(nodeConfig.FirewallActions)
sharedNodeConfig = nodeConfig
this.onReload(nodeConfig)
metrics.SharedManager.Update(nodeConfig.MetricItems)
// 发送事件
events.Notify(events.EventReload)
@@ -447,30 +441,96 @@ func (this *Node) syncConfig(taskVersion int64) error {
return nil
}
// 读取单个服务配置
func (this *Node) syncServerConfig(serverId int64) error {
rpcClient, err := rpc.SharedRPC()
if err != nil {
return err
}
resp, err := rpcClient.ServerRPC().ComposeServerConfig(rpcClient.Context(), &pb.ComposeServerConfigRequest{ServerId: serverId})
if err != nil {
return err
}
this.locker.Lock()
defer this.locker.Unlock()
if len(resp.ServerConfigJSON) == 0 {
this.updatingServerMap[serverId] = nil
} else {
var config = &serverconfigs.ServerConfig{}
err = json.Unmarshal(resp.ServerConfigJSON, config)
if err != nil {
return err
}
this.updatingServerMap[serverId] = config
}
return nil
}
// 启动同步计时器
func (this *Node) startSyncTimer() {
// TODO 这个时间间隔可以自行设置
ticker := time.NewTicker(60 * time.Second)
var taskTicker = time.NewTicker(60 * time.Second)
var serverChangeTicker = time.NewTicker(5 * time.Second)
events.OnKey(events.EventQuit, this, func() {
remotelogs.Println("NODE", "quit sync timer")
ticker.Stop()
taskTicker.Stop()
serverChangeTicker.Stop()
})
goman.New(func() {
for {
select {
case <-ticker.C:
case <-taskTicker.C: // 定期执行
err := this.loop()
if err != nil {
remotelogs.Error("NODE", "sync config error: "+err.Error())
continue
}
case <-nodeTaskNotify:
case <-serverChangeTicker.C: // 服务变化
this.locker.Lock()
if len(this.updatingServerMap) > 0 {
var updatingServerMap = this.updatingServerMap
this.updatingServerMap = map[int64]*serverconfigs.ServerConfig{}
newNodeConfig, err := nodeconfigs.CloneNodeConfig(sharedNodeConfig)
if err != nil {
remotelogs.Error("NODE", "apply server config error: "+err.Error())
continue
}
for serverId, serverConfig := range updatingServerMap {
if serverConfig != nil {
newNodeConfig.AddServer(serverConfig)
} else {
newNodeConfig.RemoveServer(serverId)
}
}
err, serverErrors := newNodeConfig.Init()
if err != nil {
remotelogs.Error("NODE", "apply server config error: "+err.Error())
continue
}
if len(serverErrors) > 0 {
for _, serverErr := range serverErrors {
remotelogs.ServerError(serverErr.Id, "NODE", serverErr.Message, nodeconfigs.NodeLogTypeServerConfigInitFailed, maps.Map{})
}
}
this.onReload(newNodeConfig)
err = sharedListenerManager.Start(newNodeConfig)
if err != nil {
remotelogs.Error("NODE", "apply server config error: "+err.Error())
}
}
this.locker.Unlock()
case <-nodeTaskNotify: // 有新的更新任务
err := this.loop()
if err != nil {
remotelogs.Error("NODE", "sync config error: "+err.Error())
continue
}
case <-nodeConfigChangedNotify:
case <-nodeConfigChangedNotify: // 节点变化通知
err := this.syncConfig(0)
if err != nil {
remotelogs.Error("NODE", "sync config error: "+err.Error())
@@ -701,6 +761,25 @@ func (this *Node) listenSock() error {
// 重载配置调用
func (this *Node) onReload(config *nodeconfigs.NodeConfig) {
nodeconfigs.ResetNodeConfig(config)
sharedNodeConfig = config
// 缓存策略
caches.SharedManager.MaxDiskCapacity = config.MaxCacheDiskCapacity
caches.SharedManager.MaxMemoryCapacity = config.MaxCacheMemoryCapacity
if len(config.HTTPCachePolicies) > 0 {
caches.SharedManager.UpdatePolicies(config.HTTPCachePolicies)
} else {
caches.SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{})
}
// WAF策略
sharedWAFManager.UpdatePolicies(config.FindAllFirewallPolicies())
iplibrary.SharedActionManager.UpdateActions(config.FirewallActions)
// 统计指标
metrics.SharedManager.Update(config.MetricItems)
// max cpu
if config.MaxCPU != this.maxCPU {
if config.MaxCPU > 0 && config.MaxCPU < int32(runtime.NumCPU()) {

View File

@@ -0,0 +1,26 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package readers
import "io"
type BytesCounterReader struct {
rawReader io.Reader
count int64
}
func NewBytesCounterReader(rawReader io.Reader) *BytesCounterReader {
return &BytesCounterReader{
rawReader: rawReader,
}
}
func (this *BytesCounterReader) Read(p []byte) (n int, err error) {
n, err = this.rawReader.Read(p)
this.count += int64(n)
return
}
func (this *BytesCounterReader) TotalBytes() int64 {
return this.count
}

View File

@@ -0,0 +1,34 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package readers
import "io"
type FilterFunc = func(p []byte, err error) error
type FilterReader struct {
rawReader io.Reader
filters []FilterFunc
}
func NewFilterReader(rawReader io.Reader) *FilterReader {
return &FilterReader{
rawReader: rawReader,
}
}
func (this *FilterReader) Add(filter FilterFunc) {
this.filters = append(this.filters, filter)
}
func (this *FilterReader) Read(p []byte) (n int, err error) {
n, err = this.rawReader.Read(p)
for _, filter := range this.filters {
filterErr := filter(p[:n], err)
if filterErr != nil {
err = filterErr
return
}
}
return
}

View File

@@ -0,0 +1,41 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package readers_test
import (
"bytes"
"errors"
"github.com/TeaOSLab/EdgeNode/internal/utils/readers"
"testing"
)
func TestNewFilterReader(t *testing.T) {
var reader = readers.NewFilterReader(bytes.NewBufferString("0123456789"))
reader.Add(func(p []byte, err error) error {
t.Log("filter1:", string(p), err)
return nil
})
reader.Add(func(p []byte, err error) error {
t.Log("filter2:", string(p), err)
if string(p) == "345" {
return errors.New("end")
}
return nil
})
reader.Add(func(p []byte, err error) error {
t.Log("filter3:", string(p), err)
return nil
})
var buf = make([]byte, 3)
for {
n, err := reader.Read(buf)
if n > 0 {
t.Log(string(buf[:n]))
}
if err != nil {
t.Log(err)
break
}
}
}

View File

@@ -0,0 +1,52 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package readers
import (
"io"
)
type TeeReader struct {
r io.Reader
w io.Writer
onFail func(err error)
onEOF func()
}
func NewTeeReader(reader io.Reader, writer io.Writer) *TeeReader {
return &TeeReader{
r: reader,
w: writer,
}
}
func (this *TeeReader) Read(p []byte) (n int, err error) {
n, err = this.r.Read(p)
if n > 0 {
_, wErr := this.w.Write(p[:n])
if err == nil && wErr != nil {
err = wErr
}
}
if err != nil {
if err == io.EOF {
if this.onEOF != nil {
this.onEOF()
}
} else {
if this.onFail != nil {
this.onFail(err)
}
}
}
return
}
func (this *TeeReader) OnFail(onFail func(err error)) {
this.onFail = onFail
}
func (this *TeeReader) OnEOF(onEOF func()) {
this.onEOF = onEOF
}

View File

@@ -0,0 +1,58 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package readers
import "io"
type TeeReaderCloser struct {
r io.Reader
w io.Writer
onFail func(err error)
onEOF func()
}
func NewTeeReaderCloser(reader io.Reader, writer io.Writer) *TeeReaderCloser {
return &TeeReaderCloser{
r: reader,
w: writer,
}
}
func (this *TeeReaderCloser) Read(p []byte) (n int, err error) {
n, err = this.r.Read(p)
if n > 0 {
_, wErr := this.w.Write(p[:n])
if err == nil && wErr != nil {
err = wErr
}
}
if err != nil {
if err == io.EOF {
if this.onEOF != nil {
this.onEOF()
}
} else {
if this.onFail != nil {
this.onFail(err)
}
}
}
return
}
func (this *TeeReaderCloser) Close() error {
r, ok := this.r.(io.Closer)
if ok {
return r.Close()
}
return nil
}
func (this *TeeReaderCloser) OnFail(onFail func(err error)) {
this.onFail = onFail
}
func (this *TeeReaderCloser) OnEOF(onEOF func()) {
this.onEOF = onEOF
}

View File

@@ -0,0 +1,28 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package writers
import "io"
type BytesCounterWriter struct {
writer io.Writer
count int64
}
func NewBytesCounterWriter(rawWriter io.Writer) *BytesCounterWriter {
return &BytesCounterWriter{writer: rawWriter}
}
func (this *BytesCounterWriter) Write(p []byte) (n int, err error) {
n, err = this.writer.Write(p)
this.count += int64(n)
return
}
func (this *BytesCounterWriter) Close() error {
return nil
}
func (this *BytesCounterWriter) TotalBytes() int64 {
return this.count
}

View File

@@ -0,0 +1,87 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package writers
import (
"github.com/iwind/TeaGo/types"
"io"
"time"
)
// RateLimitWriter 限速写入
type RateLimitWriter struct {
rawWriter io.WriteCloser
rateBytes int
written int
before time.Time
}
func NewRateLimitWriter(rawWriter io.WriteCloser, rateBytes int64) io.WriteCloser {
return &RateLimitWriter{
rawWriter: rawWriter,
rateBytes: types.Int(rateBytes),
before: time.Now(),
}
}
func (this *RateLimitWriter) Write(p []byte) (n int, err error) {
if this.rateBytes <= 0 {
return this.write(p)
}
var size = len(p)
if size == 0 {
return 0, nil
}
if size <= this.rateBytes {
return this.write(p)
}
for {
size = len(p)
var limit = this.rateBytes
if limit > size {
limit = size
}
n1, wErr := this.write(p[:limit])
n += n1
if wErr != nil {
return n, wErr
}
if size > limit {
p = p[limit:]
} else {
break
}
}
return
}
func (this *RateLimitWriter) Close() error {
return this.rawWriter.Close()
}
func (this *RateLimitWriter) write(p []byte) (n int, err error) {
n, err = this.rawWriter.Write(p)
if err == nil {
this.written += n
if this.written >= this.rateBytes {
var duration = 1*time.Second - time.Now().Sub(this.before)
if duration > 0 {
time.Sleep(duration)
}
this.before = time.Now()
this.written = 0
}
}
return
}

View File

@@ -0,0 +1,41 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package writers
import (
"sync"
"testing"
"time"
)
func TestSleep(t *testing.T) {
var count = 2000
var wg = sync.WaitGroup{}
wg.Add(count)
var before = time.Now()
for i := 0; i < count; i++ {
go func() {
defer wg.Done()
time.Sleep(1 * time.Second)
}()
}
wg.Wait()
t.Log(time.Since(before).Seconds()*1000, "ms")
}
func TestTimeout(t *testing.T) {
var count = 2000
var wg = sync.WaitGroup{}
wg.Add(count)
var before = time.Now()
for i := 0; i < count; i++ {
go func() {
defer wg.Done()
var timeout = time.NewTimer(1 * time.Second)
<-timeout.C
}()
}
wg.Wait()
t.Log(time.Since(before).Seconds()*1000, "ms")
}

View File

@@ -0,0 +1,51 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package writers
import "io"
type TeeWriterCloser struct {
primaryW io.WriteCloser
secondaryW io.WriteCloser
onFail func(err error)
}
func NewTeeWriterCloser(primaryW io.WriteCloser, secondaryW io.WriteCloser) *TeeWriterCloser {
return &TeeWriterCloser{
primaryW: primaryW,
secondaryW: secondaryW,
}
}
func (this *TeeWriterCloser) Write(p []byte) (n int, err error) {
{
n, err = this.primaryW.Write(p)
if err != nil {
if this.onFail != nil {
this.onFail(err)
}
}
}
{
_, err2 := this.secondaryW.Write(p)
if err2 != nil {
if this.onFail != nil {
this.onFail(err2)
}
}
}
return
}
func (this *TeeWriterCloser) Close() error {
// 这里不关闭secondary
return this.primaryW.Close()
}
func (this *TeeWriterCloser) OnFail(onFail func(err error)) {
this.onFail = onFail
}

View File

@@ -1,7 +1,9 @@
package waf
import (
"errors"
"github.com/TeaOSLab/EdgeNode/internal/waf/requests"
"github.com/iwind/TeaGo/types"
)
// rule group
@@ -30,7 +32,7 @@ func (this *RuleGroup) Init(waf *WAF) error {
for _, set := range this.RuleSets {
err := set.Init(waf)
if err != nil {
return err
return errors.New("init set '" + types.String(set.Id) + "' failed: " + err.Error())
}
}
}

View File

@@ -1,12 +1,14 @@
package waf
import (
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/waf/requests"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/types"
"net/http"
"sort"
)
@@ -47,7 +49,7 @@ func (this *RuleSet) Init(waf *WAF) error {
for _, rule := range this.Rules {
err := rule.Init()
if err != nil {
return err
return errors.New("init rule '" + rule.Param + " " + rule.Operator + " " + types.String(rule.Value) + "' failed: " + err.Error())
}
}
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/waf/requests"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/files"
"github.com/iwind/TeaGo/types"
"gopkg.in/yaml.v3"
"io/ioutil"
"net/http"
@@ -89,7 +90,7 @@ func (this *WAF) Init() (resultErrors []error) {
err := group.Init(this)
if err != nil {
// 这里我们不阻止其他规则正常加入
resultErrors = append(resultErrors, err)
resultErrors = append(resultErrors, errors.New("init group '"+types.String(group.Id)+"' failed: "+err.Error()))
}
}
}