Compare commits

...

37 Commits

Author SHA1 Message Date
刘祥超
24fc2249bb 优化文件缓存 2021-06-14 19:55:06 +08:00
刘祥超
84c931b411 缓存支持ETag和Last-Modified 2021-06-14 11:46:39 +08:00
刘祥超
7f422a2946 优化cache/FileList错误提示 2021-06-13 17:51:04 +08:00
刘祥超
13194366a5 优化文件缓存 2021-06-13 17:37:57 +08:00
刘祥超
993cda7766 修复内存缓存没有init()的Bug 2021-06-12 10:03:33 +08:00
刘祥超
a46e970c74 优化内存缓存 2021-06-11 14:53:51 +08:00
刘祥超
085adcf1c4 实现节点自动升级成最新版本 2021-06-10 19:19:15 +08:00
刘祥超
c1af8b36a4 完成两个TODO文档说明 2021-06-10 11:35:20 +08:00
刘祥超
8cba12b4b5 URL跳转规则支持匹配条件 2021-06-09 21:44:59 +08:00
刘祥超
3debe1d1df 支持不缓存条件 2021-06-08 22:45:11 +08:00
刘祥超
549f110e5f 增加服务流量统计 2021-06-08 11:24:41 +08:00
刘祥超
f3a45e9e64 改进UDP IsOk的使用方法 2021-06-07 15:48:39 +08:00
刘祥超
f461760158 支持UDP代理 2021-06-07 15:45:47 +08:00
刘祥超
a49b724745 优化HTTP缓存,主要是并发冲突、缓存写入不全等问题 2021-06-06 23:42:11 +08:00
刘祥超
0df5dfad23 某个服务端口启动失败后,会自动重试 2021-06-06 13:40:00 +08:00
刘祥超
aeb1bc08a7 改进编辑脚本 2021-06-06 11:58:41 +08:00
刘祥超
c51aca621a 调整API命名 2021-06-01 19:52:37 +08:00
刘祥超
c78d055dae 更新fcgi 2021-05-28 14:00:45 +08:00
刘祥超
4502a3b132 改进清空缓存目录逻辑 2021-05-25 18:28:24 +08:00
刘祥超
fa99d86d6f 对部分错误提示降级 2021-05-25 11:16:05 +08:00
刘祥超
2edd2bb105 缓存文件列表初始化时自动创建目录 2021-05-25 11:06:43 +08:00
刘祥超
0f8aee0ccb 修改版本号 2021-05-25 11:06:07 +08:00
刘祥超
2500929a99 调整在Mac OS上的编译脚本 2021-05-25 11:05:55 +08:00
刘祥超
496ee6cfa0 优化代码 2021-05-24 09:37:37 +08:00
刘祥超
437914a321 支持缓存策略全局的缓存条件/X-Cache中加入更多信息 2021-05-24 09:23:51 +08:00
刘祥超
3a93bf756a 加快缓存策略启动速度 2021-05-23 22:59:00 +08:00
刘祥超
38d81f340e 调整个别日志级别 2021-05-23 20:45:14 +08:00
刘祥超
889b9d063a URL跳转支持正则匹配 2021-05-23 17:01:08 +08:00
刘祥超
4c73b3618f 优化代码 2021-05-23 16:16:56 +08:00
刘祥超
df5f50682a 不再提示http2 Stream相关错误 2021-05-23 15:50:21 +08:00
刘祥超
9545bf69db 删除一个注释 2021-05-23 14:31:25 +08:00
刘祥超
b2f18c22ee 修复缓存状态码不生效的问题 2021-05-23 14:29:56 +08:00
刘祥超
63e3b7ac2f 修复跳转到HTTPS的自定义端口无法起作用的Bug 2021-05-22 10:26:37 +08:00
刘祥超
760a62c286 修改两处日志级别 2021-05-22 10:26:12 +08:00
刘祥超
296848a6d6 优化一个文件缓存统计的Bug 2021-05-19 22:14:57 +08:00
刘祥超
4e04534244 更改版本号 2021-05-19 14:16:04 +08:00
刘祥超
cad43e610d 缓存文件列表使用sqlite管理 2021-05-19 12:07:35 +08:00
54 changed files with 2692 additions and 596 deletions

View File

