Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d40bc4e72b | ||
|
|
94d0fc7e88 | ||
|
|
ceaeba7089 | ||
|
|
a1e868bf29 | ||
|
|
e60af85819 | ||
|
|
7bd24fcc81 | ||
|
|
4331223916 | ||
|
|
f50113517a | ||
|
|
6d6e25f298 | ||
|
|
69c89fda48 | ||
|
|
9a56671457 |
@@ -11,7 +11,7 @@ func TestOpenFilePool_Get(t *testing.T) {
|
||||
var pool = caches.NewOpenFilePool("a")
|
||||
t.Log(pool.Filename())
|
||||
t.Log(pool.Get())
|
||||
t.Log(pool.Put(caches.NewOpenFile(nil, nil)))
|
||||
t.Log(pool.Put(caches.NewOpenFile(nil, nil, []byte{})))
|
||||
t.Log(pool.Get())
|
||||
t.Log(pool.Get())
|
||||
}
|
||||
|
||||
@@ -24,8 +24,7 @@ type FileReader struct {
|
||||
bodySize int64
|
||||
bodyOffset int64
|
||||
|
||||
bodyBufLen int
|
||||
bodyBuf []byte
|
||||
isClosed bool
|
||||
}
|
||||
|
||||
func NewFileReader(fp *os.File) *FileReader {
|
||||
@@ -181,10 +180,6 @@ func (this *FileReader) ReadHeader(buf []byte, callback ReaderFunc) error {
|
||||
}
|
||||
headerSize -= n
|
||||
} else {
|
||||
if n > headerSize {
|
||||
this.bodyBuf = buf[headerSize:]
|
||||
this.bodyBufLen = n - headerSize
|
||||
}
|
||||
_, e := callback(headerSize)
|
||||
if e != nil {
|
||||
isOk = true
|
||||
@@ -203,6 +198,12 @@ func (this *FileReader) ReadHeader(buf []byte, callback ReaderFunc) error {
|
||||
|
||||
isOk = true
|
||||
|
||||
// 移动到Body位置
|
||||
_, err = this.fp.Seek(this.bodyOffset, io.SeekStart)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -215,27 +216,7 @@ func (this *FileReader) ReadBody(buf []byte, callback ReaderFunc) error {
|
||||
}
|
||||
}()
|
||||
|
||||
offset := this.bodyOffset
|
||||
|
||||
// 直接返回从Header中剩余的
|
||||
if this.bodyBufLen > 0 && len(buf) >= this.bodyBufLen {
|
||||
offset += int64(this.bodyBufLen)
|
||||
|
||||
copy(buf, this.bodyBuf)
|
||||
isOk = true
|
||||
|
||||
goNext, err := callback(this.bodyBufLen)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !goNext {
|
||||
return nil
|
||||
}
|
||||
|
||||
if this.bodySize <= int64(this.bodyBufLen) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
var offset = this.bodyOffset
|
||||
|
||||
// 开始读Body部分
|
||||
_, err := this.fp.Seek(offset, io.SeekStart)
|
||||
@@ -269,32 +250,9 @@ func (this *FileReader) ReadBody(buf []byte, callback ReaderFunc) error {
|
||||
}
|
||||
|
||||
func (this *FileReader) Read(buf []byte) (n int, err error) {
|
||||
var isOk = false
|
||||
|
||||
defer func() {
|
||||
if !isOk {
|
||||
_ = this.discard()
|
||||
}
|
||||
}()
|
||||
|
||||
// 直接返回从Header中剩余的
|
||||
if this.bodyBufLen > 0 && len(buf) >= this.bodyBufLen {
|
||||
copy(buf, this.bodyBuf)
|
||||
isOk = true
|
||||
n = this.bodyBufLen
|
||||
|
||||
if this.bodySize <= int64(this.bodyBufLen) {
|
||||
err = io.EOF
|
||||
return
|
||||
}
|
||||
|
||||
this.bodyBufLen = 0
|
||||
return
|
||||
}
|
||||
|
||||
n, err = this.fp.Read(buf)
|
||||
if err == nil || err == io.EOF {
|
||||
isOk = true
|
||||
if err != nil && err != io.EOF {
|
||||
_ = this.discard()
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -370,6 +328,11 @@ func (this *FileReader) ReadBodyRange(buf []byte, start int64, end int64, callba
|
||||
|
||||
func (this *FileReader) Close() error {
|
||||
if this.openFileCache != nil {
|
||||
if this.isClosed {
|
||||
return nil
|
||||
}
|
||||
this.isClosed = true
|
||||
|
||||
if this.openFile != nil {
|
||||
this.openFileCache.Put(this.fp.Name(), this.openFile)
|
||||
} else {
|
||||
@@ -391,5 +354,6 @@ func (this *FileReader) readToBuff(fp *os.File, buf []byte) (ok bool, err error)
|
||||
|
||||
func (this *FileReader) discard() error {
|
||||
_ = this.fp.Close()
|
||||
this.isClosed = true
|
||||
return os.Remove(this.fp.Name())
|
||||
}
|
||||
|
||||
@@ -325,10 +325,11 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool)
|
||||
}
|
||||
|
||||
// OpenWriter 打开缓存文件等待写入
|
||||
func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
|
||||
func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, size int64) (Writer, error) {
|
||||
// 先尝试内存缓存
|
||||
if this.memoryStorage != nil {
|
||||
writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status)
|
||||
// 我们限定仅小文件优先存在内存中
|
||||
if this.memoryStorage != nil && size > 0 && size < 32*1024*1024 {
|
||||
writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status, size)
|
||||
if err == nil {
|
||||
return writer, nil
|
||||
}
|
||||
@@ -904,7 +905,7 @@ func (this *FileStorage) hotLoop() {
|
||||
continue
|
||||
}
|
||||
|
||||
writer, err := this.memoryStorage.openWriter(item.Key, item.ExpiresAt, item.Status, false)
|
||||
writer, err := this.memoryStorage.openWriter(item.Key, item.ExpiresAt, item.Status, reader.BodySize(), false)
|
||||
if err != nil {
|
||||
if !CanIgnoreErr(err) {
|
||||
remotelogs.Error("CACHE", "transfer hot item failed: "+err.Error())
|
||||
|
||||
@@ -62,7 +62,7 @@ func TestFileStorage_OpenWriter(t *testing.T) {
|
||||
|
||||
header := []byte("Header")
|
||||
body := []byte("This is Body")
|
||||
writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200)
|
||||
writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -104,7 +104,7 @@ func TestFileStorage_OpenWriter_HTTP(t *testing.T) {
|
||||
t.Log(time.Since(now).Seconds()*1000, "ms")
|
||||
}()
|
||||
|
||||
writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200)
|
||||
writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -177,7 +177,7 @@ func TestFileStorage_Concurrent_Open_DifferentFile(t *testing.T) {
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
|
||||
writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200)
|
||||
writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200, -1)
|
||||
if err != nil {
|
||||
if err != ErrFileIsWriting {
|
||||
t.Fatal(err)
|
||||
@@ -229,7 +229,7 @@ func TestFileStorage_Concurrent_Open_SameFile(t *testing.T) {
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
|
||||
writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200)
|
||||
writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200, -1)
|
||||
if err != nil {
|
||||
if err != ErrFileIsWriting {
|
||||
t.Fatal(err)
|
||||
|
||||
@@ -13,7 +13,7 @@ type StorageInterface interface {
|
||||
OpenReader(key string, useStale bool) (reader Reader, err error)
|
||||
|
||||
// OpenWriter 打开缓存写入器等待写入
|
||||
OpenWriter(key string, expiredAt int64, status int) (Writer, error)
|
||||
OpenWriter(key string, expiredAt int64, status int, size int64) (Writer, error)
|
||||
|
||||
// Delete 删除某个键值对应的缓存
|
||||
Delete(key string) error
|
||||
|
||||
@@ -145,11 +145,11 @@ func (this *MemoryStorage) OpenReader(key string, useStale bool) (Reader, error)
|
||||
}
|
||||
|
||||
// OpenWriter 打开缓存写入器等待写入
|
||||
func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
|
||||
return this.openWriter(key, expiredAt, status, true)
|
||||
func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, size int64) (Writer, error) {
|
||||
return this.openWriter(key, expiredAt, status, size, true)
|
||||
}
|
||||
|
||||
func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, isDirty bool) (Writer, error) {
|
||||
func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, size int64, isDirty bool) (Writer, error) {
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
|
||||
@@ -182,7 +182,10 @@ func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, i
|
||||
return nil, NewCapacityError("write memory cache failed: too many keys in cache storage")
|
||||
}
|
||||
capacityBytes := this.memoryCapacityBytes()
|
||||
if capacityBytes > 0 && capacityBytes <= this.totalSize {
|
||||
if size < 0 {
|
||||
size = 0
|
||||
}
|
||||
if capacityBytes > 0 && capacityBytes <= this.totalSize+size {
|
||||
return nil, NewCapacityError("write memory cache failed: over memory size: " + strconv.FormatInt(capacityBytes, 10) + ", current size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
|
||||
}
|
||||
|
||||
@@ -384,7 +387,7 @@ func (this *MemoryStorage) flushItem(key string) {
|
||||
return
|
||||
}
|
||||
|
||||
writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status)
|
||||
writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status, -1)
|
||||
if err != nil {
|
||||
if !CanIgnoreErr(err) {
|
||||
remotelogs.Error("CACHE", "flush items failed: open writer failed: "+err.Error())
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
func TestMemoryStorage_OpenWriter(t *testing.T) {
|
||||
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
|
||||
|
||||
writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200)
|
||||
writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -62,7 +62,7 @@ func TestMemoryStorage_OpenWriter(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
writer, err = storage.OpenWriter("abc", time.Now().Unix()+60, 200)
|
||||
writer, err = storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -103,7 +103,7 @@ func TestMemoryStorage_OpenReaderLock(t *testing.T) {
|
||||
func TestMemoryStorage_Delete(t *testing.T) {
|
||||
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
|
||||
{
|
||||
writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200)
|
||||
writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -111,7 +111,7 @@ func TestMemoryStorage_Delete(t *testing.T) {
|
||||
t.Log(len(storage.valuesMap))
|
||||
}
|
||||
{
|
||||
writer, err := storage.OpenWriter("abc1", time.Now().Unix()+60, 200)
|
||||
writer, err := storage.OpenWriter("abc1", time.Now().Unix()+60, 200, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -126,7 +126,7 @@ func TestMemoryStorage_Stat(t *testing.T) {
|
||||
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
|
||||
expiredAt := time.Now().Unix() + 60
|
||||
{
|
||||
writer, err := storage.OpenWriter("abc", expiredAt, 200)
|
||||
writer, err := storage.OpenWriter("abc", expiredAt, 200, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -139,7 +139,7 @@ func TestMemoryStorage_Stat(t *testing.T) {
|
||||
})
|
||||
}
|
||||
{
|
||||
writer, err := storage.OpenWriter("abc1", expiredAt, 200)
|
||||
writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -163,7 +163,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) {
|
||||
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
|
||||
expiredAt := time.Now().Unix() + 60
|
||||
{
|
||||
writer, err := storage.OpenWriter("abc", expiredAt, 200)
|
||||
writer, err := storage.OpenWriter("abc", expiredAt, 200, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -175,7 +175,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) {
|
||||
})
|
||||
}
|
||||
{
|
||||
writer, err := storage.OpenWriter("abc1", expiredAt, 200)
|
||||
writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -198,7 +198,7 @@ func TestMemoryStorage_Purge(t *testing.T) {
|
||||
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
|
||||
expiredAt := time.Now().Unix() + 60
|
||||
{
|
||||
writer, err := storage.OpenWriter("abc", expiredAt, 200)
|
||||
writer, err := storage.OpenWriter("abc", expiredAt, 200, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -210,7 +210,7 @@ func TestMemoryStorage_Purge(t *testing.T) {
|
||||
})
|
||||
}
|
||||
{
|
||||
writer, err := storage.OpenWriter("abc1", expiredAt, 200)
|
||||
writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -241,7 +241,7 @@ func TestMemoryStorage_Expire(t *testing.T) {
|
||||
for i := 0; i < 1000; i++ {
|
||||
expiredAt := time.Now().Unix() + int64(rands.Int(0, 60))
|
||||
key := "abc" + strconv.Itoa(i)
|
||||
writer, err := storage.OpenWriter(key, expiredAt, 200)
|
||||
writer, err := storage.OpenWriter(key, expiredAt, 200, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -1,76 +0,0 @@
|
||||
package caches
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/compressions"
|
||||
)
|
||||
|
||||
type compressionWriter struct {
|
||||
rawWriter Writer
|
||||
writer compressions.Writer
|
||||
key string
|
||||
expiredAt int64
|
||||
}
|
||||
|
||||
func NewCompressionWriter(gw Writer, cpWriter compressions.Writer, key string, expiredAt int64) Writer {
|
||||
return &compressionWriter{
|
||||
rawWriter: gw,
|
||||
writer: cpWriter,
|
||||
key: key,
|
||||
expiredAt: expiredAt,
|
||||
}
|
||||
}
|
||||
|
||||
func (this *compressionWriter) WriteHeader(data []byte) (n int, err error) {
|
||||
return this.writer.Write(data)
|
||||
}
|
||||
|
||||
// WriteHeaderLength 写入Header长度数据
|
||||
func (this *compressionWriter) WriteHeaderLength(headerLength int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// WriteBodyLength 写入Body长度数据
|
||||
func (this *compressionWriter) WriteBodyLength(bodyLength int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *compressionWriter) Write(data []byte) (n int, err error) {
|
||||
return this.writer.Write(data)
|
||||
}
|
||||
|
||||
func (this *compressionWriter) Close() error {
|
||||
err := this.writer.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return this.rawWriter.Close()
|
||||
}
|
||||
|
||||
func (this *compressionWriter) Discard() error {
|
||||
err := this.writer.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return this.rawWriter.Discard()
|
||||
}
|
||||
|
||||
func (this *compressionWriter) Key() string {
|
||||
return this.key
|
||||
}
|
||||
|
||||
func (this *compressionWriter) ExpiredAt() int64 {
|
||||
return this.expiredAt
|
||||
}
|
||||
|
||||
func (this *compressionWriter) HeaderSize() int64 {
|
||||
return this.rawWriter.HeaderSize()
|
||||
}
|
||||
|
||||
func (this *compressionWriter) BodySize() int64 {
|
||||
return this.rawWriter.BodySize()
|
||||
}
|
||||
|
||||
// ItemType 内容类型
|
||||
func (this *compressionWriter) ItemType() ItemType {
|
||||
return this.rawWriter.ItemType()
|
||||
}
|
||||
68
internal/compressions/reader_gzip_test.go
Normal file
68
internal/compressions/reader_gzip_test.go
Normal file
@@ -0,0 +1,68 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package compressions
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/caches"
|
||||
"io"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestGzipReader(t *testing.T) {
|
||||
fp, err := os.Open("/Users/WorkSpace/EdgeProject/EdgeCache/p43/36/7e/367e02720713fe05b66573a1d69b4f0a.cache")
|
||||
if err != nil {
|
||||
// not fatal
|
||||
t.Log(err)
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
_ = fp.Close()
|
||||
}()
|
||||
|
||||
var buf = make([]byte, 32*1024)
|
||||
cacheReader := caches.NewFileReader(fp)
|
||||
err = cacheReader.Init()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var headerBuf = []byte{}
|
||||
err = cacheReader.ReadHeader(buf, func(n int) (goNext bool, err error) {
|
||||
headerBuf = append(headerBuf, buf[:n]...)
|
||||
for {
|
||||
nIndex := bytes.Index(headerBuf, []byte{'\n'})
|
||||
if nIndex >= 0 {
|
||||
row := headerBuf[:nIndex]
|
||||
spaceIndex := bytes.Index(row, []byte{':'})
|
||||
if spaceIndex <= 0 {
|
||||
return false, errors.New("invalid header '" + string(row) + "'")
|
||||
}
|
||||
|
||||
headerBuf = headerBuf[nIndex+1:]
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
|
||||
reader, err := NewGzipReader(cacheReader)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for {
|
||||
n, err := reader.Read(buf)
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
t.Log(string(buf[:n]))
|
||||
_ = n
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
package teaconst
|
||||
|
||||
const (
|
||||
Version = "0.4.0"
|
||||
Version = "0.4.1"
|
||||
|
||||
ProductName = "Edge Node"
|
||||
ProcessName = "edge-node"
|
||||
|
||||
@@ -182,7 +182,7 @@ func (this *APIStream) handleWriteCache(message *pb.NodeStreamMessage) error {
|
||||
}
|
||||
|
||||
expiredAt := time.Now().Unix() + msg.LifeSeconds
|
||||
writer, err := storage.OpenWriter(msg.Key, expiredAt, 200)
|
||||
writer, err := storage.OpenWriter(msg.Key, expiredAt, 200, int64(len(msg.Value)))
|
||||
if err != nil {
|
||||
this.replyFail(message.RequestId, "prepare writing failed: "+err.Error())
|
||||
return err
|
||||
@@ -462,7 +462,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error {
|
||||
}
|
||||
|
||||
expiredAt := time.Now().Unix() + 8600
|
||||
writer, err := storage.OpenWriter(key, expiredAt, 200) // TODO 可以设置缓存过期时间
|
||||
writer, err := storage.OpenWriter(key, expiredAt, 200, resp.ContentLength) // TODO 可以设置缓存过期时间
|
||||
if err != nil {
|
||||
locker.Lock()
|
||||
errorMessages = append(errorMessages, "open cache writer failed: "+key+": "+err.Error())
|
||||
|
||||
@@ -315,12 +315,12 @@ func (this *HTTPRequest) doEnd() {
|
||||
// TODO 增加Header统计,考虑从Conn中读取
|
||||
if this.ReqServer != nil {
|
||||
if this.isCached {
|
||||
stats.SharedTrafficStatManager.Add(this.ReqServer.Id, this.ReqHost, this.writer.sentBodyBytes, this.writer.sentBodyBytes, 1, 1, 0, 0, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId())
|
||||
stats.SharedTrafficStatManager.Add(this.ReqServer.Id, this.ReqHost, this.writer.SentBodyBytes(), this.writer.SentBodyBytes(), 1, 1, 0, 0, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId())
|
||||
} else {
|
||||
if this.isAttack {
|
||||
stats.SharedTrafficStatManager.Add(this.ReqServer.Id, this.ReqHost, this.writer.sentBodyBytes, 0, 1, 0, 1, this.writer.sentBodyBytes, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId())
|
||||
stats.SharedTrafficStatManager.Add(this.ReqServer.Id, this.ReqHost, this.writer.SentBodyBytes(), 0, 1, 0, 1, this.writer.SentBodyBytes(), this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId())
|
||||
} else {
|
||||
stats.SharedTrafficStatManager.Add(this.ReqServer.Id, this.ReqHost, this.writer.sentBodyBytes, 0, 1, 0, 0, 0, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId())
|
||||
stats.SharedTrafficStatManager.Add(this.ReqServer.Id, this.ReqHost, this.writer.SentBodyBytes(), 0, 1, 0, 0, 0, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/caches"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/compressions"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||||
@@ -22,7 +21,7 @@ import (
|
||||
func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
||||
this.cacheCanTryStale = false
|
||||
|
||||
cachePolicy := this.ReqServer.HTTPCachePolicy
|
||||
var cachePolicy = this.ReqServer.HTTPCachePolicy
|
||||
if cachePolicy == nil || !cachePolicy.IsOn {
|
||||
return
|
||||
}
|
||||
@@ -162,11 +161,15 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
||||
var err error
|
||||
|
||||
// 是否优先检查WebP
|
||||
var isWebP = false
|
||||
if this.web.WebP != nil &&
|
||||
this.web.WebP.IsOn &&
|
||||
this.web.WebP.MatchRequest(filepath.Ext(this.Path()), this.Format) &&
|
||||
this.web.WebP.MatchAccept(this.requestHeader("Accept")) {
|
||||
reader, _ = storage.OpenReader(key+webpSuffix, useStale)
|
||||
if reader != nil {
|
||||
isWebP = true
|
||||
}
|
||||
}
|
||||
|
||||
// 检查正常的文件
|
||||
@@ -184,13 +187,16 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
||||
}
|
||||
|
||||
if !this.canIgnore(err) {
|
||||
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
|
||||
remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: open cache failed: "+err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_ = reader.Close()
|
||||
if !this.writer.DelayRead() {
|
||||
_ = reader.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
if useStale {
|
||||
@@ -231,7 +237,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
||||
})
|
||||
if err != nil {
|
||||
if !this.canIgnore(err) {
|
||||
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
|
||||
remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: read header failed: "+err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -257,7 +263,11 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
||||
var eTag = ""
|
||||
var lastModifiedAt = reader.LastModified()
|
||||
if lastModifiedAt > 0 {
|
||||
eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "\""
|
||||
if isWebP {
|
||||
eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "_webp" + "\""
|
||||
} else {
|
||||
eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "\""
|
||||
}
|
||||
respHeader.Del("Etag")
|
||||
respHeader["ETag"] = []string{eTag}
|
||||
}
|
||||
@@ -357,7 +367,6 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
||||
}
|
||||
}
|
||||
|
||||
respHeader := this.writer.Header()
|
||||
if len(rangeSet) == 1 {
|
||||
respHeader.Set("Content-Range", "bytes "+strconv.FormatInt(rangeSet[0][0], 10)+"-"+strconv.FormatInt(rangeSet[0][1], 10)+"/"+strconv.FormatInt(reader.BodySize(), 10))
|
||||
respHeader.Set("Content-Length", strconv.FormatInt(rangeSet[0][1]-rangeSet[0][0]+1, 10))
|
||||
@@ -379,7 +388,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
||||
return true
|
||||
}
|
||||
if !this.canIgnore(err) {
|
||||
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
|
||||
remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -425,7 +434,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
||||
})
|
||||
if err != nil {
|
||||
if !this.canIgnore(err) {
|
||||
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
|
||||
remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error())
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -439,25 +448,11 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
||||
return true
|
||||
}
|
||||
} else { // 没有Range
|
||||
var body io.Reader = reader
|
||||
var contentEncoding = this.writer.Header().Get("Content-Encoding")
|
||||
if len(contentEncoding) > 0 && !httpAcceptEncoding(this.RawReq.Header.Get("Accept-Encoding"), contentEncoding) {
|
||||
decompressReader, err := compressions.NewReader(body, contentEncoding)
|
||||
if err == nil {
|
||||
body = decompressReader
|
||||
defer func() {
|
||||
_ = decompressReader.Close()
|
||||
}()
|
||||
|
||||
this.writer.Header().Del("Content-Encoding")
|
||||
this.writer.Header().Del("Content-Length")
|
||||
}
|
||||
}
|
||||
|
||||
this.writer.PrepareCompression(reader.BodySize())
|
||||
var resp = &http.Response{Body: reader}
|
||||
this.writer.Prepare(resp, reader.BodySize(), reader.Status(), false)
|
||||
this.writer.WriteHeader(reader.Status())
|
||||
|
||||
_, err = io.CopyBuffer(this.writer, body, buf)
|
||||
_, err = io.CopyBuffer(this.writer, resp.Body, buf)
|
||||
if err == io.EOF {
|
||||
err = nil
|
||||
}
|
||||
@@ -465,7 +460,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
||||
this.varMapping["cache.status"] = "MISS"
|
||||
|
||||
if !this.canIgnore(err) {
|
||||
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
|
||||
remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: read body failed: "+err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -190,7 +190,7 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
|
||||
this.processResponseHeaders(resp.StatusCode)
|
||||
|
||||
// 准备
|
||||
this.writer.Prepare(resp.ContentLength, resp.StatusCode)
|
||||
this.writer.Prepare(resp, resp.ContentLength, resp.StatusCode, true)
|
||||
|
||||
// 设置响应代码
|
||||
this.writer.WriteHeader(resp.StatusCode)
|
||||
|
||||
@@ -73,6 +73,13 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
|
||||
return false
|
||||
}
|
||||
|
||||
if u.KeepArgs {
|
||||
var qIndex = strings.Index(this.uri, "?")
|
||||
if qIndex >= 0 {
|
||||
afterURL += this.uri[qIndex:]
|
||||
}
|
||||
}
|
||||
|
||||
if u.Status <= 0 {
|
||||
this.processResponseHeaders(http.StatusTemporaryRedirect)
|
||||
http.Redirect(this.RawWriter, this.RawReq, afterURL, http.StatusTemporaryRedirect)
|
||||
@@ -88,12 +95,20 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
|
||||
return false
|
||||
}
|
||||
|
||||
var afterURL = u.AfterURL
|
||||
if u.KeepArgs {
|
||||
var qIndex = strings.Index(this.uri, "?")
|
||||
if qIndex >= 0 {
|
||||
afterURL += this.uri[qIndex:]
|
||||
}
|
||||
}
|
||||
|
||||
if u.Status <= 0 {
|
||||
this.processResponseHeaders(http.StatusTemporaryRedirect)
|
||||
http.Redirect(this.RawWriter, this.RawReq, u.AfterURL, http.StatusTemporaryRedirect)
|
||||
http.Redirect(this.RawWriter, this.RawReq, afterURL, http.StatusTemporaryRedirect)
|
||||
} else {
|
||||
this.processResponseHeaders(u.Status)
|
||||
http.Redirect(this.RawWriter, this.RawReq, u.AfterURL, u.Status)
|
||||
http.Redirect(this.RawWriter, this.RawReq, afterURL, u.Status)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -61,11 +61,11 @@ func (this *HTTPRequest) doPage(status int) (shouldStop bool) {
|
||||
if page.NewStatus > 0 {
|
||||
// 自定义响应Headers
|
||||
this.processResponseHeaders(page.NewStatus)
|
||||
this.writer.Prepare(stat.Size(), page.NewStatus)
|
||||
this.writer.Prepare(nil, stat.Size(), page.NewStatus, true)
|
||||
this.writer.WriteHeader(page.NewStatus)
|
||||
} else {
|
||||
this.processResponseHeaders(status)
|
||||
this.writer.Prepare(stat.Size(), status)
|
||||
this.writer.Prepare(nil, stat.Size(), status, true)
|
||||
this.writer.WriteHeader(status)
|
||||
}
|
||||
buf := utils.BytePool1k.Get()
|
||||
@@ -100,11 +100,11 @@ func (this *HTTPRequest) doPage(status int) (shouldStop bool) {
|
||||
if page.NewStatus > 0 {
|
||||
// 自定义响应Headers
|
||||
this.processResponseHeaders(page.NewStatus)
|
||||
this.writer.Prepare(int64(len(content)), page.NewStatus)
|
||||
this.writer.Prepare(nil, int64(len(content)), page.NewStatus, true)
|
||||
this.writer.WriteHeader(page.NewStatus)
|
||||
} else {
|
||||
this.processResponseHeaders(status)
|
||||
this.writer.Prepare(int64(len(content)), status)
|
||||
this.writer.Prepare(nil, int64(len(content)), status, true)
|
||||
this.writer.WriteHeader(status)
|
||||
}
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/compressions"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
"io"
|
||||
@@ -262,33 +261,15 @@ func (this *HTTPRequest) doReverseProxy() {
|
||||
}
|
||||
}
|
||||
|
||||
// 解压
|
||||
if !resp.Uncompressed {
|
||||
var contentEncoding = resp.Header.Get("Content-Encoding")
|
||||
if len(contentEncoding) > 0 && !httpAcceptEncoding(this.RawReq.Header.Get("Accept-Encoding"), contentEncoding) {
|
||||
reader, err := compressions.NewReader(resp.Body, contentEncoding)
|
||||
if err == nil {
|
||||
var body = resp.Body
|
||||
defer func() {
|
||||
_ = body.Close()
|
||||
}()
|
||||
|
||||
resp.Body = reader
|
||||
resp.Header.Del("Content-Encoding")
|
||||
resp.Header.Del("Content-Length")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 响应Header
|
||||
this.writer.AddHeaders(resp.Header)
|
||||
this.processResponseHeaders(resp.StatusCode)
|
||||
|
||||
// 是否需要刷新
|
||||
shouldAutoFlush := this.reverseProxy.AutoFlush || this.RawReq.Header.Get("Accept") == "text/event-stream"
|
||||
var shouldAutoFlush = this.reverseProxy.AutoFlush || this.RawReq.Header.Get("Accept") == "text/event-stream"
|
||||
|
||||
// 准备
|
||||
delayHeaders := this.writer.Prepare(resp.ContentLength, resp.StatusCode)
|
||||
var delayHeaders = this.writer.Prepare(resp, resp.ContentLength, resp.StatusCode, true)
|
||||
|
||||
// 设置响应代码
|
||||
if !delayHeaders {
|
||||
|
||||
@@ -302,7 +302,7 @@ func (this *HTTPRequest) doRoot() (isBreak bool) {
|
||||
this.cacheRef = nil // 不支持缓存
|
||||
}
|
||||
|
||||
this.writer.Prepare(fileSize, http.StatusOK)
|
||||
this.writer.Prepare(nil, fileSize, http.StatusOK, true)
|
||||
|
||||
pool := this.bytePool(fileSize)
|
||||
buf := pool.Get()
|
||||
|
||||
@@ -54,9 +54,9 @@ func (this *HTTPRequest) doURL(method string, url string, host string, statusCod
|
||||
}
|
||||
this.writer.AddHeaders(resp.Header)
|
||||
if statusCode <= 0 {
|
||||
this.writer.Prepare(resp.ContentLength, resp.StatusCode)
|
||||
this.writer.Prepare(resp, resp.ContentLength, resp.StatusCode, true)
|
||||
} else {
|
||||
this.writer.Prepare(resp.ContentLength, statusCode)
|
||||
this.writer.Prepare(resp, resp.ContentLength, statusCode, true)
|
||||
}
|
||||
|
||||
// 设置响应代码
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,102 +0,0 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// HTTPRateWriter 限速写入
|
||||
type HTTPRateWriter struct {
|
||||
parentWriter http.ResponseWriter
|
||||
|
||||
rateBytes int
|
||||
lastBytes int
|
||||
timeCost time.Duration
|
||||
}
|
||||
|
||||
func NewHTTPRateWriter(writer http.ResponseWriter, rateBytes int64) http.ResponseWriter {
|
||||
return &HTTPRateWriter{
|
||||
parentWriter: writer,
|
||||
rateBytes: types.Int(rateBytes),
|
||||
}
|
||||
}
|
||||
|
||||
func (this *HTTPRateWriter) Header() http.Header {
|
||||
return this.parentWriter.Header()
|
||||
}
|
||||
|
||||
func (this *HTTPRateWriter) Write(data []byte) (int, error) {
|
||||
if len(data) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
var left = this.rateBytes - this.lastBytes
|
||||
|
||||
if left <= 0 {
|
||||
if this.timeCost > 0 && this.timeCost < 1*time.Second {
|
||||
time.Sleep(1*time.Second - this.timeCost)
|
||||
}
|
||||
|
||||
this.lastBytes = 0
|
||||
this.timeCost = 0
|
||||
return this.Write(data)
|
||||
}
|
||||
|
||||
var n = len(data)
|
||||
|
||||
// n <= left
|
||||
if n <= left {
|
||||
this.lastBytes += n
|
||||
|
||||
var before = time.Now()
|
||||
defer func() {
|
||||
this.timeCost += time.Since(before)
|
||||
}()
|
||||
return this.parentWriter.Write(data)
|
||||
}
|
||||
|
||||
// n > left
|
||||
var before = time.Now()
|
||||
result, err := this.parentWriter.Write(data[:left])
|
||||
this.timeCost += time.Since(before)
|
||||
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
this.lastBytes += left
|
||||
|
||||
return this.Write(data[left:])
|
||||
}
|
||||
|
||||
func (this *HTTPRateWriter) WriteHeader(statusCode int) {
|
||||
this.parentWriter.WriteHeader(statusCode)
|
||||
}
|
||||
|
||||
// Hijack Hijack
|
||||
func (this *HTTPRateWriter) Hijack() (conn net.Conn, buf *bufio.ReadWriter, err error) {
|
||||
if this.parentWriter == nil {
|
||||
return
|
||||
}
|
||||
hijack, ok := this.parentWriter.(http.Hijacker)
|
||||
if ok {
|
||||
return hijack.Hijack()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Flush Flush
|
||||
func (this *HTTPRateWriter) Flush() {
|
||||
if this.parentWriter == nil {
|
||||
return
|
||||
}
|
||||
flusher, ok := this.parentWriter.(http.Flusher)
|
||||
if ok {
|
||||
flusher.Flush()
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -56,13 +56,16 @@ type Node struct {
|
||||
maxCPU int32
|
||||
maxThreads int
|
||||
timezone string
|
||||
|
||||
updatingServerMap map[int64]*serverconfigs.ServerConfig
|
||||
}
|
||||
|
||||
func NewNode() *Node {
|
||||
return &Node{
|
||||
sock: gosock.NewTmpSock(teaconst.ProcessName),
|
||||
maxThreads: -1,
|
||||
maxCPU: -1,
|
||||
sock: gosock.NewTmpSock(teaconst.ProcessName),
|
||||
maxThreads: -1,
|
||||
maxCPU: -1,
|
||||
updatingServerMap: map[int64]*serverconfigs.ServerConfig{},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -264,7 +267,7 @@ func (this *Node) loop() error {
|
||||
defer tr.End()
|
||||
|
||||
// 检查api.yaml是否存在
|
||||
apiConfigFile := Tea.ConfigFile("api.yaml")
|
||||
var apiConfigFile = Tea.ConfigFile("api.yaml")
|
||||
_, err := os.Stat(apiConfigFile)
|
||||
if err != nil {
|
||||
return nil
|
||||
@@ -275,7 +278,7 @@ func (this *Node) loop() error {
|
||||
return errors.New("create rpc client failed: " + err.Error())
|
||||
}
|
||||
|
||||
nodeCtx := rpcClient.Context()
|
||||
var nodeCtx = rpcClient.Context()
|
||||
tasksResp, err := rpcClient.NodeTaskRPC().FindNodeTasks(nodeCtx, &pb.FindNodeTasksRequest{})
|
||||
if err != nil {
|
||||
return errors.New("read node tasks failed: " + err.Error())
|
||||
@@ -295,11 +298,15 @@ func (this *Node) loop() error {
|
||||
return err
|
||||
}
|
||||
case "configChanged":
|
||||
if !task.IsPrimary {
|
||||
// 我们等等主节点配置准备完毕
|
||||
time.Sleep(2 * time.Second)
|
||||
if task.ServerId > 0 {
|
||||
err = this.syncServerConfig(task.ServerId)
|
||||
} else {
|
||||
if !task.IsPrimary {
|
||||
// 我们等等主节点配置准备完毕
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
err = this.syncConfig(task.Version)
|
||||
}
|
||||
err := this.syncConfig(task.Version)
|
||||
if err != nil {
|
||||
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
|
||||
NodeTaskId: task.Id,
|
||||
@@ -316,6 +323,7 @@ func (this *Node) loop() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case "nodeVersionChanged":
|
||||
goman.New(func() {
|
||||
sharedUpgradeManager.Start()
|
||||
@@ -419,22 +427,8 @@ func (this *Node) syncConfig(taskVersion int64) error {
|
||||
remotelogs.Println("NODE", "loading config ...")
|
||||
}
|
||||
|
||||
nodeconfigs.ResetNodeConfig(nodeConfig)
|
||||
caches.SharedManager.MaxDiskCapacity = nodeConfig.MaxCacheDiskCapacity
|
||||
caches.SharedManager.MaxMemoryCapacity = nodeConfig.MaxCacheMemoryCapacity
|
||||
if len(nodeConfig.HTTPCachePolicies) > 0 {
|
||||
caches.SharedManager.UpdatePolicies(nodeConfig.HTTPCachePolicies)
|
||||
} else {
|
||||
caches.SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{})
|
||||
}
|
||||
|
||||
sharedWAFManager.UpdatePolicies(nodeConfig.FindAllFirewallPolicies())
|
||||
iplibrary.SharedActionManager.UpdateActions(nodeConfig.FirewallActions)
|
||||
sharedNodeConfig = nodeConfig
|
||||
this.onReload(nodeConfig)
|
||||
|
||||
metrics.SharedManager.Update(nodeConfig.MetricItems)
|
||||
|
||||
// 发送事件
|
||||
events.Notify(events.EventReload)
|
||||
|
||||
@@ -447,30 +441,96 @@ func (this *Node) syncConfig(taskVersion int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 读取单个服务配置
|
||||
func (this *Node) syncServerConfig(serverId int64) error {
|
||||
rpcClient, err := rpc.SharedRPC()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := rpcClient.ServerRPC().ComposeServerConfig(rpcClient.Context(), &pb.ComposeServerConfigRequest{ServerId: serverId})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
if len(resp.ServerConfigJSON) == 0 {
|
||||
this.updatingServerMap[serverId] = nil
|
||||
} else {
|
||||
var config = &serverconfigs.ServerConfig{}
|
||||
err = json.Unmarshal(resp.ServerConfigJSON, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
this.updatingServerMap[serverId] = config
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 启动同步计时器
|
||||
func (this *Node) startSyncTimer() {
|
||||
// TODO 这个时间间隔可以自行设置
|
||||
ticker := time.NewTicker(60 * time.Second)
|
||||
var taskTicker = time.NewTicker(60 * time.Second)
|
||||
var serverChangeTicker = time.NewTicker(5 * time.Second)
|
||||
|
||||
events.OnKey(events.EventQuit, this, func() {
|
||||
remotelogs.Println("NODE", "quit sync timer")
|
||||
ticker.Stop()
|
||||
taskTicker.Stop()
|
||||
serverChangeTicker.Stop()
|
||||
})
|
||||
goman.New(func() {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
case <-taskTicker.C: // 定期执行
|
||||
err := this.loop()
|
||||
if err != nil {
|
||||
remotelogs.Error("NODE", "sync config error: "+err.Error())
|
||||
continue
|
||||
}
|
||||
case <-nodeTaskNotify:
|
||||
case <-serverChangeTicker.C: // 服务变化
|
||||
this.locker.Lock()
|
||||
if len(this.updatingServerMap) > 0 {
|
||||
var updatingServerMap = this.updatingServerMap
|
||||
this.updatingServerMap = map[int64]*serverconfigs.ServerConfig{}
|
||||
newNodeConfig, err := nodeconfigs.CloneNodeConfig(sharedNodeConfig)
|
||||
if err != nil {
|
||||
remotelogs.Error("NODE", "apply server config error: "+err.Error())
|
||||
continue
|
||||
}
|
||||
for serverId, serverConfig := range updatingServerMap {
|
||||
if serverConfig != nil {
|
||||
newNodeConfig.AddServer(serverConfig)
|
||||
} else {
|
||||
newNodeConfig.RemoveServer(serverId)
|
||||
}
|
||||
}
|
||||
|
||||
err, serverErrors := newNodeConfig.Init()
|
||||
if err != nil {
|
||||
remotelogs.Error("NODE", "apply server config error: "+err.Error())
|
||||
continue
|
||||
}
|
||||
if len(serverErrors) > 0 {
|
||||
for _, serverErr := range serverErrors {
|
||||
remotelogs.ServerError(serverErr.Id, "NODE", serverErr.Message, nodeconfigs.NodeLogTypeServerConfigInitFailed, maps.Map{})
|
||||
}
|
||||
}
|
||||
|
||||
this.onReload(newNodeConfig)
|
||||
|
||||
err = sharedListenerManager.Start(newNodeConfig)
|
||||
if err != nil {
|
||||
remotelogs.Error("NODE", "apply server config error: "+err.Error())
|
||||
}
|
||||
}
|
||||
this.locker.Unlock()
|
||||
case <-nodeTaskNotify: // 有新的更新任务
|
||||
err := this.loop()
|
||||
if err != nil {
|
||||
remotelogs.Error("NODE", "sync config error: "+err.Error())
|
||||
continue
|
||||
}
|
||||
case <-nodeConfigChangedNotify:
|
||||
case <-nodeConfigChangedNotify: // 节点变化通知
|
||||
err := this.syncConfig(0)
|
||||
if err != nil {
|
||||
remotelogs.Error("NODE", "sync config error: "+err.Error())
|
||||
@@ -701,6 +761,25 @@ func (this *Node) listenSock() error {
|
||||
|
||||
// 重载配置调用
|
||||
func (this *Node) onReload(config *nodeconfigs.NodeConfig) {
|
||||
nodeconfigs.ResetNodeConfig(config)
|
||||
sharedNodeConfig = config
|
||||
|
||||
// 缓存策略
|
||||
caches.SharedManager.MaxDiskCapacity = config.MaxCacheDiskCapacity
|
||||
caches.SharedManager.MaxMemoryCapacity = config.MaxCacheMemoryCapacity
|
||||
if len(config.HTTPCachePolicies) > 0 {
|
||||
caches.SharedManager.UpdatePolicies(config.HTTPCachePolicies)
|
||||
} else {
|
||||
caches.SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{})
|
||||
}
|
||||
|
||||
// WAF策略
|
||||
sharedWAFManager.UpdatePolicies(config.FindAllFirewallPolicies())
|
||||
iplibrary.SharedActionManager.UpdateActions(config.FirewallActions)
|
||||
|
||||
// 统计指标
|
||||
metrics.SharedManager.Update(config.MetricItems)
|
||||
|
||||
// max cpu
|
||||
if config.MaxCPU != this.maxCPU {
|
||||
if config.MaxCPU > 0 && config.MaxCPU < int32(runtime.NumCPU()) {
|
||||
|
||||
26
internal/utils/readers/bytes_counter_reader.go
Normal file
26
internal/utils/readers/bytes_counter_reader.go
Normal file
@@ -0,0 +1,26 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package readers
|
||||
|
||||
import "io"
|
||||
|
||||
type BytesCounterReader struct {
|
||||
rawReader io.Reader
|
||||
count int64
|
||||
}
|
||||
|
||||
func NewBytesCounterReader(rawReader io.Reader) *BytesCounterReader {
|
||||
return &BytesCounterReader{
|
||||
rawReader: rawReader,
|
||||
}
|
||||
}
|
||||
|
||||
func (this *BytesCounterReader) Read(p []byte) (n int, err error) {
|
||||
n, err = this.rawReader.Read(p)
|
||||
this.count += int64(n)
|
||||
return
|
||||
}
|
||||
|
||||
func (this *BytesCounterReader) TotalBytes() int64 {
|
||||
return this.count
|
||||
}
|
||||
34
internal/utils/readers/filter_reader.go
Normal file
34
internal/utils/readers/filter_reader.go
Normal file
@@ -0,0 +1,34 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package readers
|
||||
|
||||
import "io"
|
||||
|
||||
type FilterFunc = func(p []byte, err error) error
|
||||
|
||||
type FilterReader struct {
|
||||
rawReader io.Reader
|
||||
filters []FilterFunc
|
||||
}
|
||||
|
||||
func NewFilterReader(rawReader io.Reader) *FilterReader {
|
||||
return &FilterReader{
|
||||
rawReader: rawReader,
|
||||
}
|
||||
}
|
||||
|
||||
func (this *FilterReader) Add(filter FilterFunc) {
|
||||
this.filters = append(this.filters, filter)
|
||||
}
|
||||
|
||||
func (this *FilterReader) Read(p []byte) (n int, err error) {
|
||||
n, err = this.rawReader.Read(p)
|
||||
for _, filter := range this.filters {
|
||||
filterErr := filter(p[:n], err)
|
||||
if filterErr != nil {
|
||||
err = filterErr
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
41
internal/utils/readers/filter_reader_test.go
Normal file
41
internal/utils/readers/filter_reader_test.go
Normal file
@@ -0,0 +1,41 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package readers_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/readers"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestNewFilterReader(t *testing.T) {
|
||||
var reader = readers.NewFilterReader(bytes.NewBufferString("0123456789"))
|
||||
reader.Add(func(p []byte, err error) error {
|
||||
t.Log("filter1:", string(p), err)
|
||||
return nil
|
||||
})
|
||||
reader.Add(func(p []byte, err error) error {
|
||||
t.Log("filter2:", string(p), err)
|
||||
if string(p) == "345" {
|
||||
return errors.New("end")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
reader.Add(func(p []byte, err error) error {
|
||||
t.Log("filter3:", string(p), err)
|
||||
return nil
|
||||
})
|
||||
|
||||
var buf = make([]byte, 3)
|
||||
for {
|
||||
n, err := reader.Read(buf)
|
||||
if n > 0 {
|
||||
t.Log(string(buf[:n]))
|
||||
}
|
||||
if err != nil {
|
||||
t.Log(err)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
52
internal/utils/readers/tee_reader.go
Normal file
52
internal/utils/readers/tee_reader.go
Normal file
@@ -0,0 +1,52 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package readers
|
||||
|
||||
import (
|
||||
"io"
|
||||
)
|
||||
|
||||
type TeeReader struct {
|
||||
r io.Reader
|
||||
w io.Writer
|
||||
|
||||
onFail func(err error)
|
||||
onEOF func()
|
||||
}
|
||||
|
||||
func NewTeeReader(reader io.Reader, writer io.Writer) *TeeReader {
|
||||
return &TeeReader{
|
||||
r: reader,
|
||||
w: writer,
|
||||
}
|
||||
}
|
||||
|
||||
func (this *TeeReader) Read(p []byte) (n int, err error) {
|
||||
n, err = this.r.Read(p)
|
||||
if n > 0 {
|
||||
_, wErr := this.w.Write(p[:n])
|
||||
if err == nil && wErr != nil {
|
||||
err = wErr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
if this.onEOF != nil {
|
||||
this.onEOF()
|
||||
}
|
||||
} else {
|
||||
if this.onFail != nil {
|
||||
this.onFail(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (this *TeeReader) OnFail(onFail func(err error)) {
|
||||
this.onFail = onFail
|
||||
}
|
||||
|
||||
func (this *TeeReader) OnEOF(onEOF func()) {
|
||||
this.onEOF = onEOF
|
||||
}
|
||||
58
internal/utils/readers/tee_reader_closer.go
Normal file
58
internal/utils/readers/tee_reader_closer.go
Normal file
@@ -0,0 +1,58 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package readers
|
||||
|
||||
import "io"
|
||||
|
||||
type TeeReaderCloser struct {
|
||||
r io.Reader
|
||||
w io.Writer
|
||||
|
||||
onFail func(err error)
|
||||
onEOF func()
|
||||
}
|
||||
|
||||
func NewTeeReaderCloser(reader io.Reader, writer io.Writer) *TeeReaderCloser {
|
||||
return &TeeReaderCloser{
|
||||
r: reader,
|
||||
w: writer,
|
||||
}
|
||||
}
|
||||
|
||||
func (this *TeeReaderCloser) Read(p []byte) (n int, err error) {
|
||||
n, err = this.r.Read(p)
|
||||
if n > 0 {
|
||||
_, wErr := this.w.Write(p[:n])
|
||||
if err == nil && wErr != nil {
|
||||
err = wErr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
if this.onEOF != nil {
|
||||
this.onEOF()
|
||||
}
|
||||
} else {
|
||||
if this.onFail != nil {
|
||||
this.onFail(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (this *TeeReaderCloser) Close() error {
|
||||
r, ok := this.r.(io.Closer)
|
||||
if ok {
|
||||
return r.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *TeeReaderCloser) OnFail(onFail func(err error)) {
|
||||
this.onFail = onFail
|
||||
}
|
||||
|
||||
func (this *TeeReaderCloser) OnEOF(onEOF func()) {
|
||||
this.onEOF = onEOF
|
||||
}
|
||||
28
internal/utils/writers/bytes_counter_writer.go
Normal file
28
internal/utils/writers/bytes_counter_writer.go
Normal file
@@ -0,0 +1,28 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package writers
|
||||
|
||||
import "io"
|
||||
|
||||
type BytesCounterWriter struct {
|
||||
writer io.Writer
|
||||
count int64
|
||||
}
|
||||
|
||||
func NewBytesCounterWriter(rawWriter io.Writer) *BytesCounterWriter {
|
||||
return &BytesCounterWriter{writer: rawWriter}
|
||||
}
|
||||
|
||||
func (this *BytesCounterWriter) Write(p []byte) (n int, err error) {
|
||||
n, err = this.writer.Write(p)
|
||||
this.count += int64(n)
|
||||
return
|
||||
}
|
||||
|
||||
func (this *BytesCounterWriter) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *BytesCounterWriter) TotalBytes() int64 {
|
||||
return this.count
|
||||
}
|
||||
87
internal/utils/writers/rate_limit_writer.go
Normal file
87
internal/utils/writers/rate_limit_writer.go
Normal file
@@ -0,0 +1,87 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package writers
|
||||
|
||||
import (
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"io"
|
||||
"time"
|
||||
)
|
||||
|
||||
// RateLimitWriter 限速写入
|
||||
type RateLimitWriter struct {
|
||||
rawWriter io.WriteCloser
|
||||
|
||||
rateBytes int
|
||||
|
||||
written int
|
||||
before time.Time
|
||||
}
|
||||
|
||||
func NewRateLimitWriter(rawWriter io.WriteCloser, rateBytes int64) io.WriteCloser {
|
||||
return &RateLimitWriter{
|
||||
rawWriter: rawWriter,
|
||||
rateBytes: types.Int(rateBytes),
|
||||
before: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
func (this *RateLimitWriter) Write(p []byte) (n int, err error) {
|
||||
if this.rateBytes <= 0 {
|
||||
return this.write(p)
|
||||
}
|
||||
|
||||
var size = len(p)
|
||||
if size == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
if size <= this.rateBytes {
|
||||
return this.write(p)
|
||||
}
|
||||
|
||||
for {
|
||||
size = len(p)
|
||||
|
||||
var limit = this.rateBytes
|
||||
if limit > size {
|
||||
limit = size
|
||||
}
|
||||
n1, wErr := this.write(p[:limit])
|
||||
n += n1
|
||||
if wErr != nil {
|
||||
return n, wErr
|
||||
}
|
||||
|
||||
if size > limit {
|
||||
p = p[limit:]
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (this *RateLimitWriter) Close() error {
|
||||
return this.rawWriter.Close()
|
||||
}
|
||||
|
||||
func (this *RateLimitWriter) write(p []byte) (n int, err error) {
|
||||
n, err = this.rawWriter.Write(p)
|
||||
|
||||
if err == nil {
|
||||
this.written += n
|
||||
|
||||
if this.written >= this.rateBytes {
|
||||
var duration = 1*time.Second - time.Now().Sub(this.before)
|
||||
if duration > 0 {
|
||||
time.Sleep(duration)
|
||||
}
|
||||
this.before = time.Now()
|
||||
this.written = 0
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
41
internal/utils/writers/rate_limit_writer_test.go
Normal file
41
internal/utils/writers/rate_limit_writer_test.go
Normal file
@@ -0,0 +1,41 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package writers
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestSleep(t *testing.T) {
|
||||
var count = 2000
|
||||
var wg = sync.WaitGroup{}
|
||||
wg.Add(count)
|
||||
var before = time.Now()
|
||||
for i := 0; i < count; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
time.Sleep(1 * time.Second)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
t.Log(time.Since(before).Seconds()*1000, "ms")
|
||||
}
|
||||
|
||||
func TestTimeout(t *testing.T) {
|
||||
var count = 2000
|
||||
var wg = sync.WaitGroup{}
|
||||
wg.Add(count)
|
||||
var before = time.Now()
|
||||
for i := 0; i < count; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
var timeout = time.NewTimer(1 * time.Second)
|
||||
<-timeout.C
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
t.Log(time.Since(before).Seconds()*1000, "ms")
|
||||
}
|
||||
51
internal/utils/writers/tee_writer_closer.go
Normal file
51
internal/utils/writers/tee_writer_closer.go
Normal file
@@ -0,0 +1,51 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package writers
|
||||
|
||||
import "io"
|
||||
|
||||
type TeeWriterCloser struct {
|
||||
primaryW io.WriteCloser
|
||||
secondaryW io.WriteCloser
|
||||
|
||||
onFail func(err error)
|
||||
}
|
||||
|
||||
func NewTeeWriterCloser(primaryW io.WriteCloser, secondaryW io.WriteCloser) *TeeWriterCloser {
|
||||
return &TeeWriterCloser{
|
||||
primaryW: primaryW,
|
||||
secondaryW: secondaryW,
|
||||
}
|
||||
}
|
||||
|
||||
func (this *TeeWriterCloser) Write(p []byte) (n int, err error) {
|
||||
{
|
||||
n, err = this.primaryW.Write(p)
|
||||
|
||||
if err != nil {
|
||||
if this.onFail != nil {
|
||||
this.onFail(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
_, err2 := this.secondaryW.Write(p)
|
||||
if err2 != nil {
|
||||
if this.onFail != nil {
|
||||
this.onFail(err2)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (this *TeeWriterCloser) Close() error {
|
||||
// 这里不关闭secondary
|
||||
return this.primaryW.Close()
|
||||
}
|
||||
|
||||
func (this *TeeWriterCloser) OnFail(onFail func(err error)) {
|
||||
this.onFail = onFail
|
||||
}
|
||||
@@ -1,7 +1,9 @@
|
||||
package waf
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/waf/requests"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
)
|
||||
|
||||
// rule group
|
||||
@@ -30,7 +32,7 @@ func (this *RuleGroup) Init(waf *WAF) error {
|
||||
for _, set := range this.RuleSets {
|
||||
err := set.Init(waf)
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.New("init set '" + types.String(set.Id) + "' failed: " + err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
package waf
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/waf/requests"
|
||||
"github.com/iwind/TeaGo/lists"
|
||||
"github.com/iwind/TeaGo/maps"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"net/http"
|
||||
"sort"
|
||||
)
|
||||
@@ -47,7 +49,7 @@ func (this *RuleSet) Init(waf *WAF) error {
|
||||
for _, rule := range this.Rules {
|
||||
err := rule.Init()
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.New("init rule '" + rule.Param + " " + rule.Operator + " " + types.String(rule.Value) + "' failed: " + err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/waf/requests"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/files"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"gopkg.in/yaml.v3"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
@@ -89,7 +90,7 @@ func (this *WAF) Init() (resultErrors []error) {
|
||||
err := group.Init(this)
|
||||
if err != nil {
|
||||
// 这里我们不阻止其他规则正常加入
|
||||
resultErrors = append(resultErrors, err)
|
||||
resultErrors = append(resultErrors, errors.New("init group '"+types.String(group.Id)+"' failed: "+err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user