@@ -33,6 +33,7 @@ function build() {
mkdir $DIST/bin
mkdir $DIST/configs
mkdir $DIST/logs
mkdir $DIST/data
fi
cp $ROOT/configs/api.template.yaml $DIST/configs
@@ -47,7 +48,38 @@ function build() {
fi
echo "building ..."
env GOOS=${OS} GOARCH=${ARCH} go build -o $DIST/bin/${NAME} -ldflags="-s -w" $ROOT/../cmd/edge-node/main.go
MUSL_DIR="/usr/local/opt/musl-cross/bin"
CC_PATH=""
CXX_PATH=""
if [[ `uname -a` == *"Darwin"* && "${OS}" == "linux" ]]; then
# /usr/local/opt/musl-cross/bin/
if [ "${ARCH}" == "amd64" ]; then
CC_PATH="x86_64-linux-musl-gcc"
CXX_PATH="x86_64-linux-musl-g++"
fi
if [ "${ARCH}" == "386" ]; then
CC_PATH="i486-linux-musl-gcc"
CXX_PATH="i486-linux-musl-g++"
fi
if [ "${ARCH}" == "arm64" ]; then
CC_PATH="aarch64-linux-musl-gcc"
CXX_PATH="aarch64-linux-musl-g++"
fi
if [ "${ARCH}" == "mips64" ]; then
CC_PATH="mips64-linux-musl-gcc"
CXX_PATH="mips64-linux-musl-g++"
fi
if [ "${ARCH}" == "mips64le" ]; then
CC_PATH="mips64el-linux-musl-gcc"
CXX_PATH="mips64el-linux-musl-g++"
fi
fi
if [ ! -z $CC_PATH ]; then
env CC=$MUSL_DIR/$CC_PATH CXX=$MUSL_DIR/$CXX_PATH GOOS=${OS} GOARCH=${ARCH} CGO_ENABLED=1 go build -o $DIST/bin/${NAME} -ldflags "-linkmode external -extldflags -static -s -w" $ROOT/../cmd/edge-node/main.go
else
env GOOS=${OS} GOARCH=${ARCH} CGO_ENABLED=1 go build -o $DIST/bin/${NAME} -ldflags="-s -w" $ROOT/../cmd/edge-node/main.go
fi
# delete hidden files
find $DIST -name ".DS_Store" -delete

1
build/data/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
index.*

View File

@@ -7,8 +7,11 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/nodes"
"github.com/iwind/TeaGo/Tea"
_ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/types"
"io/ioutil"
"net/http"
_ "net/http/pprof"
"os"
"syscall"
)
@@ -17,7 +20,7 @@ func main() {
app := apps.NewAppCmd().
Version(teaconst.Version).
Product(teaconst.ProductName).
Usage(teaconst.ProcessName + " [-v|start|stop|restart|quit|test|service|daemon]")
Usage(teaconst.ProcessName + " [-v|start|stop|restart|status|quit|test|service|daemon|pprof]")
app.On("test", func() {
err := nodes.NewNode().Test()
@@ -57,6 +60,21 @@ func main() {
_ = process.Signal(syscall.SIGQUIT)
}
})
app.On("pprof", func() {
// TODO 自己指定端口
addr := "127.0.0.1:6060"
logs.Println("starting with pprof '" + addr + "'...")
go func() {
err := http.ListenAndServe(addr, nil)
if err != nil {
logs.Println("[error]" + err.Error())
}
}()
node := nodes.NewNode()
node.Start()
})
app.Run(func() {
node := nodes.NewNode()
node.Start()

6
go.mod
View File

@@ -12,13 +12,15 @@ require (
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/go-yaml/yaml v2.1.0+incompatible
github.com/golang/protobuf v1.4.2
github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f
github.com/iwind/gofcgi v0.0.0-20210506081859-17498ab3e9d7
github.com/iwind/TeaGo v0.0.0-20210411134150-ddf57e240c2f
github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11
github.com/lionsoul2014/ip2region v2.2.0-release+incompatible
github.com/mattn/go-sqlite3 v1.14.7
github.com/mssola/user_agent v0.5.2
github.com/shirou/gopsutil v2.20.9+incompatible
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299
golang.org/x/text v0.3.2
google.golang.org/grpc v1.32.0
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776
)

17
go.sum
View File

@@ -11,6 +11,7 @@ github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiU
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
@@ -55,11 +56,10 @@ github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/iwind/TeaGo v0.0.0-20200923021120-f5d76441fe9e/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f h1:6Ws2H+eorfVUoMO2jta6A9nIdh8oi5/5LXo/LkAxR+E=
github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/gofcgi v0.0.0-20210506081859-17498ab3e9d7 h1:apv23QzWNmv0D76gB3+u/5kf0F/Yw4W8h489CWUZtss=
github.com/iwind/gofcgi v0.0.0-20210506081859-17498ab3e9d7/go.mod h1:JtbX20untAjUVjZs1ZBtq80f5rJWvwtQNRL6EnuYRnY=
github.com/iwind/TeaGo v0.0.0-20210411134150-ddf57e240c2f h1:r2O8PONj/KiuZjJHVHn7KlCePUIjNtgAmvLfgRafQ8o=
github.com/iwind/TeaGo v0.0.0-20210411134150-ddf57e240c2f/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11 h1:DaQjoWZhLNxjhIXedVg4/vFEtHkZhK4IjIwsWdyzBLg=
github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11/go.mod h1:JtbX20untAjUVjZs1ZBtq80f5rJWvwtQNRL6EnuYRnY=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@@ -67,10 +67,10 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lionsoul2014/ip2region v2.2.0-release+incompatible h1:1qp9iks+69h7IGLazAplzS9Ca14HAxuD5c0rbFdPGy4=
github.com/lionsoul2014/ip2region v2.2.0-release+incompatible/go.mod h1:+ZBN7PBoh5gG6/y0ZQ85vJDBe21WnfbRrQQwTfliJJI=
github.com/mattn/go-sqlite3 v1.14.7 h1:fxWBnXkxfM6sRiuH3bqJ4CfzZojMOLVc0UTsTglEghA=
github.com/mattn/go-sqlite3 v1.14.7/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mssola/user_agent v0.5.2 h1:CZkTUahjL1+OcZ5zv3kZr8QiJ8jy2H08vZIEkBeRbxo=
github.com/mssola/user_agent v0.5.2/go.mod h1:TTPno8LPY3wAIEKRpAtkdMT0f8SE24pLRGPahjCH4uw=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
@@ -181,8 +181,9 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

41
internal/caches/errors.go Normal file
View File

@@ -0,0 +1,41 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import "errors"
// 常用的几个错误
var (
ErrNotFound = errors.New("cache not found")
ErrFileIsWriting = errors.New("the file is writing")
ErrInvalidRange = errors.New("invalid range")
)
// CapacityError 容量错误
// 独立出来是为了可以在有些场合下可以忽略,防止产生没必要的错误提示数量太多
type CapacityError struct {
err string
}
func NewCapacityError(err string) error {
return &CapacityError{err: err}
}
func (this *CapacityError) Error() string {
return this.err
}
// CanIgnoreErr 检查错误是否可以忽略
func CanIgnoreErr(err error) bool {
if err == nil {
return true
}
if err == ErrFileIsWriting {
return true
}
_, ok := err.(*CapacityError)
if ok {
return true
}
return false
}

View File

@@ -0,0 +1,16 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import (
"github.com/iwind/TeaGo/assert"
"testing"
)
func TestCanIgnoreErr(t *testing.T) {
a := assert.NewAssertion(t)
a.IsTrue(CanIgnoreErr(ErrFileIsWriting))
a.IsTrue(CanIgnoreErr(NewCapacityError("over capcity")))
a.IsFalse(CanIgnoreErr(ErrNotFound))
}

View File

@@ -1,6 +1,8 @@
package caches
import "time"
import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
)
type ItemType = int
@@ -10,20 +12,22 @@ const (
)
type Item struct {
Type ItemType
Key string
ExpiredAt int64
HeaderSize int64
BodySize int64
MetaSize int64
Type ItemType `json:"type"`
Key string `json:"key"`
ExpiredAt int64 `json:"expiredAt"`
HeaderSize int64 `json:"headerSize"`
BodySize int64 `json:"bodySize"`
MetaSize int64 `json:"metaSize"`
Host string `json:"host"` // 主机名
ServerId int64 `json:"serverId"` // 服务ID
}
func (this *Item) IsExpired() bool {
return this.ExpiredAt < time.Now().Unix()
return this.ExpiredAt < utils.UnixTime()
}
func (this *Item) TotalSize() int64 {
return this.Size() + this.MetaSize + int64(len(this.Key))
return this.Size() + this.MetaSize + int64(len(this.Key)) + 64
}
func (this *Item) Size() int64 {

View File

@@ -1,145 +0,0 @@
package caches
import (
"strings"
"sync"
)
// 缓存列表管理
type List struct {
m map[string]*Item // hash => item
locker sync.RWMutex
onAdd func(item *Item)
onRemove func(item *Item)
}
func NewList() *List {
return &List{
m: map[string]*Item{},
}
}
func (this *List) Reset() {
this.locker.Lock()
this.m = map[string]*Item{}
this.locker.Unlock()
}
func (this *List) Add(hash string, item *Item) {
this.locker.Lock()
if this.onAdd != nil {
this.onAdd(item)
}
this.m[hash] = item
this.locker.Unlock()
}
func (this *List) Exist(hash string) bool {
this.locker.RLock()
defer this.locker.RUnlock()
item, ok := this.m[hash]
if !ok {
return false
}
return !item.IsExpired()
}
// 根据前缀进行查找
func (this *List) FindKeysWithPrefix(prefix string) (keys []string) {
this.locker.RLock()
defer this.locker.RUnlock()
// TODO 需要优化性能支持千万级数据低于1s的处理速度
for _, item := range this.m {
if strings.HasPrefix(item.Key, prefix) {
keys = append(keys, item.Key)
}
}
return
}
func (this *List) Remove(hash string) {
this.locker.Lock()
item, ok := this.m[hash]
if ok {
if this.onRemove != nil {
this.onRemove(item)
}
delete(this.m, hash)
}
this.locker.Unlock()
}
// 清理过期的缓存
// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间
// callback 每次发现过期key的调用
func (this *List) Purge(count int, callback func(hash string)) {
this.locker.Lock()
deletedHashList := []string{}
for hash, item := range this.m {
if count <= 0 {
break
}
if item.IsExpired() {
if this.onRemove != nil {
this.onRemove(item)
}
delete(this.m, hash)
deletedHashList = append(deletedHashList, hash)
}
count--
}
this.locker.Unlock()
// 执行外部操作
for _, hash := range deletedHashList {
if callback != nil {
callback(hash)
}
}
}
func (this *List) Stat(check func(hash string) bool) *Stat {
this.locker.RLock()
defer this.locker.RUnlock()
result := &Stat{
Count: 0,
Size: 0,
}
for hash, item := range this.m {
if !item.IsExpired() {
// 检查文件是否存在、内容是否正确等
if check != nil && check(hash) {
result.Count++
result.ValueSize += item.Size()
result.Size += item.TotalSize()
}
}
}
return result
}
// 总数量
func (this *List) Count() int64 {
this.locker.RLock()
count := int64(len(this.m))
this.locker.RUnlock()
return count
}
// 添加事件
func (this *List) OnAdd(f func(item *Item)) {
this.onAdd = f
}
// 删除事件
func (this *List) OnRemove(f func(item *Item)) {
this.onRemove = f
}

View File

@@ -0,0 +1,404 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import (
"database/sql"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/lists"
_ "github.com/mattn/go-sqlite3"
"os"
"strconv"
"sync/atomic"
"time"
)
// FileList 文件缓存列表管理
type FileList struct {
dir string
db *sql.DB
total int64
onAdd func(item *Item)
onRemove func(item *Item)
existsByHashStmt *sql.Stmt // 根据hash检查是否存在
insertStmt *sql.Stmt // 写入数据
selectByHashStmt *sql.Stmt // 使用hash查询数据
deleteByHashStmt *sql.Stmt // 根据hash删除数据
statStmt *sql.Stmt // 统计
purgeStmt *sql.Stmt // 清理
deleteAllStmt *sql.Stmt // 删除所有数据
oldTables []string
itemsTableName string
isClosed bool
}
func NewFileList(dir string) ListInterface {
return &FileList{dir: dir}
}
func (this *FileList) Init() error {
// 检查目录是否存在
_, err := os.Stat(this.dir)
if err != nil {
err = os.MkdirAll(this.dir, 0777)
if err != nil {
return err
}
remotelogs.Println("CACHE", "create cache dir '"+this.dir+"'")
}
this.itemsTableName = "cacheItems_v2"
db, err := sql.Open("sqlite3", "file:"+this.dir+"/index.db?cache=shared&mode=rwc&_journal_mode=WAL")
if err != nil {
return err
}
db.SetMaxOpenConns(1)
this.db = db
// 清除旧表
this.oldTables = []string{
"cacheItems",
}
err = this.removeOldTables()
if err != nil {
remotelogs.Warn("CACHE", "clean old tables failed: "+err.Error())
}
// TODO 耗时过长,暂时不整理数据库
/**_, err = db.Exec("VACUUM")
if err != nil {
return err
}**/
// 创建
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
"key" varchar(1024),
"headerSize" integer DEFAULT 0,
"bodySize" integer DEFAULT 0,
"metaSize" integer DEFAULT 0,
"expiredAt" integer DEFAULT 0,
"createdAt" integer DEFAULT 0,
"host" varchar(128),
"serverId" integer
);
CREATE INDEX IF NOT EXISTS "createdAt"
ON "` + this.itemsTableName + `" (
"createdAt" ASC
);
CREATE INDEX IF NOT EXISTS "expiredAt"
ON "` + this.itemsTableName + `" (
"expiredAt" ASC
);
CREATE UNIQUE INDEX IF NOT EXISTS "hash"
ON "` + this.itemsTableName + `" (
"hash" ASC
);
CREATE INDEX IF NOT EXISTS "serverId"
ON "` + this.itemsTableName + `" (
"serverId" ASC
);
`)
if err != nil {
return err
}
// 读取总数量
row := this.db.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`)
if row.Err() != nil {
return row.Err()
}
var total int64
err = row.Scan(&total)
if err != nil {
return err
}
this.total = total
// 常用语句
this.existsByHashStmt, err = this.db.Prepare(`SELECT "bodySize" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`)
if err != nil {
return err
}
this.insertStmt, err = this.db.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "host", "serverId", "createdAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
return err
}
this.selectByHashStmt, err = this.db.Prepare(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? LIMIT 1`)
if err != nil {
return err
}
this.deleteByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`)
if err != nil {
return err
}
this.statStmt, err = this.db.Prepare(`SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM "` + this.itemsTableName + `" WHERE expiredAt>?`)
if err != nil {
return err
}
this.purgeStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" WHERE expiredAt<=? LIMIT ?`)
if err != nil {
return err
}
this.deleteAllStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `"`)
if err != nil {
return err
}
return nil
}
func (this *FileList) Reset() error {
// 不错任何事情
return nil
}
func (this *FileList) Add(hash string, item *Item) error {
if this.isClosed {
return nil
}
_, err := this.insertStmt.Exec(hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.Host, item.ServerId, utils.UnixTime())
if err != nil {
return err
}
atomic.AddInt64(&this.total, 1)
if this.onAdd != nil {
this.onAdd(item)
}
return nil
}
func (this *FileList) Exist(hash string) (bool, error) {
if this.isClosed {
return false, nil
}
rows, err := this.existsByHashStmt.Query(hash, time.Now().Unix())
if err != nil {
return false, err
}
defer func() {
_ = rows.Close()
}()
if rows.Next() {
return true, nil
}
return false, nil
}
// CleanPrefix 清理某个前缀的缓存数据
func (this *FileList) CleanPrefix(prefix string) error {
if this.isClosed {
return nil
}
if len(prefix) == 0 {
return nil
}
var count = int64(10000)
for {
result, err := this.db.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0 WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND createdAt<=? AND INSTR("key", ?)==1 LIMIT `+strconv.FormatInt(count, 10)+`)`, utils.UnixTime(), prefix)
if err != nil {
return err
}
affectedRows, err := result.RowsAffected()
if err != nil {
return err
}
if affectedRows < count {
return nil
}
}
}
func (this *FileList) Remove(hash string) error {
if this.isClosed {
return nil
}
row := this.selectByHashStmt.QueryRow(hash)
if row.Err() != nil {
return row.Err()
}
var item = &Item{Type: ItemTypeFile}
err := row.Scan(&item.Key, &item.HeaderSize, &item.BodySize, &item.MetaSize, &item.ExpiredAt)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
return err
}
_, err = this.deleteByHashStmt.Exec(hash)
if err != nil {
return err
}
atomic.AddInt64(&this.total, -1)
if this.onRemove != nil {
this.onRemove(item)
}
return nil
}
// Purge 清理过期的缓存
// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间
// callback 每次发现过期key的调用
func (this *FileList) Purge(count int, callback func(hash string) error) error {
if this.isClosed {
return nil
}
if count <= 0 {
count = 1000
}
rows, err := this.purgeStmt.Query(time.Now().Unix(), count)
if err != nil {
return err
}
defer func() {
_ = rows.Close()
}()
hashStrings := []string{}
for rows.Next() {
var hash string
err = rows.Scan(&hash)
if err != nil {
return err
}
hashStrings = append(hashStrings, hash)
}
// 不在 rows.Next() 循环中操作是为了避免死锁
for _, hash := range hashStrings {
err = this.Remove(hash)
if err != nil {
return err
}
err = callback(hash)
if err != nil {
return err
}
}
return nil
}
func (this *FileList) CleanAll() error {
if this.isClosed {
return nil
}
_, err := this.deleteAllStmt.Exec()
if err != nil {
return err
}
atomic.StoreInt64(&this.total, 0)
return nil
}
func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) {
if this.isClosed {
return &Stat{}, nil
}
// 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些
row := this.statStmt.QueryRow(time.Now().Unix())
if row.Err() != nil {
return nil, row.Err()
}
stat := &Stat{}
err := row.Scan(&stat.Count, &stat.Size, &stat.ValueSize)
if err != nil {
return nil, err
}
return stat, nil
}
// Count 总数量
// 常用的方法,所以避免直接查询数据库
func (this *FileList) Count() (int64, error) {
return atomic.LoadInt64(&this.total), nil
}
// OnAdd 添加事件
func (this *FileList) OnAdd(f func(item *Item)) {
this.onAdd = f
}
// OnRemove 删除事件
func (this *FileList) OnRemove(f func(item *Item)) {
this.onRemove = f
}
func (this *FileList) Close() error {
this.isClosed = true
if this.db != nil {
_ = this.existsByHashStmt.Close()
_ = this.insertStmt.Close()
_ = this.selectByHashStmt.Close()
_ = this.deleteByHashStmt.Close()
_ = this.statStmt.Close()
_ = this.purgeStmt.Close()
_ = this.deleteAllStmt.Close()
return this.db.Close()
}
return nil
}
func (this *FileList) removeOldTables() error {
rows, err := this.db.Query(`SELECT "name" FROM sqlite_master WHERE "type"='table'`)
if err != nil {
return err
}
defer func() {
_ = rows.Close()
}()
for rows.Next() {
var name string
err = rows.Scan(&name)
if err != nil {
return err
}
if lists.ContainsString(this.oldTables, name) {
// 异步执行
go func() {
remotelogs.Println("CACHE", "remove old table '"+name+"' ...")
_, _ = this.db.Exec(`DROP TABLE "` + name + `"`)
remotelogs.Println("CACHE", "remove old table '"+name+"' done")
}()
}
}
return nil
}

View File

@@ -0,0 +1,248 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import (
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/rands"
stringutil "github.com/iwind/TeaGo/utils/string"
"strconv"
"sync"
"testing"
"time"
)
func TestFileList_Init(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}
func TestFileList_Add(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
err = list.Add(stringutil.Md5("123456"), &Item{
Key: "123456",
ExpiredAt: time.Now().Unix(),
HeaderSize: 1,
MetaSize: 2,
BodySize: 3,
Host: "teaos.cn",
ServerId: 1,
})
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}
func TestFileList_Add_Many(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
before := time.Now()
for i := 0; i < 2000_0000; i++ {
u := "http://edge.teaos.cn/123456" + strconv.Itoa(i)
_ = list.Add(stringutil.Md5(u), &Item{
Key: u,
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1,
MetaSize: 2,
BodySize: 3,
})
if err != nil {
t.Fatal(err)
}
if i > 0 && i%10_000 == 0 {
t.Log(i, int(10000/time.Since(before).Seconds()), "qps")
before = time.Now()
}
}
t.Log("ok")
}
func TestFileList_Exist(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
before := time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
{
exists, err := list.Exist(stringutil.Md5("123456"))
if err != nil {
t.Fatal(err)
}
t.Log("exists:", exists)
}
{
exists, err := list.Exist(stringutil.Md5("http://edge.teaos.cn/1234561"))
if err != nil {
t.Fatal(err)
}
t.Log("exists:", exists)
}
}
func TestFileList_Exist_Many_DB(t *testing.T) {
// 测试在多个数据库下的性能
var listSlice = []ListInterface{}
for i := 1; i <= 10; i++ {
list := NewFileList(Tea.Root + "/data/data" + strconv.Itoa(i))
err := list.Init()
if err != nil {
t.Fatal(err)
}
listSlice = append(listSlice, list)
}
var wg = sync.WaitGroup{}
var threads = 8
wg.Add(threads)
var count = 200_000
var countLocker sync.Mutex
var tasks = make(chan int, count)
for i := 0; i < count; i++ {
tasks <- i
}
var hash = stringutil.Md5("http://edge.teaos.cn/1234561")
before := time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
for i := 0; i < threads; i++ {
go func() {
defer wg.Done()
for {
select {
case <-tasks:
countLocker.Lock()
count--
countLocker.Unlock()
var list = listSlice[rands.Int(0, len(listSlice)-1)]
_, _ = list.Exist(hash)
default:
return
}
}
}()
}
wg.Wait()
t.Log("left:", count)
}
func TestFileList_CleanPrefix(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
before := time.Now()
err = list.CleanPrefix("1234")
if err != nil {
t.Fatal(err)
}
t.Log(time.Since(before).Seconds()*1000, "ms")
}
func TestFileList_Remove(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
list.OnRemove(func(item *Item) {
t.Logf("remove %#v", item)
})
err = list.Remove(stringutil.Md5("123456"))
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}
func TestFileList_Purge(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
err = list.Purge(2, func(hash string) error {
t.Log(hash)
return nil
})
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}
func TestFileList_Stat(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
stat, err := list.Stat(nil)
if err != nil {
t.Fatal(err)
}
t.Log("count:", stat.Count, "size:", stat.Size, "valueSize:", stat.ValueSize)
}
func TestFileList_Count(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
before := time.Now()
count, err := list.Count()
if err != nil {
t.Fatal(err)
}
t.Log("count:", count)
t.Log(time.Since(before).Seconds()*1000, "ms")
}
func TestFileList_CleanAll(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
err = list.CleanAll()
if err != nil {
t.Fatal(err)
}
t.Log("ok")
t.Log(list.Count())
}
func BenchmarkFileList_Exist(b *testing.B) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
b.Fatal(err)
}
for i := 0; i < b.N; i++ {
_, _ = list.Exist("f0eb5b87e0b0041f3917002c0707475f")
}
}

View File

@@ -0,0 +1,44 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
type ListInterface interface {
// Init 初始化
Init() error
// Reset 重置数据
Reset() error
// Add 添加内容
Add(hash string, item *Item) error
// Exist 检查内容是否存在
Exist(hash string) (bool, error)
// CleanPrefix 清除某个前缀的缓存
CleanPrefix(prefix string) error
// Remove 删除内容
Remove(hash string) error
// Purge 清理过期数据
Purge(count int, callback func(hash string) error) error
// CleanAll 清除所有缓存
CleanAll() error
// Stat 统计
Stat(check func(hash string) bool) (*Stat, error)
// Count 总数量
Count() (int64, error)
// OnAdd 添加事件
OnAdd(f func(item *Item))
// OnRemove 删除事件
OnRemove(f func(item *Item))
// Close 关闭
Close() error
}

View File

@@ -0,0 +1,254 @@
package caches
import (
"github.com/iwind/TeaGo/logs"
"strconv"
"strings"
"sync"
"testing"
)
// MemoryList 内存缓存列表管理
type MemoryList struct {
itemMaps map[string]map[string]*Item // prefix => { hash => item }
prefixes []string
locker sync.RWMutex
onAdd func(item *Item)
onRemove func(item *Item)
purgeIndex int
}
func NewMemoryList() ListInterface {
return &MemoryList{
itemMaps: map[string]map[string]*Item{},
}
}
func (this *MemoryList) Init() error {
this.prefixes = []string{"000"}
for i := 100; i <= 999; i++ {
this.prefixes = append(this.prefixes, strconv.Itoa(i))
}
for _, prefix := range this.prefixes {
this.itemMaps[prefix] = map[string]*Item{}
}
return nil
}
func (this *MemoryList) Reset() error {
this.locker.Lock()
for key := range this.itemMaps {
this.itemMaps[key] = map[string]*Item{}
}
this.locker.Unlock()
return nil
}
func (this *MemoryList) Add(hash string, item *Item) error {
this.locker.Lock()
prefix := this.prefix(hash)
itemMap, ok := this.itemMaps[prefix]
if !ok {
itemMap = map[string]*Item{}
this.itemMaps[prefix] = itemMap
}
// 先删除,为了可以正确触发统计
oldItem, ok := itemMap[hash]
if ok {
if this.onRemove != nil {
this.onRemove(oldItem)
}
}
// 添加
if this.onAdd != nil {
this.onAdd(item)
}
itemMap[hash] = item
this.locker.Unlock()
return nil
}
func (this *MemoryList) Exist(hash string) (bool, error) {
this.locker.RLock()
defer this.locker.RUnlock()
prefix := this.prefix(hash)
itemMap, ok := this.itemMaps[prefix]
if !ok {
return false, nil
}
item, ok := itemMap[hash]
if !ok {
return false, nil
}
return !item.IsExpired(), nil
}
// CleanPrefix 根据前缀进行清除
func (this *MemoryList) CleanPrefix(prefix string) error {
this.locker.RLock()
defer this.locker.RUnlock()
// TODO 需要优化性能支持千万级数据低于1s的处理速度
for _, itemMap := range this.itemMaps {
for _, item := range itemMap {
if strings.HasPrefix(item.Key, prefix) {
item.ExpiredAt = 0
}
}
}
return nil
}
func (this *MemoryList) Remove(hash string) error {
this.locker.Lock()
itemMap, ok := this.itemMaps[this.prefix(hash)]
if !ok {
this.locker.Unlock()
return nil
}
item, ok := itemMap[hash]
if ok {
if this.onRemove != nil {
this.onRemove(item)
}
delete(itemMap, hash)
}
this.locker.Unlock()
return nil
}
// Purge 清理过期的缓存
// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间
// callback 每次发现过期key的调用
func (this *MemoryList) Purge(count int, callback func(hash string) error) error {
this.locker.Lock()
deletedHashList := []string{}
if this.purgeIndex >= len(this.prefixes) {
this.purgeIndex = 0
}
prefix := this.prefixes[this.purgeIndex]
this.purgeIndex++
itemMap, ok := this.itemMaps[prefix]
if !ok {
this.locker.Unlock()
return nil
}
for hash, item := range itemMap {
if count <= 0 {
break
}
if item.IsExpired() {
if this.onRemove != nil {
this.onRemove(item)
}
delete(itemMap, hash)
deletedHashList = append(deletedHashList, hash)
}
count--
}
this.locker.Unlock()
// 执行外部操作
for _, hash := range deletedHashList {
if callback != nil {
err := callback(hash)
if err != nil {
return err
}
}
}
return nil
}
func (this *MemoryList) CleanAll() error {
return this.Reset()
}
func (this *MemoryList) Stat(check func(hash string) bool) (*Stat, error) {
this.locker.RLock()
defer this.locker.RUnlock()
result := &Stat{
Count: 0,
Size: 0,
}
for _, itemMap := range this.itemMaps {
for hash, item := range itemMap {
if !item.IsExpired() {
// 检查文件是否存在、内容是否正确等
if check != nil && check(hash) {
result.Count++
result.ValueSize += item.Size()
result.Size += item.TotalSize()
}
}
}
}
return result, nil
}
// Count 总数量
func (this *MemoryList) Count() (int64, error) {
this.locker.RLock()
var count = 0
for _, itemMap := range this.itemMaps {
count += len(itemMap)
}
this.locker.RUnlock()
return int64(count), nil
}
// OnAdd 添加事件
func (this *MemoryList) OnAdd(f func(item *Item)) {
this.onAdd = f
}
// OnRemove 删除事件
func (this *MemoryList) OnRemove(f func(item *Item)) {
this.onRemove = f
}
func (this *MemoryList) Close() error {
return nil
}
func (this *MemoryList) print(t *testing.T) {
this.locker.Lock()
for _, itemMap := range this.itemMaps {
if len(itemMap) > 0 {
logs.PrintAsJSON(itemMap, t)
}
}
this.locker.Unlock()
}
func (this *MemoryList) prefix(hash string) string {
var prefix string
if len(hash) > 3 {
prefix = hash[:3]
} else {
prefix = hash
}
_, ok := this.itemMaps[prefix]
if !ok {
prefix = "000"
}
return prefix
}

View File

@@ -0,0 +1,190 @@
package caches
import (
"fmt"
"github.com/cespare/xxhash"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/rands"
"math/rand"
"strconv"
"testing"
"time"
)
func TestMemoryList_Add(t *testing.T) {
list := NewMemoryList().(*MemoryList)
_ = list.Init()
_ = list.Add("a", &Item{
Key: "a1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
_ = list.Add("b", &Item{
Key: "b1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
_ = list.Add("123456", &Item{
Key: "c1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
t.Log(list.prefixes)
logs.PrintAsJSON(list.itemMaps, t)
}
func TestMemoryList_Remove(t *testing.T) {
list := NewMemoryList().(*MemoryList)
_ = list.Init()
_ = list.Add("a", &Item{
Key: "a1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
_ = list.Add("b", &Item{
Key: "b1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
_ = list.Remove("b")
list.print(t)
}
func TestMemoryList_Purge(t *testing.T) {
list := NewMemoryList().(*MemoryList)
_ = list.Init()
_ = list.Add("a", &Item{
Key: "a1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
_ = list.Add("b", &Item{
Key: "b1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
_ = list.Add("c", &Item{
Key: "c1",
ExpiredAt: time.Now().Unix() - 3600,
HeaderSize: 1024,
})
_ = list.Add("d", &Item{
Key: "d1",
ExpiredAt: time.Now().Unix() - 2,
HeaderSize: 1024,
})
_ = list.Purge(100, func(hash string) error {
t.Log("delete:", hash)
return nil
})
list.print(t)
for i := 0; i < 1000; i++ {
_ = list.Purge(100, func(hash string) error {
t.Log("delete:", hash)
return nil
})
t.Log(list.purgeIndex)
}
}
func TestMemoryList_Purge_Large_List(t *testing.T) {
list := NewMemoryList().(*MemoryList)
_ = list.Init()
for i := 0; i < 1_000_000; i++ {
_ = list.Add("a"+strconv.Itoa(i), &Item{
Key: "a" + strconv.Itoa(i),
ExpiredAt: time.Now().Unix() + int64(rands.Int(0, 24*3600)),
HeaderSize: 1024,
})
}
time.Sleep(1 * time.Hour)
}
func TestMemoryList_Stat(t *testing.T) {
list := NewMemoryList()
_ = list.Init()
_ = list.Add("a", &Item{
Key: "a1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
_ = list.Add("b", &Item{
Key: "b1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
_ = list.Add("c", &Item{
Key: "c1",
ExpiredAt: time.Now().Unix(),
HeaderSize: 1024,
})
_ = list.Add("d", &Item{
Key: "d1",
ExpiredAt: time.Now().Unix() - 2,
HeaderSize: 1024,
})
result, _ := list.Stat(func(hash string) bool {
// 随机测试
rand.Seed(time.Now().UnixNano())
return rand.Int()%2 == 0
})
t.Log(result)
}
func TestMemoryList_CleanPrefix(t *testing.T) {
list := NewMemoryList()
_ = list.Init()
before := time.Now()
for i := 0; i < 1_000_000; i++ {
key := "http://www.teaos.cn/hello/" + strconv.Itoa(i/10000) + "/" + strconv.Itoa(i) + ".html"
_ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{
Key: key,
ExpiredAt: time.Now().Unix() + 3600,
BodySize: 0,
HeaderSize: 0,
})
}
t.Log(time.Since(before).Seconds()*1000, "ms")
before = time.Now()
err := list.CleanPrefix("http://www.teaos.cn/hello/10")
if err != nil {
t.Fatal(err)
}
logs.Println(list.Stat(func(hash string) bool {
return true
}))
t.Log(time.Since(before).Seconds()*1000, "ms")
}
func TestMemoryList_GC(t *testing.T) {
list := NewMemoryList().(*MemoryList)
_ = list.Init()
for i := 0; i < 1_000_000; i++ {
key := "http://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html"
_ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{
Key: key,
ExpiredAt: 0,
BodySize: 0,
HeaderSize: 0,
})
}
time.Sleep(10 * time.Second)
t.Log("clean...", len(list.itemMaps))
_ = list.CleanAll()
t.Log("cleanAll...", len(list.itemMaps))
before := time.Now()
//runtime.GC()
t.Log("gc cost:", time.Since(before).Seconds()*1000, "ms")
timeout := time.NewTimer(2 * time.Minute)
<-timeout.C
t.Log("2 minutes passed")
time.Sleep(30 * time.Minute)
}

View File

@@ -1,119 +0,0 @@
package caches
import (
"fmt"
"github.com/cespare/xxhash"
"math/rand"
"strconv"
"testing"
"time"
)
func TestList_Add(t *testing.T) {
list := NewList()
list.Add("a", &Item{
Key: "a1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
list.Add("b", &Item{
Key: "b1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
t.Log(list.m)
}
func TestList_Remove(t *testing.T) {
list := NewList()
list.Add("a", &Item{
Key: "a1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
list.Add("b", &Item{
Key: "b1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
list.Remove("b")
t.Log(list.m)
}
func TestList_Purge(t *testing.T) {
list := NewList()
list.Add("a", &Item{
Key: "a1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
list.Add("b", &Item{
Key: "b1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
list.Add("c", &Item{
Key: "c1",
ExpiredAt: time.Now().Unix() - 3600,
HeaderSize: 1024,
})
list.Add("d", &Item{
Key: "d1",
ExpiredAt: time.Now().Unix() - 2,
HeaderSize: 1024,
})
list.Purge(100, func(hash string) {
t.Log("delete:", hash)
})
t.Log(list.m)
}
func TestList_Stat(t *testing.T) {
list := NewList()
list.Add("a", &Item{
Key: "a1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
list.Add("b", &Item{
Key: "b1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
list.Add("c", &Item{
Key: "c1",
ExpiredAt: time.Now().Unix(),
HeaderSize: 1024,
})
list.Add("d", &Item{
Key: "d1",
ExpiredAt: time.Now().Unix() - 2,
HeaderSize: 1024,
})
result := list.Stat(func(hash string) bool {
// 随机测试
rand.Seed(time.Now().UnixNano())
return rand.Int()%2 == 0
})
t.Log(result)
}
func TestList_FindKeysWithPrefix(t *testing.T) {
list := NewList()
before := time.Now()
for i := 0; i < 1_000_000; i++ {
key := "http://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html"
list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{
Key: key,
ExpiredAt: 0,
BodySize: 0,
HeaderSize: 0,
})
}
t.Log(time.Since(before).Seconds()*1000, "ms")
before = time.Now()
keys := list.FindKeysWithPrefix("http://www.teaos.cn/hello/5000")
t.Log(len(keys))
t.Log(time.Since(before).Seconds()*1000, "ms")
}

View File

@@ -3,27 +3,33 @@ package caches
type ReaderFunc func(n int) (goNext bool, err error)
type Reader interface {
// 初始化
// Init 初始化
Init() error
// 状态码
// TypeName 类型名称
TypeName() string
// Status 状态码
Status() int
// 读取Header
// LastModified 最后修改时间
LastModified() int64
// ReadHeader 读取Header
ReadHeader(buf []byte, callback ReaderFunc) error
// 读取Body
// ReadBody 读取Body
ReadBody(buf []byte, callback ReaderFunc) error
// 读取某个范围内的Body
// ReadBodyRange 读取某个范围内的Body
ReadBodyRange(buf []byte, start int64, end int64, callback ReaderFunc) error
// Header Size
// HeaderSize Header Size
HeaderSize() int64
// Body Size
// BodySize Body Size
BodySize() int64
// 关闭
// Close 关闭
Close() error
}

View File

@@ -106,10 +106,22 @@ func (this *FileReader) Init() error {
return nil
}
func (this *FileReader) TypeName() string {
return "disk"
}
func (this *FileReader) Status() int {
return this.status
}
func (this *FileReader) LastModified() int64 {
stat, err := this.fp.Stat()
if err != nil {
return 0
}
return stat.ModTime().Unix()
}
func (this *FileReader) HeaderSize() int64 {
return int64(this.headerSize)
}

View File

@@ -16,10 +16,18 @@ func (this *MemoryReader) Init() error {
return nil
}
func (this *MemoryReader) TypeName() string {
return "memory"
}
func (this *MemoryReader) Status() int {
return this.item.Status
}
func (this *MemoryReader) LastModified() int64 {
return this.item.ModifiedAt
}
func (this *MemoryReader) HeaderSize() int64 {
return int64(len(this.item.HeaderValue))
}

View File

@@ -12,6 +12,8 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea"
stringutil "github.com/iwind/TeaGo/utils/string"
"golang.org/x/text/language"
"golang.org/x/text/message"
"io"
"os"
"path/filepath"
@@ -34,12 +36,6 @@ const (
SizeMeta = SizeExpiresAt + SizeStatus + SizeURLLength + SizeHeaderLength + SizeBodyLength
)
var (
ErrNotFound = errors.New("cache not found")
ErrFileIsWriting = errors.New("the file is writing")
ErrInvalidRange = errors.New("invalid range")
)
// FileStorage 文件缓存
// 文件结构:
// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data]
@@ -49,15 +45,16 @@ type FileStorage struct {
memoryStorage *MemoryStorage // 一级缓存
totalSize int64
list *List
locker sync.RWMutex
ticker *utils.Ticker
list ListInterface
writingKeyMap map[string]bool // key => bool
locker sync.RWMutex
ticker *utils.Ticker
}
func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage {
return &FileStorage{
policy: policy,
list: NewList(),
policy: policy,
writingKeyMap: map[string]bool{},
}
}
@@ -68,41 +65,10 @@ func (this *FileStorage) Policy() *serverconfigs.HTTPCachePolicy {
// Init 初始化
func (this *FileStorage) Init() error {
this.list.OnAdd(func(item *Item) {
atomic.AddInt64(&this.totalSize, item.TotalSize())
})
this.list.OnRemove(func(item *Item) {
atomic.AddInt64(&this.totalSize, -item.TotalSize())
})
this.locker.Lock()
defer this.locker.Unlock()
before := time.Now()
cacheDir := ""
defer func() {
// 统计
count := 0
size := int64(0)
if this.list != nil {
stat := this.list.Stat(func(hash string) bool {
return true
})
count = stat.Count
size = stat.Size
}
cost := time.Since(before).Seconds() * 1000
sizeMB := strconv.FormatInt(size, 10) + " Bytes"
if size > 1024*1024*1024 {
sizeMB = fmt.Sprintf("%.3f G", float64(size)/1024/1024/1024)
} else if size > 1024*1024 {
sizeMB = fmt.Sprintf("%.3f M", float64(size)/1024/1024)
} else if size > 1024 {
sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024)
}
remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+cacheDir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: "+sizeMB)
}()
// 配置
cacheConfig := &serverconfigs.HTTPFileCacheStorage{}
@@ -115,7 +81,7 @@ func (this *FileStorage) Init() error {
return err
}
this.cacheConfig = cacheConfig
cacheDir = cacheConfig.Dir
cacheDir := cacheConfig.Dir
if !filepath.IsAbs(this.cacheConfig.Dir) {
this.cacheConfig.Dir = Tea.Root + Tea.DS + this.cacheConfig.Dir
@@ -127,6 +93,26 @@ func (this *FileStorage) Init() error {
return errors.New("[CACHE]cache storage dir can not be empty")
}
list := NewFileList(dir + "/p" + strconv.FormatInt(this.policy.Id, 10))
err = list.Init()
if err != nil {
return err
}
this.list = list
stat, err := list.Stat(func(hash string) bool {
return true
})
if err != nil {
return err
}
this.totalSize = stat.Size
this.list.OnAdd(func(item *Item) {
atomic.AddInt64(&this.totalSize, item.TotalSize())
})
this.list.OnRemove(func(item *Item) {
atomic.AddInt64(&this.totalSize, -item.TotalSize())
})
// 检查目录是否存在
_, err = os.Stat(dir)
if err != nil {
@@ -140,6 +126,23 @@ func (this *FileStorage) Init() error {
}
}
defer func() {
// 统计
count := stat.Count
size := stat.Size
cost := time.Since(before).Seconds() * 1000
sizeMB := strconv.FormatInt(size, 10) + " Bytes"
if size > 1024*1024*1024 {
sizeMB = fmt.Sprintf("%.3f G", float64(size)/1024/1024/1024)
} else if size > 1024*1024 {
sizeMB = fmt.Sprintf("%.3f M", float64(size)/1024/1024)
} else if size > 1024 {
sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024)
}
remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+cacheDir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+message.NewPrinter(language.English).Sprintf("%d", count)+", size: "+sizeMB)
}()
// 初始化list
err = this.initList()
if err != nil {
@@ -189,11 +192,9 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
}
hash, path := this.keyPath(key)
if !this.list.Exist(hash) {
return nil, ErrNotFound
}
// TODO 尝试使用mmap加快读取速度
var isOk = false
fp, err := os.OpenFile(path, os.O_RDONLY, 0444)
if err != nil {
if !os.IsNotExist(err) {
@@ -201,6 +202,21 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
}
return nil, ErrNotFound
}
defer func() {
if !isOk {
_ = fp.Close()
_ = os.Remove(path)
}
}()
// 检查文件记录是否已过期
exists, err := this.list.Exist(hash)
if err != nil {
return nil, err
}
if !exists {
return nil, ErrNotFound
}
reader := NewFileReader(fp)
if err != nil {
@@ -210,6 +226,8 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
if err != nil {
return nil, err
}
isOk = true
return reader, nil
}
@@ -223,18 +241,41 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
}
}
// 是否正在写入
var isWriting = false
this.locker.Lock()
_, ok := this.writingKeyMap[key]
this.locker.Unlock()
if ok {
return nil, ErrFileIsWriting
}
this.locker.Lock()
this.writingKeyMap[key] = true
this.locker.Unlock()
defer func() {
if !isWriting {
this.locker.Lock()
delete(this.writingKeyMap, key)
this.locker.Unlock()
}
}()
// 检查是否超出最大值
if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys {
return nil, errors.New("write file cache failed: too many keys in cache storage")
count, err := this.list.Count()
if err != nil {
return nil, err
}
if this.policy.MaxKeys > 0 && count > this.policy.MaxKeys {
return nil, NewCapacityError("write file cache failed: too many keys in cache storage")
}
capacityBytes := this.diskCapacityBytes()
if capacityBytes > 0 && capacityBytes <= this.totalSize {
return nil, errors.New("write file cache failed: over disk size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
return nil, NewCapacityError("write file cache failed: over disk size, current total size: " + strconv.FormatInt(this.totalSize, 10) + " bytes, capacity: " + strconv.FormatInt(capacityBytes, 10))
}
hash := stringutil.Md5(key)
dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
_, err := os.Stat(dir)
_, err = os.Stat(dir)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
@@ -246,13 +287,17 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
}
// 先删除
this.list.Remove(hash)
err = this.list.Remove(hash)
if err != nil {
return nil, err
}
path := dir + "/" + hash + ".cache.tmp"
writer, err := os.OpenFile(path, os.O_CREATE|os.O_SYNC|os.O_WRONLY, 0666)
if err != nil {
return nil, err
}
isWriting = true
isOk := false
removeOnFailure := true
@@ -337,7 +382,11 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
isOk = true
return NewFileWriter(writer, key, expiredAt), nil
return NewFileWriter(writer, key, expiredAt, func() {
this.locker.Lock()
delete(this.writingKeyMap, key)
this.locker.Unlock()
}), nil
}
// AddToList 添加到List
@@ -351,7 +400,10 @@ func (this *FileStorage) AddToList(item *Item) {
item.MetaSize = SizeMeta
hash := stringutil.Md5(item.Key)
this.list.Add(hash, item)
err := this.list.Add(hash, item)
if err != nil && !strings.Contains(err.Error(), "UNIQUE constraint failed") {
remotelogs.Error("CACHE", "add to list failed: "+err.Error())
}
}
// Delete 删除某个键值对应的缓存
@@ -365,8 +417,11 @@ func (this *FileStorage) Delete(key string) error {
}
hash, path := this.keyPath(key)
this.list.Remove(hash)
err := os.Remove(path)
err := this.list.Remove(hash)
if err != nil {
return err
}
err = os.Remove(path)
if err == nil || os.IsNotExist(err) {
return nil
}
@@ -380,7 +435,7 @@ func (this *FileStorage) Stat() (*Stat, error) {
return this.list.Stat(func(hash string) bool {
return true
}), nil
})
}
// CleanAll 清除所有的缓存
@@ -393,7 +448,10 @@ func (this *FileStorage) CleanAll() error {
_ = this.memoryStorage.CleanAll()
}
this.list.Reset()
err := this.list.CleanAll()
if err != nil {
return err
}
// 删除缓存和目录
// 不能直接删除子目录,比较危险
@@ -415,6 +473,7 @@ func (this *FileStorage) CleanAll() error {
return nil
}
// 改成待删除
subDirs, err := fp.Readdir(-1)
if err != nil {
return err
@@ -431,7 +490,33 @@ func (this *FileStorage) CleanAll() error {
continue
}
// 删除目录
// 修改目录
tmpDir := dir + "/" + subDir + "-deleted"
err = os.Rename(dir+"/"+subDir, tmpDir)
if err != nil {
return err
}
}
// 重新遍历待删除
fp2, err := os.Open(dir)
if err != nil {
return err
}
defer func() {
_ = fp2.Close()
}()
subDirs, err = fp2.Readdir(-1)
if err != nil {
return err
}
for _, info := range subDirs {
subDir := info.Name()
if !strings.HasSuffix(subDir, "-deleted") {
continue
}
// 删除
err = os.RemoveAll(dir + "/" + subDir)
if err != nil {
return err
@@ -453,29 +538,25 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
// 目录
if urlType == "dir" {
resultKeys := []string{}
for _, key := range keys {
resultKeys = append(resultKeys, this.list.FindKeysWithPrefix(key)...)
err := this.list.CleanPrefix(key)
if err != nil {
return err
}
}
keys = resultKeys
}
// 文件
for _, key := range keys {
hash, path := this.keyPath(key)
if !this.list.Exist(hash) {
err := os.Remove(path)
if err != nil && !os.IsNotExist(err) {
return err
}
continue
}
err := os.Remove(path)
if err != nil && !os.IsNotExist(err) {
return err
}
this.list.Remove(hash)
err = this.list.Remove(hash)
if err != nil {
return err
}
}
return nil
}
@@ -490,10 +571,12 @@ func (this *FileStorage) Stop() {
this.memoryStorage.Stop()
}
this.list.Reset()
_ = this.list.Reset()
if this.ticker != nil {
this.ticker.Stop()
}
_ = this.list.Close()
}
// TotalDiskSize 消耗的磁盘尺寸
@@ -534,45 +617,23 @@ func (this *FileStorage) hashPath(hash string) (path string) {
// 初始化List
func (this *FileStorage) initList() error {
this.list.Reset()
dir := this.dir()
// 清除tmp
files, err := filepath.Glob(dir + "/*/*/*.cache.tmp")
err := this.list.Reset()
if err != nil {
return err
}
for _, path := range files {
_ = os.Remove(path)
}
// 加载缓存
files, err = filepath.Glob(dir + "/*/*/*.cache")
if err != nil {
return err
}
for _, path := range files {
basename := filepath.Base(path)
index := strings.LastIndex(basename, ".")
if index < 0 {
continue
}
hash := basename[:index]
// 使用异步防止阻塞主线程
go func() {
dir := this.dir()
// 解析文件信息
item, err := this.decodeFile(path)
if err != nil {
if err != ErrNotFound {
remotelogs.Error("CACHE", "decode path '"+path+"': "+err.Error())
// 清除tmp
files, err := filepath.Glob(dir + "/*/*/*.cache.tmp")
if err == nil && len(files) > 0 {
for _, path := range files {
_ = os.Remove(path)
}
continue
}
if item == nil {
continue
}
this.list.Add(hash, item)
}
}()
// 启动定时清理任务
this.ticker = utils.NewTicker(30 * time.Second)
@@ -686,13 +747,17 @@ func (this *FileStorage) decodeFile(path string) (*Item, error) {
// 清理任务
func (this *FileStorage) purgeLoop() {
this.list.Purge(1000, func(hash string) {
err := this.list.Purge(1000, func(hash string) error {
path := this.hashPath(hash)
err := os.Remove(path)
if err != nil && !os.IsNotExist(err) {
remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
}
return nil
})
if err != nil {
remotelogs.Warn("CACHE", "purge file storage failed: " + err.Error())
}
}
func (this *FileStorage) readToBuff(fp *os.File, buf []byte) (ok bool, err error) {

View File

@@ -40,7 +40,7 @@ func TestFileStorage_Init(t *testing.T) {
time.Sleep(2 * time.Second)
storage.purgeLoop()
t.Log(len(storage.list.m), "entries left")
t.Log(storage.list.(*FileList).total, "entries left")
}
func TestFileStorage_OpenWriter(t *testing.T) {
@@ -441,14 +441,16 @@ func TestFileStorage_CleanAll(t *testing.T) {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
t.Log("before:", storage.list.m)
c, _ := storage.list.Count()
t.Log("before:", c)
err = storage.CleanAll()
if err != nil {
t.Fatal(err)
}
t.Log("after:", storage.list.m)
c, _ = storage.list.Count()
t.Log("after:", c)
t.Log("ok")
}

View File

@@ -3,7 +3,7 @@ package caches
import (
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/errors"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/cespare/xxhash"
"strconv"
@@ -18,38 +18,47 @@ type MemoryItem struct {
BodyValue []byte
Status int
IsDone bool
ModifiedAt int64
}
func (this *MemoryItem) IsExpired() bool {
return this.ExpiredAt < utils.UnixTime()
}
type MemoryStorage struct {
policy *serverconfigs.HTTPCachePolicy
list *List
list ListInterface
locker *sync.RWMutex
valuesMap map[uint64]*MemoryItem
ticker *utils.Ticker
purgeDuration time.Duration
totalSize int64
writingKeyMap map[string]bool // key => bool
}
func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage {
return &MemoryStorage{
policy: policy,
list: NewList(),
locker: &sync.RWMutex{},
valuesMap: map[uint64]*MemoryItem{},
policy: policy,
list: NewMemoryList(),
locker: &sync.RWMutex{},
valuesMap: map[uint64]*MemoryItem{},
writingKeyMap: map[string]bool{},
}
}
// Init 初始化
func (this *MemoryStorage) Init() error {
_ = this.list.Init()
this.list.OnAdd(func(item *Item) {
atomic.AddInt64(&this.totalSize, item.Size())
atomic.AddInt64(&this.totalSize, item.TotalSize())
})
this.list.OnRemove(func(item *Item) {
atomic.AddInt64(&this.totalSize, -item.Size())
atomic.AddInt64(&this.totalSize, -item.TotalSize())
})
if this.purgeDuration <= 0 {
this.purgeDuration = 30 * time.Second
this.purgeDuration = 10 * time.Second
}
// 启动定时清理任务
@@ -91,22 +100,54 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) {
// OpenWriter 打开缓存写入器等待写入
func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
this.locker.Lock()
defer this.locker.Unlock()
// 是否正在写入
var isWriting = false
_, ok := this.writingKeyMap[key]
if ok {
return nil, ErrFileIsWriting
}
this.writingKeyMap[key] = true
defer func() {
if !isWriting {
delete(this.writingKeyMap, key)
}
}()
// 检查是否过期
hash := this.hash(key)
item, ok := this.valuesMap[hash]
if ok && !item.IsExpired() {
return nil, ErrFileIsWriting
}
// 检查是否超出最大值
if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys {
return nil, errors.New("write memory cache failed: too many keys in cache storage")
totalKeys, err := this.list.Count()
if err != nil {
return nil, err
}
if this.policy.MaxKeys > 0 && totalKeys > this.policy.MaxKeys {
return nil, NewCapacityError("write memory cache failed: too many keys in cache storage")
}
capacityBytes := this.memoryCapacityBytes()
if capacityBytes > 0 && capacityBytes <= this.totalSize {
return nil, errors.New("write memory cache failed: over memory size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
return nil, NewCapacityError("write memory cache failed: over memory size: " + strconv.FormatInt(capacityBytes, 10) + ", current size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
}
// 先删除
err := this.Delete(key)
err = this.deleteWithoutKey(key)
if err != nil {
return nil, err
}
return NewMemoryWriter(this.valuesMap, key, expiredAt, status, this.locker), nil
isWriting = true
return NewMemoryWriter(this.valuesMap, key, expiredAt, status, this.locker, func() {
this.locker.Lock()
delete(this.writingKeyMap, key)
this.locker.Unlock()
}), nil
}
// Delete 删除某个键值对应的缓存
@@ -114,7 +155,7 @@ func (this *MemoryStorage) Delete(key string) error {
hash := this.hash(key)
this.locker.Lock()
delete(this.valuesMap, hash)
this.list.Remove(fmt.Sprintf("%d", hash))
_ = this.list.Remove(fmt.Sprintf("%d", hash))
this.locker.Unlock()
return nil
}
@@ -126,14 +167,14 @@ func (this *MemoryStorage) Stat() (*Stat, error) {
return this.list.Stat(func(hash string) bool {
return true
}), nil
})
}
// CleanAll 清除所有缓存
func (this *MemoryStorage) CleanAll() error {
this.locker.Lock()
this.valuesMap = map[uint64]*MemoryItem{}
this.list.Reset()
_ = this.list.Reset()
atomic.StoreInt64(&this.totalSize, 0)
this.locker.Unlock()
return nil
@@ -143,11 +184,12 @@ func (this *MemoryStorage) CleanAll() error {
func (this *MemoryStorage) Purge(keys []string, urlType string) error {
// 目录
if urlType == "dir" {
resultKeys := []string{}
for _, key := range keys {
resultKeys = append(resultKeys, this.list.FindKeysWithPrefix(key)...)
err := this.list.CleanPrefix(key)
if err != nil {
return err
}
}
keys = resultKeys
}
for _, key := range keys {
@@ -162,13 +204,19 @@ func (this *MemoryStorage) Purge(keys []string, urlType string) error {
// Stop 停止缓存策略
func (this *MemoryStorage) Stop() {
this.locker.Lock()
defer this.locker.Unlock()
this.valuesMap = map[uint64]*MemoryItem{}
this.list.Reset()
this.writingKeyMap = map[string]bool{}
_ = this.list.Reset()
if this.ticker != nil {
this.ticker.Stop()
}
_ = this.list.Close()
this.locker.Unlock()
remotelogs.Println("CACHE", "close memory storage '"+strconv.FormatInt(this.policy.Id, 10)+"'")
}
// Policy 获取当前存储的Policy
@@ -180,7 +228,7 @@ func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy {
func (this *MemoryStorage) AddToList(item *Item) {
item.MetaSize = int64(len(item.Key)) + 32 /** 32是我们评估的数据结构的长度 **/
hash := fmt.Sprintf("%d", this.hash(item.Key))
this.list.Add(hash, item)
_ = this.list.Add(hash, item)
}
// TotalDiskSize 消耗的磁盘尺寸
@@ -200,13 +248,14 @@ func (this *MemoryStorage) hash(key string) uint64 {
// 清理任务
func (this *MemoryStorage) purgeLoop() {
this.list.Purge(2048, func(hash string) {
_ = this.list.Purge(2048, func(hash string) error {
uintHash, err := strconv.ParseUint(hash, 10, 64)
if err == nil {
this.locker.Lock()
delete(this.valuesMap, uintHash)
this.locker.Unlock()
}
return nil
})
}
@@ -226,3 +275,10 @@ func (this *MemoryStorage) memoryCapacityBytes() int64 {
}
return c1
}
func (this *MemoryStorage) deleteWithoutKey(key string) error {
hash := this.hash(key)
delete(this.valuesMap, hash)
_ = this.list.Remove(fmt.Sprintf("%d", hash))
return nil
}

View File

@@ -174,7 +174,8 @@ func TestMemoryStorage_CleanAll(t *testing.T) {
if err != nil {
t.Fatal(err)
}
t.Log(storage.list.Count(), len(storage.valuesMap))
total, _ := storage.list.Count()
t.Log(total, len(storage.valuesMap))
}
func TestMemoryStorage_Purge(t *testing.T) {
@@ -208,7 +209,8 @@ func TestMemoryStorage_Purge(t *testing.T) {
if err != nil {
t.Fatal(err)
}
t.Log(storage.list.Count(), len(storage.valuesMap))
total, _ := storage.list.Count()
t.Log(total, len(storage.valuesMap))
}
func TestMemoryStorage_Expire(t *testing.T) {

View File

@@ -14,17 +14,19 @@ type FileWriter struct {
headerSize int64
bodySize int64
expiredAt int64
endFunc func()
}
func NewFileWriter(rawWriter *os.File, key string, expiredAt int64) *FileWriter {
func NewFileWriter(rawWriter *os.File, key string, expiredAt int64, endFunc func()) *FileWriter {
return &FileWriter{
key: key,
rawWriter: rawWriter,
expiredAt: expiredAt,
endFunc: endFunc,
}
}
// 写入数据
// WriteHeader 写入数据
func (this *FileWriter) WriteHeader(data []byte) (n int, err error) {
n, err = this.rawWriter.Write(data)
this.headerSize += int64(n)
@@ -34,7 +36,7 @@ func (this *FileWriter) WriteHeader(data []byte) (n int, err error) {
return
}
// 写入Header长度数据
// WriteHeaderLength 写入Header长度数据
func (this *FileWriter) WriteHeaderLength(headerLength int) error {
bytes4 := make([]byte, 4)
binary.BigEndian.PutUint32(bytes4, uint32(headerLength))
@@ -51,7 +53,7 @@ func (this *FileWriter) WriteHeaderLength(headerLength int) error {
return nil
}
// 写入数据
// Write 写入数据
func (this *FileWriter) Write(data []byte) (n int, err error) {
n, err = this.rawWriter.Write(data)
this.bodySize += int64(n)
@@ -61,7 +63,7 @@ func (this *FileWriter) Write(data []byte) (n int, err error) {
return
}
// 写入Body长度数据
// WriteBodyLength 写入Body长度数据
func (this *FileWriter) WriteBodyLength(bodyLength int64) error {
bytes8 := make([]byte, 8)
binary.BigEndian.PutUint64(bytes8, uint64(bodyLength))
@@ -78,8 +80,10 @@ func (this *FileWriter) WriteBodyLength(bodyLength int64) error {
return nil
}
// 关闭
// Close 关闭
func (this *FileWriter) Close() error {
defer this.endFunc()
err := this.WriteHeaderLength(types.Int(this.headerSize))
if err != nil {
return err
@@ -103,8 +107,10 @@ func (this *FileWriter) Close() error {
return err
}
// 丢弃
// Discard 丢弃
func (this *FileWriter) Discard() error {
defer this.endFunc()
_ = this.rawWriter.Close()
err := os.Remove(this.rawWriter.Name())
@@ -127,7 +133,7 @@ func (this *FileWriter) Key() string {
return this.key
}
// 内容类型
// ItemType 获取内容类型
func (this *FileWriter) ItemType() ItemType {
return ItemTypeFile
}

View File

@@ -3,6 +3,7 @@ package caches
import (
"github.com/cespare/xxhash"
"sync"
"time"
)
type MemoryWriter struct {
@@ -14,52 +15,59 @@ type MemoryWriter struct {
bodySize int64
status int
hash uint64
item *MemoryItem
hash uint64
item *MemoryItem
endFunc func()
}
func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, status int, locker *sync.RWMutex) *MemoryWriter {
func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, status int, locker *sync.RWMutex, endFunc func()) *MemoryWriter {
w := &MemoryWriter{
m: m,
key: key,
expiredAt: expiredAt,
locker: locker,
item: &MemoryItem{
ExpiredAt: expiredAt,
Status: status,
ExpiredAt: expiredAt,
ModifiedAt: time.Now().Unix(),
Status: status,
},
status: status,
status: status,
endFunc: endFunc,
}
w.hash = w.calculateHash(key)
return w
}
// 写入数据
// WriteHeader 写入数据
func (this *MemoryWriter) WriteHeader(data []byte) (n int, err error) {
this.headerSize += int64(len(data))
this.item.HeaderValue = append(this.item.HeaderValue, data...)
return len(data), nil
}
// 写入数据
// Write 写入数据
func (this *MemoryWriter) Write(data []byte) (n int, err error) {
this.bodySize += int64(len(data))
this.item.BodyValue = append(this.item.BodyValue, data...)
return len(data), nil
}
// 数据尺寸
// HeaderSize 数据尺寸
func (this *MemoryWriter) HeaderSize() int64 {
return this.headerSize
}
// BodySize 主体内容尺寸
func (this *MemoryWriter) BodySize() int64 {
return this.bodySize
}
// 关闭
// Close 关闭
func (this *MemoryWriter) Close() error {
// 需要在Locker之外
defer this.endFunc()
if this.item == nil {
return nil
}
@@ -72,25 +80,28 @@ func (this *MemoryWriter) Close() error {
return nil
}
// 丢弃
// Discard 丢弃
func (this *MemoryWriter) Discard() error {
// 需要在Locker之外
defer this.endFunc()
this.locker.Lock()
delete(this.m, this.hash)
this.locker.Unlock()
return nil
}
// Key
// Key 获取Key
func (this *MemoryWriter) Key() string {
return this.key
}
// 过期时间
// ExpiredAt 过期时间
func (this *MemoryWriter) ExpiredAt() int64 {
return this.expiredAt
}
// 内容类型
// ItemType 内容类型
func (this *MemoryWriter) ItemType() ItemType {
return ItemTypeMemory
}

View File

@@ -1,7 +1,7 @@
package teaconst
const (
Version = "0.1.0"
Version = "0.2.0"
ProductName = "Edge Node"
ProcessName = "edge-node"

View File

@@ -47,7 +47,7 @@ func (this *IPListManager) Start() {
// 第一次读取
err := this.loop()
if err != nil {
remotelogs.Println("IP_LIST_MANAGER", err.Error())
remotelogs.Error("IP_LIST_MANAGER", err.Error())
}
ticker := time.NewTicker(60 * time.Second)
@@ -64,7 +64,7 @@ func (this *IPListManager) Start() {
if err != nil {
countErrors++
remotelogs.Println("IP_LIST_MANAGER", err.Error())
remotelogs.Error("IP_LIST_MANAGER", err.Error())
// 连续错误小于3次的我们立即重试
if countErrors <= 3 {

View File

@@ -1,6 +1,7 @@
package nodes
import (
"context"
"encoding/json"
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs"
@@ -13,6 +14,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/logs"
"io"
"net/http"
"os/exec"
@@ -55,10 +57,16 @@ func (this *APIStream) loop() error {
return errors.Wrap(err)
}
isQuiting := false
ctx, cancelFunc := context.WithCancel(rpcClient.Context())
nodeStream, err := rpcClient.NodeRPC().NodeStream(ctx)
events.On(events.EventQuit, func() {
isQuiting = true
remotelogs.Println("API_STREAM", "quiting")
if nodeStream != nil {
cancelFunc()
}
})
nodeStream, err := rpcClient.NodeRPC().NodeStream(rpcClient.Context())
if err != nil {
if isQuiting {
return nil
@@ -69,12 +77,14 @@ func (this *APIStream) loop() error {
for {
if isQuiting {
logs.Println("API_STREAM", "quit")
break
}
message, err := nodeStream.Recv()
if err != nil {
if isQuiting {
remotelogs.Println("API_STREAM", "quit")
return nil
}
return errors.Wrap(err)

View File

@@ -77,7 +77,7 @@ Loop:
return err
}
_, err = client.HTTPAccessLogRPC().CreateHTTPAccessLogs(client.Context(), &pb.CreateHTTPAccessLogsRequest{AccessLogs: accessLogs})
_, err = client.HTTPAccessLogRPC().CreateHTTPAccessLogs(client.Context(), &pb.CreateHTTPAccessLogsRequest{HttpAccessLogs: accessLogs})
if err != nil {
return err
}

View File

@@ -1,6 +1,7 @@
package nodes
import (
"context"
"errors"
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
@@ -9,6 +10,8 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/stats"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/types"
"golang.org/x/net/http2"
"io"
"net"
"net/http"
"net/url"
@@ -59,6 +62,7 @@ type HTTPRequest struct {
rewriteIsExternalURL bool // 重写目标是否为外部URL
cacheRef *serverconfigs.HTTPCacheRef // 缓存设置
cacheKey string // 缓存使用的Key
isCached bool // 是否已经被缓存
// WAF相关
firewallPolicyId int64
@@ -229,7 +233,11 @@ func (this *HTTPRequest) doEnd() {
// 流量统计
// TODO 增加是否开启开关
if this.Server != nil {
stats.SharedTrafficStatManager.Add(this.Server.Id, this.writer.sentBodyBytes)
if this.isCached {
stats.SharedTrafficStatManager.Add(this.Server.Id, this.writer.sentBodyBytes, this.writer.sentBodyBytes, 1, 1)
} else {
stats.SharedTrafficStatManager.Add(this.Server.Id, this.writer.sentBodyBytes, 0, 1, 0)
}
}
}
@@ -481,7 +489,7 @@ func (this *HTTPRequest) Format(source string) string {
return this.rawURI
case "requestPath":
return this.requestPath()
case "requestPathExtension": // TODO 需要添加到文档中
case "requestPathExtension":
return filepath.Ext(this.requestPath())
case "requestLength":
return strconv.FormatInt(this.requestLength(), 10)
@@ -576,7 +584,6 @@ func (this *HTTPRequest) Format(source string) string {
}
// response.
// TODO 需要在文档中添加说明
if prefix == "response" {
switch suffix {
case "contentType":
@@ -1133,3 +1140,30 @@ func (this *HTTPRequest) bytePool(contentLength int64) *utils.BytePool {
}
return bytePool128k
}
// 检查是否可以忽略错误
func (this *HTTPRequest) canIgnore(err error) bool {
if err == nil {
return true
}
// 网络错误
_, ok := err.(*net.OpError)
if ok {
return true
}
// 客户端主动取消
if err == context.Canceled || err == io.ErrShortWrite {
return true
}
// HTTP/2流错误
{
_, ok := err.(http2.StreamError)
if ok {
return true
}
}
return false
}

View File

@@ -8,11 +8,17 @@ import (
"github.com/iwind/TeaGo/logs"
"net/http"
"strconv"
"time"
)
// 读取缓存
func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
if this.web.Cache == nil || !this.web.Cache.IsOn || len(this.web.Cache.CacheRefs) == 0 {
cachePolicy := sharedNodeConfig.HTTPCachePolicy
if cachePolicy == nil || !cachePolicy.IsOn {
return
}
if this.web.Cache == nil || !this.web.Cache.IsOn || (len(cachePolicy.CacheRefs) == 0 && len(this.web.Cache.CacheRefs) == 0) {
return
}
var addStatusHeader = this.web.Cache.AddStatusHeader
@@ -25,12 +31,8 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
}()
}
cachePolicy := sharedNodeConfig.HTTPCachePolicy
if cachePolicy == nil || !cachePolicy.IsOn {
return
}
// 检查条件
// 检查服务独立的缓存条件
refType := ""
for _, cacheRef := range this.web.Cache.CacheRefs {
if !cacheRef.IsOn ||
cacheRef.Conds == nil ||
@@ -38,12 +40,35 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
continue
}
if cacheRef.Conds.MatchRequest(this.Format) {
if cacheRef.IsReverse {
return
}
this.cacheRef = cacheRef
refType = "server"
break
}
}
if this.cacheRef == nil {
return
// 检查策略默认的缓存条件
for _, cacheRef := range cachePolicy.CacheRefs {
if !cacheRef.IsOn ||
cacheRef.Conds == nil ||
!cacheRef.Conds.HasRequestConds() {
continue
}
if cacheRef.Conds.MatchRequest(this.Format) {
if cacheRef.IsReverse {
return
}
this.cacheRef = cacheRef
refType = "policy"
break
}
}
if this.cacheRef == nil {
return
}
}
// 相关变量
@@ -89,7 +114,9 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
return
}
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
}
return
}
defer func() {
@@ -121,13 +148,56 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
return true, nil
})
if err != nil {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
}
return
}
if addStatusHeader {
this.writer.Header().Set("X-Cache", "HIT")
this.writer.Header().Set("X-Cache", "HIT, "+refType+", "+reader.TypeName())
}
// ETag
var respHeader = this.writer.Header()
var eTag = respHeader.Get("ETag")
var lastModifiedAt = reader.LastModified()
if len(eTag) == 0 {
if lastModifiedAt > 0 {
eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "\""
respHeader["ETag"] = []string{eTag}
}
}
// 支持 Last-Modified
var modifiedTime = respHeader.Get("Last-Modified")
if len(modifiedTime) == 0 {
if lastModifiedAt > 0 {
modifiedTime = time.Unix(lastModifiedAt, 0).Format("Mon, 02 Jan 2006 15:04:05 GMT")
if len(respHeader.Get("Last-Modified")) == 0 {
respHeader.Set("Last-Modified", modifiedTime)
}
}
}
// 支持 If-None-Match
if len(eTag) > 0 && this.requestHeader("If-None-Match") == eTag {
// 自定义Header
this.processResponseHeaders(http.StatusNotModified)
this.writer.WriteHeader(http.StatusNotModified)
this.cacheRef = nil
return true
}
// 支持 If-Modified-Since
if len(modifiedTime) > 0 && this.requestHeader("If-Modified-Since") == modifiedTime {
// 自定义Header
this.processResponseHeaders(http.StatusNotModified)
this.writer.WriteHeader(http.StatusNotModified)
this.cacheRef = nil
return true
}
this.processResponseHeaders(reader.Status())
// 输出Body
@@ -211,7 +281,9 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
this.writer.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return true
}
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
}
return
}
} else if len(rangeSet) > 1 {
@@ -252,7 +324,9 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
return true, err
})
if err != nil {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
}
return true
}
}
@@ -273,12 +347,15 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
return true, nil
})
if err != nil {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
}
return
}
}
}
this.cacheRef = nil // 终止读取不再往下传递
this.isCached = true
this.cacheRef = nil
return true
}

View File

@@ -12,7 +12,7 @@ import (
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
"github.com/iwind/gofcgi/pkg"
"github.com/iwind/gofcgi/pkg/fcgi"
"io"
"net"
"net/url"
@@ -78,7 +78,7 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
poolSize = 32
}
client, err := pkg.SharedPool(fastcgi.Network(), fastcgi.RealAddress(), uint(poolSize)).Client()
client, err := fcgi.SharedPool(fastcgi.Network(), fastcgi.RealAddress(), uint(poolSize)).Client()
if err != nil {
this.write500(err)
return
@@ -151,7 +151,7 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
params["HTTP_HOST"] = this.Host
}
fcgiReq := pkg.NewRequest()
fcgiReq := fcgi.NewRequest()
fcgiReq.SetTimeout(fastcgi.ReadTimeoutDuration())
fcgiReq.SetParams(params)
fcgiReq.SetBody(this.RawReq.Body, uint32(this.requestLength()))
@@ -189,7 +189,7 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
this.processResponseHeaders(resp.StatusCode)
// 准备
this.writer.Prepare(resp.ContentLength)
this.writer.Prepare(resp.ContentLength, resp.StatusCode)
// 设置响应代码
this.writer.WriteHeader(resp.StatusCode)
@@ -200,14 +200,20 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
_, err = io.CopyBuffer(this.writer, resp.Body, buf)
pool.Put(buf)
err1 := resp.Body.Close()
if err1 != nil {
remotelogs.Error("REQUEST_REVERSE_PROXY", err1.Error())
closeErr := resp.Body.Close()
if closeErr != nil {
remotelogs.Warn("HTTP_REQUEST_FASTCGI", closeErr.Error())
}
if err != nil && err != io.EOF {
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
remotelogs.Warn("HTTP_REQUEST_FASTCGI", err.Error())
this.addError(err)
}
// 是否成功结束
if err == nil && closeErr == nil {
this.writer.SetOk()
}
return
}

View File

@@ -2,6 +2,7 @@ package nodes
import (
"net/http"
"strconv"
"strings"
)
@@ -12,7 +13,10 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
if !u.IsOn {
continue
}
if u.MatchPrefix {
if !u.MatchRequest(this.Format) {
continue
}
if u.MatchPrefix { // 匹配前缀
if strings.HasPrefix(fullURL, u.BeforeURL) {
afterURL := u.AfterURL
if u.KeepRequestURI {
@@ -25,7 +29,39 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
}
return true
}
} else {
} else if u.MatchRegexp { // 正则匹配
reg := u.BeforeURLRegexp()
if reg == nil {
continue
}
matches := reg.FindStringSubmatch(fullURL)
if len(matches) == 0 {
continue
}
afterURL := u.AfterURL
for i, match := range matches {
afterURL = strings.ReplaceAll(afterURL, "${"+strconv.Itoa(i)+"}", match)
}
subNames := reg.SubexpNames()
if len(subNames) > 0 {
for _, subName := range subNames {
if len(subName) > 0 {
index := reg.SubexpIndex(subName)
if index > -1 {
afterURL = strings.ReplaceAll(afterURL, "${"+subName+"}", matches[index])
}
}
}
}
if u.Status <= 0 {
http.Redirect(this.RawWriter, this.RawReq, afterURL, http.StatusTemporaryRedirect)
} else {
http.Redirect(this.RawWriter, this.RawReq, afterURL, u.Status)
}
return true
} else { // 精准匹配
if fullURL == u.RealBeforeURL() {
if u.Status <= 0 {
http.Redirect(this.RawWriter, this.RawReq, u.AfterURL, http.StatusTemporaryRedirect)

View File

@@ -1,6 +1,7 @@
package nodes
import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs"
"io"
@@ -50,7 +51,11 @@ func (this *HTTPRequest) doPage(status int) (shouldStop bool) {
_, err = io.CopyBuffer(this.writer, fp, buf)
bytePool1k.Put(buf)
if err != nil {
logs.Error(err)
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_PAGE", "write to client failed: "+err.Error())
}
} else {
this.writer.SetOk()
}
err = fp.Close()
if err != nil {

View File

@@ -19,11 +19,10 @@ func (this *HTTPRequest) doRedirectToHTTPS(redirectToHTTPSConfig *serverconfigs.
} else if redirectToHTTPSConfig.Port > 0 {
lastIndex := strings.LastIndex(host, ":")
if lastIndex > 0 {
if redirectToHTTPSConfig.Port != 443 {
host = host[:lastIndex] + ":" + strconv.Itoa(redirectToHTTPSConfig.Port)
} else {
host = host[:lastIndex]
}
host = host[:lastIndex]
}
if redirectToHTTPSConfig.Port != 443 {
host = host + ":" + strconv.Itoa(redirectToHTTPSConfig.Port)
}
} else {
lastIndex := strings.LastIndex(host, ":")

View File

@@ -35,7 +35,7 @@ func (this *HTTPRequest) doReverseProxy() {
origin := this.reverseProxy.NextOrigin(requestCall)
if origin == nil {
err := errors.New(this.requestPath() + ": no available backends for reverse proxy")
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
remotelogs.Error("HTTP_REQUEST_REVERSE_PROXY", err.Error())
this.write502(err)
return
}
@@ -55,7 +55,7 @@ func (this *HTTPRequest) doReverseProxy() {
// 处理Scheme
if origin.Addr == nil {
err := errors.New(this.requestPath() + ": origin '" + strconv.FormatInt(origin.Id, 10) + "' does not has a address")
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
remotelogs.Error("HTTP_REQUEST_REVERSE_PROXY", err.Error())
this.write502(err)
return
}
@@ -142,7 +142,7 @@ func (this *HTTPRequest) doReverseProxy() {
// 获取请求客户端
client, err := SharedHTTPClientPool.Client(this.RawReq, origin, originAddr)
if err != nil {
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
remotelogs.Error("HTTP_REQUEST_REVERSE_PROXY", err.Error())
this.write502(err)
return
}
@@ -162,7 +162,7 @@ func (this *HTTPRequest) doReverseProxy() {
// TODO 如果超过最大失败次数,则下线
this.write502(err)
remotelogs.Println("REQUEST_REVERSE_PROXY", this.RawReq.URL.String()+"': "+err.Error())
remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", this.RawReq.URL.String()+"': "+err.Error())
} else {
// 是否为客户端方面的错误
isClientError := false
@@ -189,7 +189,7 @@ func (this *HTTPRequest) doReverseProxy() {
if this.doWAFResponse(resp) {
err = resp.Body.Close()
if err != nil {
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", err.Error())
}
return
}
@@ -201,7 +201,7 @@ func (this *HTTPRequest) doReverseProxy() {
if len(this.web.Pages) > 0 && this.doPage(resp.StatusCode) {
err = resp.Body.Close()
if err != nil {
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", err.Error())
}
return
}
@@ -226,7 +226,7 @@ func (this *HTTPRequest) doReverseProxy() {
shouldAutoFlush := this.reverseProxy.AutoFlush || this.RawReq.Header.Get("Accept") == "text/event-stream"
// 准备
this.writer.Prepare(resp.ContentLength)
this.writer.Prepare(resp.ContentLength, resp.StatusCode)
// 设置响应代码
this.writer.WriteHeader(resp.StatusCode)
@@ -254,13 +254,22 @@ func (this *HTTPRequest) doReverseProxy() {
}
pool.Put(buf)
err1 := resp.Body.Close()
if err1 != nil {
remotelogs.Error("REQUEST_REVERSE_PROXY", err1.Error())
closeErr := resp.Body.Close()
if closeErr != nil {
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", closeErr.Error())
}
}
if err != nil && err != io.EOF {
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
this.addError(err)
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", err.Error())
this.addError(err)
}
}
// 是否成功结束
if err == nil && closeErr == nil {
this.writer.SetOk()
}
}

View File

@@ -296,7 +296,7 @@ func (this *HTTPRequest) doRoot() (isBreak bool) {
this.cacheRef = nil // 不支持缓存
}
this.writer.Prepare(fileSize)
this.writer.Prepare(fileSize, http.StatusOK)
pool := this.bytePool(fileSize)
buf := pool.Get()
@@ -382,6 +382,9 @@ func (this *HTTPRequest) doRoot() (isBreak bool) {
}
}
// 设置成功
this.writer.SetOk()
return true
}

View File

@@ -1,6 +1,7 @@
package nodes
import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs"
"io"
@@ -65,8 +66,16 @@ func (this *HTTPRequest) doShutdown() {
buf := bytePool1k.Get()
_, err = io.CopyBuffer(this.writer, fp, buf)
bytePool1k.Put(buf)
if err != nil {
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_SHUTDOWN", "write to client failed: "+err.Error())
}
} else {
this.writer.SetOk()
}
err = fp.Close()
if err != nil {
logs.Error(err)
remotelogs.Warn("HTTP_REQUEST_SHUTDOWN", "close file failed: "+err.Error())
}
}

View File

@@ -2,6 +2,7 @@ package nodes
import (
"errors"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/logs"
"io"
@@ -50,7 +51,11 @@ func (this *HTTPRequest) doURL(method string, url string, host string, statusCod
}
this.writer.AddHeaders(resp.Header)
this.writer.Prepare(resp.ContentLength)
if statusCode <= 0 {
this.writer.Prepare(resp.ContentLength, resp.StatusCode)
} else {
this.writer.Prepare(resp.ContentLength, statusCode)
}
// 设置响应代码
if statusCode <= 0 {
@@ -64,4 +69,12 @@ func (this *HTTPRequest) doURL(method string, url string, host string, statusCod
buf := pool.Get()
_, err = io.CopyBuffer(this.writer, resp.Body, buf)
pool.Put(buf)
if err != nil {
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_URL", "write to client failed: "+err.Error())
}
} else {
this.writer.SetOk()
}
}

View File

@@ -70,7 +70,7 @@ func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFir
for _, action := range actions {
goNext, err := action.DoHTTP(this.RawReq, this.RawWriter)
if err != nil {
remotelogs.Error("REQUEST", "do action '"+err.Error()+"' failed: "+err.Error())
remotelogs.Error("HTTP_REQUEST_WAF", "do action '"+err.Error()+"' failed: "+err.Error())
return true, false
}
if !goNext {
@@ -101,7 +101,7 @@ func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFir
for _, remoteAddr := range remoteAddrs {
result, err := iplibrary.SharedLibrary.Lookup(remoteAddr)
if err != nil {
remotelogs.Error("REQUEST", "iplibrary lookup failed: "+err.Error())
remotelogs.Error("HTTP_REQUEST_WAF", "iplibrary lookup failed: "+err.Error())
} else if result != nil {
// 检查国家级别封禁
if len(regionConfig.DenyCountryIds) > 0 && len(result.Country) > 0 {
@@ -147,7 +147,7 @@ func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFir
}
goNext, ruleGroup, ruleSet, err := w.MatchRequest(this.RawReq, this.writer)
if err != nil {
remotelogs.Error("REQUEST", this.rawURI+": "+err.Error())
remotelogs.Error("HTTP_REQUEST_WAF", this.rawURI+": "+err.Error())
return
}
@@ -181,7 +181,7 @@ func (this *HTTPRequest) doWAFResponse(resp *http.Response) (blocked bool) {
goNext, ruleGroup, ruleSet, err := w.MatchResponse(this.RawReq, resp, this.writer)
if err != nil {
remotelogs.Error("REQUEST", this.rawURI+": "+err.Error())
remotelogs.Error("HTTP_REQUEST_WAF", this.rawURI+": "+err.Error())
return
}

View File

@@ -14,7 +14,7 @@ import (
"strings"
)
// 响应Writer
// HTTPWriter 响应Writer
type HTTPWriter struct {
req *HTTPRequest
writer http.ResponseWriter
@@ -32,9 +32,11 @@ type HTTPWriter struct {
cacheWriter caches.Writer // 缓存写入
cacheStorage caches.StorageInterface
isOk bool // 是否完全成功
}
// 包装对象
// NewHTTPWriter 包装对象
func NewHTTPWriter(req *HTTPRequest, httpResponseWriter http.ResponseWriter) *HTTPWriter {
return &HTTPWriter{
req: req,
@@ -42,7 +44,7 @@ func NewHTTPWriter(req *HTTPRequest, httpResponseWriter http.ResponseWriter) *HT
}
}
// 重置
// Reset 重置
func (this *HTTPWriter) Reset(httpResponseWriter http.ResponseWriter) {
this.writer = httpResponseWriter
@@ -58,23 +60,25 @@ func (this *HTTPWriter) Reset(httpResponseWriter http.ResponseWriter) {
this.gzipBodyWriter = nil
}
// 设置Gzip
// Gzip 设置Gzip
func (this *HTTPWriter) Gzip(config *serverconfigs.HTTPGzipConfig) {
this.gzipConfig = config
}
// 准备输出
func (this *HTTPWriter) Prepare(size int64) {
// Prepare 准备输出
func (this *HTTPWriter) Prepare(size int64, status int) {
this.statusCode = status
this.prepareGzip(size)
this.prepareCache(size)
}
// 包装前的原始的Writer
// Raw 包装前的原始的Writer
func (this *HTTPWriter) Raw() http.ResponseWriter {
return this.writer
}
// 获取Header
// Header 获取Header
func (this *HTTPWriter) Header() http.Header {
if this.writer == nil {
return http.Header{}
@@ -82,7 +86,7 @@ func (this *HTTPWriter) Header() http.Header {
return this.writer.Header()
}
// 添加一组Header
// AddHeaders 添加一组Header
func (this *HTTPWriter) AddHeaders(header http.Header) {
if this.writer == nil {
return
@@ -97,7 +101,7 @@ func (this *HTTPWriter) AddHeaders(header http.Header) {
}
}
// 写入数据
// Write 写入数据
func (this *HTTPWriter) Write(data []byte) (n int, err error) {
if this.writer != nil {
if this.gzipWriter != nil {
@@ -113,8 +117,9 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) {
if this.cacheWriter != nil {
_, err = this.cacheWriter.Write(data)
if err != nil {
_ = this.cacheWriter.Discard()
this.cacheWriter = nil
remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error())
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
}
}
} else {
@@ -126,7 +131,7 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) {
if this.gzipBodyWriter != nil {
_, err := this.gzipBodyWriter.Write(data)
if err != nil {
remotelogs.Error("REQUEST_WRITER", err.Error())
remotelogs.Error("HTTP_WRITER", err.Error())
}
} else {
this.body = append(this.body, data...)
@@ -135,17 +140,17 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) {
return
}
// 写入字符串
// WriteString 写入字符串
func (this *HTTPWriter) WriteString(s string) (n int, err error) {
return this.Write([]byte(s))
}
// 读取发送的字节数
// SentBodyBytes 读取发送的字节数
func (this *HTTPWriter) SentBodyBytes() int64 {
return this.sentBodyBytes
}
// 写入状态码
// WriteHeader 写入状态码
func (this *HTTPWriter) WriteHeader(statusCode int) {
if this.writer != nil {
this.writer.WriteHeader(statusCode)
@@ -153,7 +158,7 @@ func (this *HTTPWriter) WriteHeader(statusCode int) {
this.statusCode = statusCode
}
// 读取状态码
// StatusCode 读取状态码
func (this *HTTPWriter) StatusCode() int {
if this.statusCode == 0 {
return http.StatusOK
@@ -161,22 +166,22 @@ func (this *HTTPWriter) StatusCode() int {
return this.statusCode
}
// 设置拷贝Body数据
// SetBodyCopying 设置拷贝Body数据
func (this *HTTPWriter) SetBodyCopying(b bool) {
this.bodyCopying = b
}
// 判断是否在拷贝Body数据
// BodyIsCopying 判断是否在拷贝Body数据
func (this *HTTPWriter) BodyIsCopying() bool {
return this.bodyCopying
}
// 读取拷贝的Body数据
// Body 读取拷贝的Body数据
func (this *HTTPWriter) Body() []byte {
return this.body
}
// 读取Header二进制数据
// HeaderData 读取Header二进制数据
func (this *HTTPWriter) HeaderData() []byte {
if this.writer == nil {
return nil
@@ -198,7 +203,12 @@ func (this *HTTPWriter) HeaderData() []byte {
return writer.Bytes()
}
// 关闭
// SetOk 设置成功
func (this *HTTPWriter) SetOk() {
this.isOk = true
}
// Close 关闭
func (this *HTTPWriter) Close() {
// gzip writer
if this.gzipWriter != nil {
@@ -212,20 +222,26 @@ func (this *HTTPWriter) Close() {
// cache writer
if this.cacheWriter != nil {
err := this.cacheWriter.Close()
if err == nil {
this.cacheStorage.AddToList(&caches.Item{
Type: this.cacheWriter.ItemType(),
Key: this.cacheWriter.Key(),
ExpiredAt: this.cacheWriter.ExpiredAt(),
HeaderSize: this.cacheWriter.HeaderSize(),
BodySize: this.cacheWriter.BodySize(),
})
if this.isOk {
err := this.cacheWriter.Close()
if err == nil {
this.cacheStorage.AddToList(&caches.Item{
Type: this.cacheWriter.ItemType(),
Key: this.cacheWriter.Key(),
ExpiredAt: this.cacheWriter.ExpiredAt(),
HeaderSize: this.cacheWriter.HeaderSize(),
BodySize: this.cacheWriter.BodySize(),
Host: this.req.Host,
ServerId: this.req.Server.Id,
})
}
} else {
_ = this.cacheWriter.Discard()
}
}
}
// Hijack
// Hijack Hijack
func (this *HTTPWriter) Hijack() (conn net.Conn, buf *bufio.ReadWriter, err error) {
hijack, ok := this.writer.(http.Hijacker)
if ok {
@@ -234,7 +250,7 @@ func (this *HTTPWriter) Hijack() (conn net.Conn, buf *bufio.ReadWriter, err erro
return
}
// Flush
// Flush Flush
func (this *HTTPWriter) Flush() {
flusher, ok := this.writer.(http.Flusher)
if ok {
@@ -282,7 +298,7 @@ func (this *HTTPWriter) prepareGzip(size int64) {
var err error = nil
this.gzipWriter, err = gzip.NewWriterLevel(this.writer, int(this.gzipConfig.Level))
if err != nil {
remotelogs.Error("REQUEST_WRITER", err.Error())
remotelogs.Error("HTTP_WRITER", err.Error())
return
}
@@ -291,7 +307,7 @@ func (this *HTTPWriter) prepareGzip(size int64) {
this.gzipBodyBuffer = bytes.NewBuffer([]byte{})
this.gzipBodyWriter, err = gzip.NewWriterLevel(this.gzipBodyBuffer, int(this.gzipConfig.Level))
if err != nil {
remotelogs.Error("REQUEST_WRITER", err.Error())
remotelogs.Error("HTTP_WRITER", err.Error())
}
}
@@ -373,8 +389,8 @@ func (this *HTTPWriter) prepareCache(size int64) {
expiredAt := utils.UnixTime() + life
cacheWriter, err := storage.OpenWriter(this.req.cacheKey, expiredAt, this.StatusCode())
if err != nil {
if err != caches.ErrFileIsWriting {
remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error())
if !caches.CanIgnoreErr(err) {
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
}
return
}
@@ -388,7 +404,8 @@ func (this *HTTPWriter) prepareCache(size int64) {
for _, v1 := range v {
_, err = cacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n"))
if err != nil {
remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error())
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
_ = this.cacheWriter.Discard()
this.cacheWriter = nil
return
}

View File

@@ -43,8 +43,19 @@ func (this *Listener) Listen() error {
return nil
}
protocol := this.group.Protocol()
if protocol.IsUDPFamily() {
return this.listenUDP()
}
return this.listenTCP()
}
netListener, err := this.createListener()
func (this *Listener) listenTCP() error {
if this.group == nil {
return nil
}
protocol := this.group.Protocol()
netListener, err := this.createTCPListener()
if err != nil {
return err
}
@@ -80,11 +91,6 @@ func (this *Listener) Listen() error {
BaseListener: BaseListener{Group: this.group},
Listener: netListener,
}
case serverconfigs.ProtocolUDP:
this.listener = &UDPListener{
BaseListener: BaseListener{Group: this.group},
Listener: netListener,
}
default:
return errors.New("unknown protocol '" + protocol.String() + "'")
}
@@ -108,6 +114,31 @@ func (this *Listener) Listen() error {
return nil
}
func (this *Listener) listenUDP() error {
listener, err := this.createUDPListener()
if err != nil {
return err
}
events.On(events.EventQuit, func() {
remotelogs.Println("LISTENER", "quit "+this.group.FullAddr())
_ = listener.Close()
})
this.listener = &UDPListener{
BaseListener: BaseListener{Group: this.group},
Listener: listener,
}
go func() {
err := this.listener.Serve()
if err != nil {
remotelogs.Error("LISTENER", err.Error())
}
}()
return nil
}
func (this *Listener) Close() error {
if this.listener == nil {
return nil
@@ -115,8 +146,8 @@ func (this *Listener) Close() error {
return this.listener.Close()
}
// 创建监听器
func (this *Listener) createListener() (net.Listener, error) {
// 创建TCP监听器
func (this *Listener) createTCPListener() (net.Listener, error) {
listenConfig := net.ListenConfig{
Control: nil,
KeepAlive: 0,
@@ -131,3 +162,13 @@ func (this *Listener) createListener() (net.Listener, error) {
return listenConfig.Listen(context.Background(), "tcp", this.group.Addr())
}
// 创建UDP监听器
func (this *Listener) createUDPListener() (*net.UDPConn, error) {
// TODO 将来支持udp4/udp6
addr, err := net.ResolveUDPAddr("udp", this.group.Addr())
if err != nil {
return nil, err
}
return net.ListenUDP("udp", addr)
}

View File

@@ -4,6 +4,8 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"golang.org/x/net/http2"
"io"
"log"
"net"
"net/http"
"strings"
@@ -11,6 +13,8 @@ import (
"time"
)
var httpErrorLogger = log.New(io.Discard, "", 0)
type HTTPListener struct {
BaseListener
@@ -37,6 +41,7 @@ func (this *HTTPListener) Serve() error {
Handler: handler,
ReadHeaderTimeout: 3 * time.Second, // TODO 改成可以配置
IdleTimeout: 2 * time.Minute, // TODO 改成可以配置
ErrorLog: httpErrorLogger,
ConnState: func(conn net.Conn, state http.ConnState) {
switch state {
case http.StateNew:

View File

@@ -2,20 +2,20 @@ package nodes
import "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
// 各协议监听器的接口
// ListenerInterface 各协议监听器的接口
type ListenerInterface interface {
// 初始化
// Init 初始化
Init()
// 监听
// Serve 监听
Serve() error
// 关闭
// Close 关闭
Close() error
// 重载配置
// Reload 重载配置
Reload(serverGroup *serverconfigs.ServerGroup)
// 获取当前活跃的连接数
// CountActiveListeners 获取当前活跃的连接数
CountActiveListeners() int
}

View File

@@ -4,33 +4,56 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/lists"
"net/url"
"regexp"
"sync"
"time"
)
var sharedListenerManager = NewListenerManager()
// 端口监听管理器
// ListenerManager 端口监听管理器
type ListenerManager struct {
listenersMap map[string]*Listener // addr => *Listener
locker sync.Mutex
lastConfig *nodeconfigs.NodeConfig
retryListenerMap map[string]*Listener // 需要重试的监听器 addr => Listener
ticker *time.Ticker
}
// 获取新对象
// NewListenerManager 获取新对象
func NewListenerManager() *ListenerManager {
return &ListenerManager{
listenersMap: map[string]*Listener{},
manager := &ListenerManager{
listenersMap: map[string]*Listener{},
retryListenerMap: map[string]*Listener{},
ticker: time.NewTicker(1 * time.Minute),
}
// 提升测试效率
if Tea.IsTesting() {
manager.ticker = time.NewTicker(5 * time.Second)
}
go func() {
for range manager.ticker.C {
manager.retryListeners()
}
}()
return manager
}
// 启动监听
// Start 启动监听
func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
this.locker.Lock()
defer this.locker.Unlock()
// 重置数据
this.retryListenerMap = map[string]*Listener{}
// 检查是否有变化
/**if this.lastConfig != nil && this.lastConfig.Version == node.Version {
return nil
@@ -83,7 +106,16 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
listener.Reload(group)
err := listener.Listen()
if err != nil {
remotelogs.Error("LISTENER_MANAGER", err.Error())
// 放入到重试队列中
this.retryListenerMap[addr] = listener
firstServer := group.FirstServer()
if firstServer == nil {
remotelogs.Error("LISTENER_MANAGER", err.Error())
} else {
remotelogs.ServerError(firstServer.Id, "LISTENER_MANAGER", err.Error())
}
continue
}
this.listenersMap[addr] = listener
@@ -93,7 +125,7 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
return nil
}
// 获取总的活跃连接数
// TotalActiveConnections 获取总的活跃连接数
func (this *ListenerManager) TotalActiveConnections() int {
this.locker.Lock()
defer this.locker.Unlock()
@@ -116,3 +148,18 @@ func (this *ListenerManager) prettyAddress(addr string) string {
}
return u.String()
}
// 重试失败的Listener
func (this *ListenerManager) retryListeners() {
this.locker.Lock()
defer this.locker.Unlock()
for addr, listener := range this.retryListenerMap {
err := listener.Listen()
if err == nil {
delete(this.retryListenerMap, addr)
this.listenersMap[addr] = listener
remotelogs.ServerSuccess(listener.group.FirstServer().Id, "LISTENER_MANAGER", "retry to listen '"+addr+"' successfully")
}
}
}

View File

@@ -80,7 +80,7 @@ func (this *TCPListener) handleConn(conn net.Conn) error {
}
// 记录流量
stats.SharedTrafficStatManager.Add(firstServer.Id, int64(n))
stats.SharedTrafficStatManager.Add(firstServer.Id, int64(n), 0, 0, 0)
}
if err != nil {
closer()

View File

@@ -1,28 +1,201 @@
package nodes
import (
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/stats"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"net"
"sync"
"time"
)
type UDPListener struct {
BaseListener
Listener net.Listener
Listener *net.UDPConn
connMap map[string]*UDPConn
connLocker sync.Mutex
connTicker *utils.Ticker
}
func (this *UDPListener) Serve() error {
// TODO
// TODO 注意管理 CountActiveConnections
return nil
firstServer := this.Group.FirstServer()
if firstServer == nil {
return errors.New("no server available")
}
if firstServer.ReverseProxy == nil {
return errors.New("no ReverseProxy configured for the server")
}
this.connMap = map[string]*UDPConn{}
this.connTicker = utils.NewTicker(1 * time.Minute)
go func() {
for this.connTicker.Next() {
this.gcConns()
}
}()
var buffer = make([]byte, 4*1024)
for {
n, addr, _ := this.Listener.ReadFrom(buffer)
if n > 0 {
this.connLocker.Lock()
conn, ok := this.connMap[addr.String()]
this.connLocker.Unlock()
if ok && !conn.IsOk() {
_ = conn.Close()
ok = false
}
if !ok {
originConn, err := this.connectOrigin(firstServer.ReverseProxy, "")
if err != nil {
remotelogs.Error("UDP_LISTENER", "unable to connect to origin server: "+err.Error())
continue
}
if originConn == nil {
remotelogs.Error("UDP_LISTENER", "unable to find a origin server")
continue
}
conn = NewUDPConn(firstServer.Id, addr, this.Listener, originConn.(*net.UDPConn))
this.connLocker.Lock()
this.connMap[addr.String()] = conn
this.connLocker.Unlock()
}
_, _ = conn.Write(buffer[:n])
}
}
}
func (this *UDPListener) Close() error {
// TODO
return nil
if this.connTicker != nil {
this.connTicker.Stop()
}
// 关闭所有连接
this.connLocker.Lock()
for _, conn := range this.connMap {
_ = conn.Close()
}
this.connLocker.Unlock()
return this.Listener.Close()
}
func (this *UDPListener) Reload(group *serverconfigs.ServerGroup) {
this.Group = group
this.Reset()
}
func (this *UDPListener) connectOrigin(reverseProxy *serverconfigs.ReverseProxyConfig, remoteAddr string) (conn net.Conn, err error) {
if reverseProxy == nil {
return nil, errors.New("no reverse proxy config")
}
retries := 3
for i := 0; i < retries; i++ {
origin := reverseProxy.NextOrigin(nil)
if origin == nil {
continue
}
conn, err = OriginConnect(origin, remoteAddr)
if err != nil {
remotelogs.Error("UDP_LISTENER", "unable to connect origin: "+origin.Addr.Host+":"+origin.Addr.PortRange+": "+err.Error())
continue
} else {
return
}
}
err = errors.New("no origin can be used")
return
}
// 回收连接
func (this *UDPListener) gcConns() {
this.connLocker.Lock()
closingConns := []*UDPConn{}
for addr, conn := range this.connMap {
if !conn.IsOk() {
closingConns = append(closingConns, conn)
delete(this.connMap, addr)
}
}
this.connLocker.Unlock()
for _, conn := range closingConns {
_ = conn.Close()
}
}
// UDPConn 自定义的UDP连接管理
type UDPConn struct {
addr net.Addr
proxyConn net.Conn
serverConn net.Conn
activatedAt int64
isOk bool
isClosed bool
}
func NewUDPConn(serverId int64, addr net.Addr, proxyConn *net.UDPConn, serverConn *net.UDPConn) *UDPConn {
conn := &UDPConn{
addr: addr,
proxyConn: proxyConn,
serverConn: serverConn,
activatedAt: time.Now().Unix(),
isOk: true,
}
go func() {
buffer := bytePool32k.Get()
defer func() {
bytePool32k.Put(buffer)
}()
for {
n, err := serverConn.Read(buffer)
if n > 0 {
conn.activatedAt = time.Now().Unix()
_, writingErr := proxyConn.WriteTo(buffer[:n], addr)
if writingErr != nil {
conn.isOk = false
break
}
// 记录流量
stats.SharedTrafficStatManager.Add(serverId, int64(n), 0, 0, 0)
}
if err != nil {
conn.isOk = false
break
}
}
}()
return conn
}
func (this *UDPConn) Write(b []byte) (n int, err error) {
this.activatedAt = time.Now().Unix()
n, err = this.serverConn.Write(b)
if err != nil {
this.isOk = false
}
return
}
func (this *UDPConn) Close() error {
this.isOk = false
if this.isClosed {
return nil
}
this.isClosed = true
return this.serverConn.Close()
}
func (this *UDPConn) IsOk() bool {
if !this.isOk {
return false
}
return time.Now().Unix()-this.activatedAt < 30 // 如果超过 N 秒没有活动我们认为是超时
}

View File

@@ -33,8 +33,10 @@ import (
var sharedNodeConfig *nodeconfigs.NodeConfig
var nodeTaskNotify = make(chan bool, 8)
var DaemonIsOn = false
var DaemonPid = 0
// 节点
// Node 节点
type Node struct {
isLoaded bool
}
@@ -43,7 +45,7 @@ func NewNode() *Node {
return &Node{}
}
// 检查配置
// Test 检查配置
func (this *Node) Test() error {
// 检查是否能连接API
rpcClient, err := rpc.SharedRPC()
@@ -58,8 +60,15 @@ func (this *Node) Test() error {
return nil
}
// 启动
// Start 启动
func (this *Node) Start() {
_, ok := os.LookupEnv("EdgeDaemon")
if ok {
remotelogs.Println("NODE", "start from daemon")
DaemonIsOn = true
DaemonPid = os.Getppid()
}
// 启动事件
events.Notify(events.EventStart)
@@ -146,7 +155,7 @@ func (this *Node) Start() {
select {}
}
// 实现守护进程
// Daemon 实现守护进程
func (this *Node) Daemon() {
path := os.TempDir() + "/edge-node.sock"
isDebug := lists.ContainsString(os.Args, "debug")
@@ -164,6 +173,10 @@ func (this *Node) Daemon() {
if err != nil {
return err
}
// 可以标记当前是从守护进程启动的
_ = os.Setenv("EdgeDaemon", "on")
cmd := exec.Command(exe)
err = cmd.Start()
if err != nil {
@@ -191,7 +204,7 @@ func (this *Node) Daemon() {
}
}
// 安装系统服务
// InstallSystemService 安装系统服务
func (this *Node) InstallSystemService() error {
shortName := teaconst.SystemdServiceName
@@ -285,6 +298,8 @@ func (this *Node) loop() error {
if err != nil {
return err
}
case "nodeVersionChanged":
go sharedUpgradeManager.Start()
}
}

View File

@@ -9,7 +9,7 @@ import (
"strconv"
)
// 连接源站
// OriginConnect 连接源站
func OriginConnect(origin *serverconfigs.OriginConfig, remoteAddr string) (net.Conn, error) {
if origin.Addr == nil {
return nil, errors.New("origin server address should not be empty")
@@ -70,9 +70,15 @@ func OriginConnect(origin *serverconfigs.OriginConfig, remoteAddr string) (net.C
return tls.Dial("tcp", origin.Addr.Host+":"+origin.Addr.PortRange, &tls.Config{
InsecureSkipVerify: true,
})
case serverconfigs.ProtocolUDP:
addr, err := net.ResolveUDPAddr("udp", origin.Addr.Host+":"+origin.Addr.PortRange)
if err != nil {
return nil, err
}
return net.DialUDP("udp", nil, addr)
}
// TODO 支持从Unix、Pipe、HTTP、HTTPS中读取数据
return nil, errors.New("invalid scheme '" + origin.Addr.Protocol.String() + "'")
return nil, errors.New("invalid origin scheme '" + origin.Addr.Protocol.String() + "'")
}

View File

@@ -0,0 +1,252 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nodes
import (
"crypto/md5"
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs"
stringutil "github.com/iwind/TeaGo/utils/string"
"os"
"os/exec"
"runtime"
"time"
)
var sharedUpgradeManager = NewUpgradeManager()
// UpgradeManager 节点升级管理器
// TODO 需要在集群中设置是否自动更新
type UpgradeManager struct {
isInstalling bool
lastFile string
}
// NewUpgradeManager 获取新对象
func NewUpgradeManager() *UpgradeManager {
return &UpgradeManager{}
}
// Start 启动升级
func (this *UpgradeManager) Start() {
// 测试环境下不更新
if Tea.IsTesting() {
return
}
if this.isInstalling {
return
}
this.isInstalling = true
// 还原安装状态
defer func() {
this.isInstalling = false
}()
remotelogs.Println("UPGRADE_MANAGER", "upgrading node ...")
err := this.install()
if err != nil {
remotelogs.Error("UPGRADE_MANAGER", "download failed: "+err.Error())
return
}
remotelogs.Println("UPGRADE_MANAGER", "upgrade successfully")
go func() {
err = this.restart()
if err != nil {
logs.Println("UPGRADE_MANAGER", err.Error())
}
}()
}
func (this *UpgradeManager) install() error {
// 检查是否有已下载但未安装成功的
if len(this.lastFile) > 0 {
_, err := os.Stat(this.lastFile)
if err == nil {
err = this.unzip(this.lastFile)
if err != nil {
return err
}
this.lastFile = ""
return nil
}
}
// 创建临时文件
dir := Tea.Root + "/tmp"
_, err := os.Stat(dir)
if err != nil {
if os.IsNotExist(err) {
err = os.Mkdir(dir, 0777)
if err != nil {
return err
}
} else {
return err
}
}
remotelogs.Println("UPGRADE_MANAGER", "downloading new node ...")
path := dir + "/edge-node" + ".tmp"
fp, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0777)
if err != nil {
return err
}
isClosed := false
defer func() {
if !isClosed {
_ = fp.Close()
}
}()
client, err := rpc.SharedRPC()
if err != nil {
return err
}
var offset int64
var h = md5.New()
var sum = ""
var filename = ""
for {
resp, err := client.NodeRPC().DownloadNodeInstallationFile(client.Context(), &pb.DownloadNodeInstallationFileRequest{
Os: runtime.GOOS,
Arch: runtime.GOARCH,
ChunkOffset: offset,
})
if err != nil {
return err
}
if len(resp.Sum) == 0 {
return nil
}
sum = resp.Sum
filename = resp.Filename
if stringutil.VersionCompare(resp.Version, teaconst.Version) <= 0 {
return nil
}
if len(resp.ChunkData) == 0 {
break
}
// 写入文件
_, err = fp.Write(resp.ChunkData)
if err != nil {
return err
}
_, err = h.Write(resp.ChunkData)
if err != nil {
return err
}
offset = resp.Offset
}
if len(filename) == 0 {
return nil
}
isClosed = true
err = fp.Close()
if err != nil {
return err
}
if fmt.Sprintf("%x", h.Sum(nil)) != sum {
_ = os.Remove(path)
return nil
}
// 改成zip
zipPath := dir + "/" + filename
err = os.Rename(path, zipPath)
if err != nil {
return err
}
this.lastFile = zipPath
// 解压
err = this.unzip(zipPath)
if err != nil {
return err
}
return nil
}
// 解压
func (this *UpgradeManager) unzip(zipPath string) error {
var isOk = false
defer func() {
if isOk {
// 只有解压并覆盖成功后才会删除
_ = os.Remove(zipPath)
}
}()
// 解压
var target = Tea.Root
if Tea.IsTesting() {
// 测试环境下只解压在tmp目录
target = Tea.Root + "/tmp"
}
// 先改先前的可执行文件
err := os.Rename(target+"/bin/edge-node", target+"/bin/.edge-node.old")
hasBackup := err == nil
defer func() {
if !isOk && hasBackup {
// 失败时还原
_ = os.Rename(target+"/bin/.edge-node.old", target+"/bin/edge-node")
}
}()
unzip := utils.NewUnzip(zipPath, target, "edge-node/")
err = unzip.Run()
if err != nil {
return err
}
isOk = true
return nil
}
// 重启
func (this *UpgradeManager) restart() error {
// 重新启动
if DaemonIsOn && DaemonPid == os.Getppid() {
os.Exit(0) // TODO 试着更优雅重启
} else {
exe, err := os.Executable()
if err != nil {
return err
}
// quit
events.Notify(events.EventQuit)
// 启动
cmd := exec.Command(exe, "start")
err = cmd.Start()
if err != nil {
return err
}
// 退出当前进程
time.Sleep(1 * time.Second)
os.Exit(0)
}
return nil
}

View File

@@ -0,0 +1,16 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nodes
import (
_ "github.com/iwind/TeaGo/bootstrap"
"testing"
)
func TestUpgradeManager_install(t *testing.T) {
err := NewUpgradeManager().install()
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}

View File

@@ -24,7 +24,7 @@ func init() {
}()
}
// 打印普通信息
// Println 打印普通信息
func Println(tag string, description string) {
logs.Println("[" + tag + "]" + description)
@@ -47,7 +47,7 @@ func Println(tag string, description string) {
}
}
// 打印警告信息
// Warn 打印警告信息
func Warn(tag string, description string) {
logs.Println("[" + tag + "]" + description)
@@ -70,7 +70,7 @@ func Warn(tag string, description string) {
}
}
// 打印错误信息
// Error 打印错误信息
func Error(tag string, description string) {
logs.Println("[" + tag + "]" + description)
@@ -93,6 +93,54 @@ func Error(tag string, description string) {
}
}
// ServerError 打印服务相关错误信息
func ServerError(serverId int64, tag string, description string) {
logs.Println("[" + tag + "]" + description)
nodeConfig, _ := nodeconfigs.SharedNodeConfig()
if nodeConfig == nil {
return
}
select {
case logChan <- &pb.NodeLog{
Role: teaconst.Role,
Tag: tag,
Description: description,
Level: "error",
NodeId: nodeConfig.Id,
ServerId: serverId,
CreatedAt: time.Now().Unix(),
}:
default:
}
}
// ServerSuccess 打印服务相关成功信息
func ServerSuccess(serverId int64, tag string, description string) {
logs.Println("[" + tag + "]" + description)
nodeConfig, _ := nodeconfigs.SharedNodeConfig()
if nodeConfig == nil {
return
}
select {
case logChan <- &pb.NodeLog{
Role: teaconst.Role,
Tag: tag,
Description: description,
Level: "success",
NodeId: nodeConfig.Id,
ServerId: serverId,
CreatedAt: time.Now().Unix(),
}:
default:
}
}
// 上传日志
func uploadLogs() error {
logList := []*pb.NodeLog{}

View File

@@ -15,9 +15,16 @@ import (
var SharedTrafficStatManager = NewTrafficStatManager()
type TrafficItem struct {
Bytes int64
CachedBytes int64
CountRequests int64
CountCachedRequests int64
}
// TrafficStatManager 区域流量统计
type TrafficStatManager struct {
m map[string]int64 // [timestamp serverId] => bytes
itemMap map[string]*TrafficItem // [timestamp serverId] => bytes
locker sync.Mutex
configFunc func() *nodeconfigs.NodeConfig
}
@@ -25,7 +32,7 @@ type TrafficStatManager struct {
// NewTrafficStatManager 获取新对象
func NewTrafficStatManager() *TrafficStatManager {
manager := &TrafficStatManager{
m: map[string]int64{},
itemMap: map[string]*TrafficItem{},
}
return manager
@@ -55,7 +62,7 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig)
}
// Add 添加流量
func (this *TrafficStatManager) Add(serverId int64, bytes int64) {
func (this *TrafficStatManager) Add(serverId int64, bytes int64, cachedBytes int64, countRequests int64, countCachedRequests int64) {
if bytes == 0 {
return
}
@@ -64,7 +71,15 @@ func (this *TrafficStatManager) Add(serverId int64, bytes int64) {
key := strconv.FormatInt(timestamp, 10) + strconv.FormatInt(serverId, 10)
this.locker.Lock()
this.m[key] += bytes
item, ok := this.itemMap[key]
if !ok {
item = &TrafficItem{}
this.itemMap[key] = item
}
item.Bytes += bytes
item.CachedBytes += cachedBytes
item.CountRequests += countRequests
item.CountCachedRequests += countCachedRequests
this.locker.Unlock()
}
@@ -81,12 +96,12 @@ func (this *TrafficStatManager) Upload() error {
}
this.locker.Lock()
m := this.m
this.m = map[string]int64{}
m := this.itemMap
this.itemMap = map[string]*TrafficItem{}
this.locker.Unlock()
pbStats := []*pb.ServerDailyStat{}
for key, bytes := range m {
for key, item := range m {
timestamp, err := strconv.ParseInt(key[:10], 10, 64)
if err != nil {
return err
@@ -97,10 +112,13 @@ func (this *TrafficStatManager) Upload() error {
}
pbStats = append(pbStats, &pb.ServerDailyStat{
ServerId: serverId,
RegionId: config.RegionId,
Bytes: bytes,
CreatedAt: timestamp,
ServerId: serverId,
RegionId: config.RegionId,
Bytes: item.Bytes,
CachedBytes: item.CachedBytes,
CountRequests: item.CountRequests,
CountCachedRequests: item.CountCachedRequests,
CreatedAt: timestamp,
})
}
if len(pbStats) == 0 {

View File

@@ -8,15 +8,15 @@ import (
func TestTrafficStatManager_Add(t *testing.T) {
manager := NewTrafficStatManager()
for i := 0; i < 100; i++ {
manager.Add(1, 10)
manager.Add(1, 10, 1, 0)
}
t.Log(manager.m)
t.Log(manager.itemMap)
}
func TestTrafficStatManager_Upload(t *testing.T) {
manager := NewTrafficStatManager()
for i := 0; i < 100; i++ {
manager.Add(1, 10)
manager.Add(1, 10, 1, 0)
}
err := manager.Upload()
if err != nil {
@@ -30,6 +30,6 @@ func BenchmarkTrafficStatManager_Add(b *testing.B) {
manager := NewTrafficStatManager()
for i := 0; i < b.N; i++ {
manager.Add(1, 1024)
manager.Add(1, 1024, 1, 0)
}
}

98
internal/utils/unzip.go Normal file
View File

@@ -0,0 +1,98 @@
package utils
import (
"archive/zip"
"errors"
"io"
"os"
"strings"
)
type Unzip struct {
zipFile string
targetDir string
stripPrefix string
}
func NewUnzip(zipFile string, targetDir string, stripPrefix string) *Unzip {
return &Unzip{
zipFile: zipFile,
targetDir: targetDir,
stripPrefix: stripPrefix,
}
}
func (this *Unzip) Run() error {
if len(this.zipFile) == 0 {
return errors.New("zip file should not be empty")
}
if len(this.targetDir) == 0 {
return errors.New("target dir should not be empty")
}
reader, err := zip.OpenReader(this.zipFile)
if err != nil {
return err
}
defer func() {
_ = reader.Close()
}()
for _, file := range reader.File {
info := file.FileInfo()
filename := file.Name
if len(this.stripPrefix) > 0 {
filename = strings.TrimPrefix(filename, this.stripPrefix)
}
target := this.targetDir + "/" + filename
// 目录
if info.IsDir() {
stat, err := os.Stat(target)
if err != nil {
if !os.IsNotExist(err) {
return err
} else {
err = os.MkdirAll(target, info.Mode())
if err != nil {
return err
}
}
} else if !stat.IsDir() {
err = os.MkdirAll(target, info.Mode())
if err != nil {
return err
}
}
continue
}
// 文件
err := func(file *zip.File, target string) error {
fileReader, err := file.Open()
if err != nil {
return err
}
defer func() {
_ = fileReader.Close()
}()
fileWriter, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, file.FileInfo().Mode())
if err != nil {
return err
}
defer func() {
_ = fileWriter.Close()
}()
_, err = io.Copy(fileWriter, fileReader)
return err
}(file, target)
if err != nil {
return err
}
}
return nil
}