Compare commits

..

23 Commits

Author SHA1 Message Date
刘祥超
24fc2249bb 优化文件缓存 2021-06-14 19:55:06 +08:00
刘祥超
84c931b411 缓存支持ETag和Last-Modified 2021-06-14 11:46:39 +08:00
刘祥超
7f422a2946 优化cache/FileList错误提示 2021-06-13 17:51:04 +08:00
刘祥超
13194366a5 优化文件缓存 2021-06-13 17:37:57 +08:00
刘祥超
993cda7766 修复内存缓存没有init()的Bug 2021-06-12 10:03:33 +08:00
刘祥超
a46e970c74 优化内存缓存 2021-06-11 14:53:51 +08:00
刘祥超
085adcf1c4 实现节点自动升级成最新版本 2021-06-10 19:19:15 +08:00
刘祥超
c1af8b36a4 完成两个TODO文档说明 2021-06-10 11:35:20 +08:00
刘祥超
8cba12b4b5 URL跳转规则支持匹配条件 2021-06-09 21:44:59 +08:00
刘祥超
3debe1d1df 支持不缓存条件 2021-06-08 22:45:11 +08:00
刘祥超
549f110e5f 增加服务流量统计 2021-06-08 11:24:41 +08:00
刘祥超
f3a45e9e64 改进UDP IsOk的使用方法 2021-06-07 15:48:39 +08:00
刘祥超
f461760158 支持UDP代理 2021-06-07 15:45:47 +08:00
刘祥超
a49b724745 优化HTTP缓存,主要是并发冲突、缓存写入不全等问题 2021-06-06 23:42:11 +08:00
刘祥超
0df5dfad23 某个服务端口启动失败后,会自动重试 2021-06-06 13:40:00 +08:00
刘祥超
aeb1bc08a7 改进编辑脚本 2021-06-06 11:58:41 +08:00
刘祥超
c51aca621a 调整API命名 2021-06-01 19:52:37 +08:00
刘祥超
c78d055dae 更新fcgi 2021-05-28 14:00:45 +08:00
刘祥超
4502a3b132 改进清空缓存目录逻辑 2021-05-25 18:28:24 +08:00
刘祥超
fa99d86d6f 对部分错误提示降级 2021-05-25 11:16:05 +08:00
刘祥超
2edd2bb105 缓存文件列表初始化时自动创建目录 2021-05-25 11:06:43 +08:00
刘祥超
0f8aee0ccb 修改版本号 2021-05-25 11:06:07 +08:00
刘祥超
2500929a99 调整在Mac OS上的编译脚本 2021-05-25 11:05:55 +08:00
47 changed files with 1763 additions and 315 deletions

View File

@@ -48,7 +48,38 @@ function build() {
fi
echo "building ..."
env GOOS=${OS} GOARCH=${ARCH} go build -o $DIST/bin/${NAME} -ldflags="-s -w" $ROOT/../cmd/edge-node/main.go
MUSL_DIR="/usr/local/opt/musl-cross/bin"
CC_PATH=""
CXX_PATH=""
if [[ `uname -a` == *"Darwin"* && "${OS}" == "linux" ]]; then
# /usr/local/opt/musl-cross/bin/
if [ "${ARCH}" == "amd64" ]; then
CC_PATH="x86_64-linux-musl-gcc"
CXX_PATH="x86_64-linux-musl-g++"
fi
if [ "${ARCH}" == "386" ]; then
CC_PATH="i486-linux-musl-gcc"
CXX_PATH="i486-linux-musl-g++"
fi
if [ "${ARCH}" == "arm64" ]; then
CC_PATH="aarch64-linux-musl-gcc"
CXX_PATH="aarch64-linux-musl-g++"
fi
if [ "${ARCH}" == "mips64" ]; then
CC_PATH="mips64-linux-musl-gcc"
CXX_PATH="mips64-linux-musl-g++"
fi
if [ "${ARCH}" == "mips64le" ]; then
CC_PATH="mips64el-linux-musl-gcc"
CXX_PATH="mips64el-linux-musl-g++"
fi
fi
if [ ! -z $CC_PATH ]; then
env CC=$MUSL_DIR/$CC_PATH CXX=$MUSL_DIR/$CXX_PATH GOOS=${OS} GOARCH=${ARCH} CGO_ENABLED=1 go build -o $DIST/bin/${NAME} -ldflags "-linkmode external -extldflags -static -s -w" $ROOT/../cmd/edge-node/main.go
else
env GOOS=${OS} GOARCH=${ARCH} CGO_ENABLED=1 go build -o $DIST/bin/${NAME} -ldflags="-s -w" $ROOT/../cmd/edge-node/main.go
fi
# delete hidden files
find $DIST -name ".DS_Store" -delete

View File

@@ -7,8 +7,11 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/nodes"
"github.com/iwind/TeaGo/Tea"
_ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/types"
"io/ioutil"
"net/http"
_ "net/http/pprof"
"os"
"syscall"
)
@@ -17,7 +20,7 @@ func main() {
app := apps.NewAppCmd().
Version(teaconst.Version).
Product(teaconst.ProductName).
Usage(teaconst.ProcessName + " [-v|start|stop|restart|quit|test|service|daemon]")
Usage(teaconst.ProcessName + " [-v|start|stop|restart|status|quit|test|service|daemon|pprof]")
app.On("test", func() {
err := nodes.NewNode().Test()
@@ -57,6 +60,21 @@ func main() {
_ = process.Signal(syscall.SIGQUIT)
}
})
app.On("pprof", func() {
// TODO 自己指定端口
addr := "127.0.0.1:6060"
logs.Println("starting with pprof '" + addr + "'...")
go func() {
err := http.ListenAndServe(addr, nil)
if err != nil {
logs.Println("[error]" + err.Error())
}
}()
node := nodes.NewNode()
node.Start()
})
app.Run(func() {
node := nodes.NewNode()
node.Start()

4
go.mod
View File

@@ -12,8 +12,8 @@ require (
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/go-yaml/yaml v2.1.0+incompatible
github.com/golang/protobuf v1.4.2
github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f
github.com/iwind/gofcgi v0.0.0-20210506081859-17498ab3e9d7
github.com/iwind/TeaGo v0.0.0-20210411134150-ddf57e240c2f
github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11
github.com/lionsoul2014/ip2region v2.2.0-release+incompatible
github.com/mattn/go-sqlite3 v1.14.7
github.com/mssola/user_agent v0.5.2

15
go.sum
View File

@@ -11,6 +11,7 @@ github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiU
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
@@ -55,11 +56,10 @@ github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/iwind/TeaGo v0.0.0-20200923021120-f5d76441fe9e/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f h1:6Ws2H+eorfVUoMO2jta6A9nIdh8oi5/5LXo/LkAxR+E=
github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/gofcgi v0.0.0-20210506081859-17498ab3e9d7 h1:apv23QzWNmv0D76gB3+u/5kf0F/Yw4W8h489CWUZtss=
github.com/iwind/gofcgi v0.0.0-20210506081859-17498ab3e9d7/go.mod h1:JtbX20untAjUVjZs1ZBtq80f5rJWvwtQNRL6EnuYRnY=
github.com/iwind/TeaGo v0.0.0-20210411134150-ddf57e240c2f h1:r2O8PONj/KiuZjJHVHn7KlCePUIjNtgAmvLfgRafQ8o=
github.com/iwind/TeaGo v0.0.0-20210411134150-ddf57e240c2f/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11 h1:DaQjoWZhLNxjhIXedVg4/vFEtHkZhK4IjIwsWdyzBLg=
github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11/go.mod h1:JtbX20untAjUVjZs1ZBtq80f5rJWvwtQNRL6EnuYRnY=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@@ -70,9 +70,7 @@ github.com/lionsoul2014/ip2region v2.2.0-release+incompatible/go.mod h1:+ZBN7PBo
github.com/mattn/go-sqlite3 v1.14.7 h1:fxWBnXkxfM6sRiuH3bqJ4CfzZojMOLVc0UTsTglEghA=
github.com/mattn/go-sqlite3 v1.14.7/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mssola/user_agent v0.5.2 h1:CZkTUahjL1+OcZ5zv3kZr8QiJ8jy2H08vZIEkBeRbxo=
github.com/mssola/user_agent v0.5.2/go.mod h1:TTPno8LPY3wAIEKRpAtkdMT0f8SE24pLRGPahjCH4uw=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
@@ -183,8 +181,9 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

41
internal/caches/errors.go Normal file
View File

@@ -0,0 +1,41 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import "errors"
// 常用的几个错误
var (
ErrNotFound = errors.New("cache not found")
ErrFileIsWriting = errors.New("the file is writing")
ErrInvalidRange = errors.New("invalid range")
)
// CapacityError 容量错误
// 独立出来是为了可以在有些场合下可以忽略,防止产生没必要的错误提示数量太多
type CapacityError struct {
err string
}
func NewCapacityError(err string) error {
return &CapacityError{err: err}
}
func (this *CapacityError) Error() string {
return this.err
}
// CanIgnoreErr 检查错误是否可以忽略
func CanIgnoreErr(err error) bool {
if err == nil {
return true
}
if err == ErrFileIsWriting {
return true
}
_, ok := err.(*CapacityError)
if ok {
return true
}
return false
}

View File

@@ -0,0 +1,16 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import (
"github.com/iwind/TeaGo/assert"
"testing"
)
func TestCanIgnoreErr(t *testing.T) {
a := assert.NewAssertion(t)
a.IsTrue(CanIgnoreErr(ErrFileIsWriting))
a.IsTrue(CanIgnoreErr(NewCapacityError("over capcity")))
a.IsFalse(CanIgnoreErr(ErrNotFound))
}

View File

@@ -1,6 +1,8 @@
package caches
import "time"
import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
)
type ItemType = int
@@ -16,10 +18,12 @@ type Item struct {
HeaderSize int64 `json:"headerSize"`
BodySize int64 `json:"bodySize"`
MetaSize int64 `json:"metaSize"`
Host string `json:"host"` // 主机名
ServerId int64 `json:"serverId"` // 服务ID
}
func (this *Item) IsExpired() bool {
return this.ExpiredAt < time.Now().Unix()
return this.ExpiredAt < utils.UnixTime()
}
func (this *Item) TotalSize() int64 {

View File

@@ -4,7 +4,12 @@ package caches
import (
"database/sql"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/lists"
_ "github.com/mattn/go-sqlite3"
"os"
"strconv"
"sync/atomic"
"time"
)
@@ -17,6 +22,19 @@ type FileList struct {
onAdd func(item *Item)
onRemove func(item *Item)
existsByHashStmt *sql.Stmt // 根据hash检查是否存在
insertStmt *sql.Stmt // 写入数据
selectByHashStmt *sql.Stmt // 使用hash查询数据
deleteByHashStmt *sql.Stmt // 根据hash删除数据
statStmt *sql.Stmt // 统计
purgeStmt *sql.Stmt // 清理
deleteAllStmt *sql.Stmt // 删除所有数据
oldTables []string
itemsTableName string
isClosed bool
}
func NewFileList(dir string) ListInterface {
@@ -24,51 +42,80 @@ func NewFileList(dir string) ListInterface {
}
func (this *FileList) Init() error {
db, err := sql.Open("sqlite3", "file:"+this.dir+"/index.db?cache=shared&mode=rwc")
// 检查目录是否存在
_, err := os.Stat(this.dir)
if err != nil {
err = os.MkdirAll(this.dir, 0777)
if err != nil {
return err
}
remotelogs.Println("CACHE", "create cache dir '"+this.dir+"'")
}
this.itemsTableName = "cacheItems_v2"
db, err := sql.Open("sqlite3", "file:"+this.dir+"/index.db?cache=shared&mode=rwc&_journal_mode=WAL")
if err != nil {
return err
}
db.SetMaxOpenConns(1)
this.db = db
_, err = db.Exec("VACUUM")
// 清除旧表
this.oldTables = []string{
"cacheItems",
}
err = this.removeOldTables()
if err != nil {
return err
remotelogs.Warn("CACHE", "clean old tables failed: "+err.Error())
}
// TODO 耗时过长,暂时不整理数据库
/**_, err = db.Exec("VACUUM")
if err != nil {
return err
}**/
// 创建
// TODO accessesAt 用来存储访问时间,将来可以根据此访问时间删除不常访问的内容
// 且访问时间只需要每隔一个小时存储一个整数值即可,因为不需要那么精确
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS "cacheItems" (
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
"key" varchar(1024),
"headerSize" integer DEFAULT 0,
"bodySize" integer DEFAULT 0,
"metaSize" integer DEFAULT 0,
"expiredAt" integer DEFAULT 0,
"accessedAt" integer DEFAULT 0
"createdAt" integer DEFAULT 0,
"host" varchar(128),
"serverId" integer
);
CREATE INDEX IF NOT EXISTS "createdAt"
ON "` + this.itemsTableName + `" (
"createdAt" ASC
);
CREATE INDEX IF NOT EXISTS "expiredAt"
ON "` + this.itemsTableName + `" (
"expiredAt" ASC
);
CREATE UNIQUE INDEX IF NOT EXISTS "hash"
ON "cacheItems" (
"hash"
ON "` + this.itemsTableName + `" (
"hash" ASC
);
CREATE INDEX IF NOT EXISTS "expiredAt"
ON "cacheItems" (
"expiredAt"
);
CREATE INDEX IF NOT EXISTS "accessedAt"
ON "cacheItems" (
"accessedAt"
CREATE INDEX IF NOT EXISTS "serverId"
ON "` + this.itemsTableName + `" (
"serverId" ASC
);
`)
if err != nil {
return err
}
this.db = db
// 读取总数量
row := this.db.QueryRow("SELECT COUNT(*) FROM cacheItems")
row := this.db.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`)
if row.Err() != nil {
return row.Err()
}
@@ -79,6 +126,42 @@ ON "cacheItems" (
}
this.total = total
// 常用语句
this.existsByHashStmt, err = this.db.Prepare(`SELECT "bodySize" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`)
if err != nil {
return err
}
this.insertStmt, err = this.db.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "host", "serverId", "createdAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
return err
}
this.selectByHashStmt, err = this.db.Prepare(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? LIMIT 1`)
if err != nil {
return err
}
this.deleteByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`)
if err != nil {
return err
}
this.statStmt, err = this.db.Prepare(`SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM "` + this.itemsTableName + `" WHERE expiredAt>?`)
if err != nil {
return err
}
this.purgeStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" WHERE expiredAt<=? LIMIT ?`)
if err != nil {
return err
}
this.deleteAllStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `"`)
if err != nil {
return err
}
return nil
}
@@ -88,7 +171,11 @@ func (this *FileList) Reset() error {
}
func (this *FileList) Add(hash string, item *Item) error {
_, err := this.db.Exec(`INSERT INTO cacheItems ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt") VALUES (?, ?, ?, ?, ?, ?)`, hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt)
if this.isClosed {
return nil
}
_, err := this.insertStmt.Exec(hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.Host, item.ServerId, utils.UnixTime())
if err != nil {
return err
}
@@ -102,54 +189,55 @@ func (this *FileList) Add(hash string, item *Item) error {
}
func (this *FileList) Exist(hash string) (bool, error) {
row := this.db.QueryRow(`SELECT "bodySize" FROM cacheItems WHERE "hash"=? LIMIT 1`, hash)
if row == nil {
if this.isClosed {
return false, nil
}
if row.Err() != nil {
return false, row.Err()
}
var bodySize int
err := row.Scan(&bodySize)
rows, err := this.existsByHashStmt.Query(hash, time.Now().Unix())
if err != nil {
if err == sql.ErrNoRows {
return false, nil
}
return false, err
}
return true, nil
}
// FindKeysWithPrefix 根据前缀进行查找
func (this *FileList) FindKeysWithPrefix(prefix string) (keys []string, err error) {
if len(prefix) == 0 {
return
}
// TODO 需要优化上千万结果的情况
rows, err := this.db.Query(`SELECT "key" FROM cacheItems WHERE INSTR("key", ?)==1 LIMIT 100000`, prefix)
if err != nil {
return nil, err
}
defer func() {
_ = rows.Close()
}()
if rows.Next() {
return true, nil
}
return false, nil
}
for rows.Next() {
var key string
err = rows.Scan(&key)
if err != nil {
return nil, err
}
keys = append(keys, key)
// CleanPrefix 清理某个前缀的缓存数据
func (this *FileList) CleanPrefix(prefix string) error {
if this.isClosed {
return nil
}
return
if len(prefix) == 0 {
return nil
}
var count = int64(10000)
for {
result, err := this.db.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0 WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND createdAt<=? AND INSTR("key", ?)==1 LIMIT `+strconv.FormatInt(count, 10)+`)`, utils.UnixTime(), prefix)
if err != nil {
return err
}
affectedRows, err := result.RowsAffected()
if err != nil {
return err
}
if affectedRows < count {
return nil
}
}
}
func (this *FileList) Remove(hash string) error {
row := this.db.QueryRow(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM cacheItems WHERE "hash"=? LIMIT 1`, hash)
if this.isClosed {
return nil
}
row := this.selectByHashStmt.QueryRow(hash)
if row.Err() != nil {
return row.Err()
}
@@ -163,7 +251,7 @@ func (this *FileList) Remove(hash string) error {
return err
}
_, err = this.db.Exec(`DELETE FROM cacheItems WHERE "hash"=?`, hash)
_, err = this.deleteByHashStmt.Exec(hash)
if err != nil {
return err
}
@@ -181,11 +269,15 @@ func (this *FileList) Remove(hash string) error {
// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间
// callback 每次发现过期key的调用
func (this *FileList) Purge(count int, callback func(hash string) error) error {
if this.isClosed {
return nil
}
if count <= 0 {
count = 1000
}
rows, err := this.db.Query(`SELECT "hash" FROM cacheItems WHERE expiredAt<=? LIMIT ?`, time.Now().Unix(), count)
rows, err := this.purgeStmt.Query(time.Now().Unix(), count)
if err != nil {
return err
}
@@ -220,7 +312,11 @@ func (this *FileList) Purge(count int, callback func(hash string) error) error {
}
func (this *FileList) CleanAll() error {
_, err := this.db.Exec("DELETE FROM cacheItems")
if this.isClosed {
return nil
}
_, err := this.deleteAllStmt.Exec()
if err != nil {
return err
}
@@ -229,8 +325,12 @@ func (this *FileList) CleanAll() error {
}
func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) {
if this.isClosed {
return &Stat{}, nil
}
// 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些
row := this.db.QueryRow("SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM cacheItems")
row := this.statStmt.QueryRow(time.Now().Unix())
if row.Err() != nil {
return nil, row.Err()
}
@@ -258,3 +358,47 @@ func (this *FileList) OnAdd(f func(item *Item)) {
func (this *FileList) OnRemove(f func(item *Item)) {
this.onRemove = f
}
func (this *FileList) Close() error {
this.isClosed = true
if this.db != nil {
_ = this.existsByHashStmt.Close()
_ = this.insertStmt.Close()
_ = this.selectByHashStmt.Close()
_ = this.deleteByHashStmt.Close()
_ = this.statStmt.Close()
_ = this.purgeStmt.Close()
_ = this.deleteAllStmt.Close()
return this.db.Close()
}
return nil
}
func (this *FileList) removeOldTables() error {
rows, err := this.db.Query(`SELECT "name" FROM sqlite_master WHERE "type"='table'`)
if err != nil {
return err
}
defer func() {
_ = rows.Close()
}()
for rows.Next() {
var name string
err = rows.Scan(&name)
if err != nil {
return err
}
if lists.ContainsString(this.oldTables, name) {
// 异步执行
go func() {
remotelogs.Println("CACHE", "remove old table '"+name+"' ...")
_, _ = this.db.Exec(`DROP TABLE "` + name + `"`)
remotelogs.Println("CACHE", "remove old table '"+name+"' done")
}()
}
}
return nil
}

View File

@@ -4,8 +4,10 @@ package caches
import (
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/rands"
stringutil "github.com/iwind/TeaGo/utils/string"
"strconv"
"sync"
"testing"
"time"
)
@@ -31,6 +33,8 @@ func TestFileList_Add(t *testing.T) {
HeaderSize: 1,
MetaSize: 2,
BodySize: 3,
Host: "teaos.cn",
ServerId: 1,
})
if err != nil {
t.Fatal(err)
@@ -44,11 +48,12 @@ func TestFileList_Add_Many(t *testing.T) {
if err != nil {
t.Fatal(err)
}
for i := 0; i < 100_0000; i++ {
before := time.Now()
for i := 0; i < 2000_0000; i++ {
u := "http://edge.teaos.cn/123456" + strconv.Itoa(i)
err = list.Add(stringutil.Md5(u), &Item{
_ = list.Add(stringutil.Md5(u), &Item{
Key: u,
ExpiredAt: time.Now().Unix(),
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1,
MetaSize: 2,
BodySize: 3,
@@ -56,6 +61,10 @@ func TestFileList_Add_Many(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if i > 0 && i%10_000 == 0 {
t.Log(i, int(10000/time.Since(before).Seconds()), "qps")
before = time.Now()
}
}
t.Log("ok")
}
@@ -66,6 +75,10 @@ func TestFileList_Exist(t *testing.T) {
if err != nil {
t.Fatal(err)
}
before := time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
{
exists, err := list.Exist(stringutil.Md5("123456"))
if err != nil {
@@ -74,7 +87,7 @@ func TestFileList_Exist(t *testing.T) {
t.Log("exists:", exists)
}
{
exists, err := list.Exist(stringutil.Md5("654321"))
exists, err := list.Exist(stringutil.Md5("http://edge.teaos.cn/1234561"))
if err != nil {
t.Fatal(err)
}
@@ -82,18 +95,70 @@ func TestFileList_Exist(t *testing.T) {
}
}
func TestFileList_FindKeysWithPrefix(t *testing.T) {
func TestFileList_Exist_Many_DB(t *testing.T) {
// 测试在多个数据库下的性能
var listSlice = []ListInterface{}
for i := 1; i <= 10; i++ {
list := NewFileList(Tea.Root + "/data/data" + strconv.Itoa(i))
err := list.Init()
if err != nil {
t.Fatal(err)
}
listSlice = append(listSlice, list)
}
var wg = sync.WaitGroup{}
var threads = 8
wg.Add(threads)
var count = 200_000
var countLocker sync.Mutex
var tasks = make(chan int, count)
for i := 0; i < count; i++ {
tasks <- i
}
var hash = stringutil.Md5("http://edge.teaos.cn/1234561")
before := time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
for i := 0; i < threads; i++ {
go func() {
defer wg.Done()
for {
select {
case <-tasks:
countLocker.Lock()
count--
countLocker.Unlock()
var list = listSlice[rands.Int(0, len(listSlice)-1)]
_, _ = list.Exist(hash)
default:
return
}
}
}()
}
wg.Wait()
t.Log("left:", count)
}
func TestFileList_CleanPrefix(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
before := time.Now()
keys, err := list.FindKeysWithPrefix("1234")
err = list.CleanPrefix("1234")
if err != nil {
t.Fatal(err)
}
t.Log("keys:", keys)
t.Log(time.Since(before).Seconds()*1000, "ms")
}
@@ -170,3 +235,14 @@ func TestFileList_CleanAll(t *testing.T) {
t.Log("ok")
t.Log(list.Count())
}
func BenchmarkFileList_Exist(b *testing.B) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
b.Fatal(err)
}
for i := 0; i < b.N; i++ {
_, _ = list.Exist("f0eb5b87e0b0041f3917002c0707475f")
}
}

View File

@@ -3,23 +3,31 @@
package caches
type ListInterface interface {
// Init 初始化
Init() error
// Reset 重置数据
Reset() error
// Add 添加内容
Add(hash string, item *Item) error
// Exist 检查内容是否存在
Exist(hash string) (bool, error)
// FindKeysWithPrefix 根据前缀进行查找
FindKeysWithPrefix(prefix string) (keys []string, err error)
// CleanPrefix 清除某个前缀的缓存
CleanPrefix(prefix string) error
// Remove 删除内容
Remove(hash string) error
// Purge 清理过期数据
Purge(count int, callback func(hash string) error) error
// CleanAll 清除所有缓存
CleanAll() error
// Stat 统计
Stat(check func(hash string) bool) (*Stat, error)
// Count 总数量
@@ -30,4 +38,7 @@ type ListInterface interface {
// OnRemove 删除事件
OnRemove(f func(item *Item))
// Close 关闭
Close() error
}

View File

@@ -1,42 +1,76 @@
package caches
import (
"github.com/iwind/TeaGo/logs"
"strconv"
"strings"
"sync"
"testing"
)
// MemoryList 内存缓存列表管理
type MemoryList struct {
m map[string]*Item // hash => item
itemMaps map[string]map[string]*Item // prefix => { hash => item }
prefixes []string
locker sync.RWMutex
onAdd func(item *Item)
onRemove func(item *Item)
purgeIndex int
}
func NewMemoryList() ListInterface {
return &MemoryList{
m: map[string]*Item{},
itemMaps: map[string]map[string]*Item{},
}
}
func (this *MemoryList) Init() error {
// 内存列表不需要初始化
this.prefixes = []string{"000"}
for i := 100; i <= 999; i++ {
this.prefixes = append(this.prefixes, strconv.Itoa(i))
}
for _, prefix := range this.prefixes {
this.itemMaps[prefix] = map[string]*Item{}
}
return nil
}
func (this *MemoryList) Reset() error {
this.locker.Lock()
this.m = map[string]*Item{}
for key := range this.itemMaps {
this.itemMaps[key] = map[string]*Item{}
}
this.locker.Unlock()
return nil
}
func (this *MemoryList) Add(hash string, item *Item) error {
this.locker.Lock()
prefix := this.prefix(hash)
itemMap, ok := this.itemMaps[prefix]
if !ok {
itemMap = map[string]*Item{}
this.itemMaps[prefix] = itemMap
}
// 先删除,为了可以正确触发统计
oldItem, ok := itemMap[hash]
if ok {
if this.onRemove != nil {
this.onRemove(oldItem)
}
}
// 添加
if this.onAdd != nil {
this.onAdd(item)
}
this.m[hash] = item
itemMap[hash] = item
this.locker.Unlock()
return nil
}
@@ -45,7 +79,12 @@ func (this *MemoryList) Exist(hash string) (bool, error) {
this.locker.RLock()
defer this.locker.RUnlock()
item, ok := this.m[hash]
prefix := this.prefix(hash)
itemMap, ok := this.itemMaps[prefix]
if !ok {
return false, nil
}
item, ok := itemMap[hash]
if !ok {
return false, nil
}
@@ -53,29 +92,37 @@ func (this *MemoryList) Exist(hash string) (bool, error) {
return !item.IsExpired(), nil
}
// FindKeysWithPrefix 根据前缀进行查找
func (this *MemoryList) FindKeysWithPrefix(prefix string) (keys []string, err error) {
// CleanPrefix 根据前缀进行清除
func (this *MemoryList) CleanPrefix(prefix string) error {
this.locker.RLock()
defer this.locker.RUnlock()
// TODO 需要优化性能支持千万级数据低于1s的处理速度
for _, item := range this.m {
if strings.HasPrefix(item.Key, prefix) {
keys = append(keys, item.Key)
for _, itemMap := range this.itemMaps {
for _, item := range itemMap {
if strings.HasPrefix(item.Key, prefix) {
item.ExpiredAt = 0
}
}
}
return
return nil
}
func (this *MemoryList) Remove(hash string) error {
this.locker.Lock()
item, ok := this.m[hash]
itemMap, ok := this.itemMaps[this.prefix(hash)]
if !ok {
this.locker.Unlock()
return nil
}
item, ok := itemMap[hash]
if ok {
if this.onRemove != nil {
this.onRemove(item)
}
delete(this.m, hash)
delete(itemMap, hash)
}
this.locker.Unlock()
@@ -88,7 +135,20 @@ func (this *MemoryList) Remove(hash string) error {
func (this *MemoryList) Purge(count int, callback func(hash string) error) error {
this.locker.Lock()
deletedHashList := []string{}
for hash, item := range this.m {
if this.purgeIndex >= len(this.prefixes) {
this.purgeIndex = 0
}
prefix := this.prefixes[this.purgeIndex]
this.purgeIndex++
itemMap, ok := this.itemMaps[prefix]
if !ok {
this.locker.Unlock()
return nil
}
for hash, item := range itemMap {
if count <= 0 {
break
}
@@ -97,7 +157,7 @@ func (this *MemoryList) Purge(count int, callback func(hash string) error) error
if this.onRemove != nil {
this.onRemove(item)
}
delete(this.m, hash)
delete(itemMap, hash)
deletedHashList = append(deletedHashList, hash)
}
@@ -129,13 +189,15 @@ func (this *MemoryList) Stat(check func(hash string) bool) (*Stat, error) {
Count: 0,
Size: 0,
}
for hash, item := range this.m {
if !item.IsExpired() {
// 检查文件是否存在、内容是否正确等
if check != nil && check(hash) {
result.Count++
result.ValueSize += item.Size()
result.Size += item.TotalSize()
for _, itemMap := range this.itemMaps {
for hash, item := range itemMap {
if !item.IsExpired() {
// 检查文件是否存在、内容是否正确等
if check != nil && check(hash) {
result.Count++
result.ValueSize += item.Size()
result.Size += item.TotalSize()
}
}
}
}
@@ -145,9 +207,12 @@ func (this *MemoryList) Stat(check func(hash string) bool) (*Stat, error) {
// Count 总数量
func (this *MemoryList) Count() (int64, error) {
this.locker.RLock()
count := int64(len(this.m))
var count = 0
for _, itemMap := range this.itemMaps {
count += len(itemMap)
}
this.locker.RUnlock()
return count, nil
return int64(count), nil
}
// OnAdd 添加事件
@@ -159,3 +224,31 @@ func (this *MemoryList) OnAdd(f func(item *Item)) {
func (this *MemoryList) OnRemove(f func(item *Item)) {
this.onRemove = f
}
func (this *MemoryList) Close() error {
return nil
}
func (this *MemoryList) print(t *testing.T) {
this.locker.Lock()
for _, itemMap := range this.itemMaps {
if len(itemMap) > 0 {
logs.PrintAsJSON(itemMap, t)
}
}
this.locker.Unlock()
}
func (this *MemoryList) prefix(hash string) string {
var prefix string
if len(hash) > 3 {
prefix = hash[:3]
} else {
prefix = hash
}
_, ok := this.itemMaps[prefix]
if !ok {
prefix = "000"
}
return prefix
}

View File

@@ -3,14 +3,17 @@ package caches
import (
"fmt"
"github.com/cespare/xxhash"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/rands"
"math/rand"
"strconv"
"testing"
"time"
)
func TestList_Add(t *testing.T) {
list := &MemoryList{}
func TestMemoryList_Add(t *testing.T) {
list := NewMemoryList().(*MemoryList)
_ = list.Init()
_ = list.Add("a", &Item{
Key: "a1",
ExpiredAt: time.Now().Unix() + 3600,
@@ -21,11 +24,18 @@ func TestList_Add(t *testing.T) {
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
t.Log(list.m)
_ = list.Add("123456", &Item{
Key: "c1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
t.Log(list.prefixes)
logs.PrintAsJSON(list.itemMaps, t)
}
func TestList_Remove(t *testing.T) {
list := &MemoryList{}
func TestMemoryList_Remove(t *testing.T) {
list := NewMemoryList().(*MemoryList)
_ = list.Init()
_ = list.Add("a", &Item{
Key: "a1",
ExpiredAt: time.Now().Unix() + 3600,
@@ -37,11 +47,12 @@ func TestList_Remove(t *testing.T) {
HeaderSize: 1024,
})
_ = list.Remove("b")
t.Log(list.m)
list.print(t)
}
func TestList_Purge(t *testing.T) {
list := &MemoryList{}
func TestMemoryList_Purge(t *testing.T) {
list := NewMemoryList().(*MemoryList)
_ = list.Init()
_ = list.Add("a", &Item{
Key: "a1",
ExpiredAt: time.Now().Unix() + 3600,
@@ -66,11 +77,35 @@ func TestList_Purge(t *testing.T) {
t.Log("delete:", hash)
return nil
})
t.Log(list.m)
list.print(t)
for i := 0; i < 1000; i++ {
_ = list.Purge(100, func(hash string) error {
t.Log("delete:", hash)
return nil
})
t.Log(list.purgeIndex)
}
}
func TestList_Stat(t *testing.T) {
list := &MemoryList{}
func TestMemoryList_Purge_Large_List(t *testing.T) {
list := NewMemoryList().(*MemoryList)
_ = list.Init()
for i := 0; i < 1_000_000; i++ {
_ = list.Add("a"+strconv.Itoa(i), &Item{
Key: "a" + strconv.Itoa(i),
ExpiredAt: time.Now().Unix() + int64(rands.Int(0, 24*3600)),
HeaderSize: 1024,
})
}
time.Sleep(1 * time.Hour)
}
func TestMemoryList_Stat(t *testing.T) {
list := NewMemoryList()
_ = list.Init()
_ = list.Add("a", &Item{
Key: "a1",
ExpiredAt: time.Now().Unix() + 3600,
@@ -99,9 +134,37 @@ func TestList_Stat(t *testing.T) {
t.Log(result)
}
func TestList_FindKeysWithPrefix(t *testing.T) {
list := &MemoryList{}
func TestMemoryList_CleanPrefix(t *testing.T) {
list := NewMemoryList()
_ = list.Init()
before := time.Now()
for i := 0; i < 1_000_000; i++ {
key := "http://www.teaos.cn/hello/" + strconv.Itoa(i/10000) + "/" + strconv.Itoa(i) + ".html"
_ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{
Key: key,
ExpiredAt: time.Now().Unix() + 3600,
BodySize: 0,
HeaderSize: 0,
})
}
t.Log(time.Since(before).Seconds()*1000, "ms")
before = time.Now()
err := list.CleanPrefix("http://www.teaos.cn/hello/10")
if err != nil {
t.Fatal(err)
}
logs.Println(list.Stat(func(hash string) bool {
return true
}))
t.Log(time.Since(before).Seconds()*1000, "ms")
}
func TestMemoryList_GC(t *testing.T) {
list := NewMemoryList().(*MemoryList)
_ = list.Init()
for i := 0; i < 1_000_000; i++ {
key := "http://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html"
_ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{
@@ -111,13 +174,17 @@ func TestList_FindKeysWithPrefix(t *testing.T) {
HeaderSize: 0,
})
}
t.Log(time.Since(before).Seconds()*1000, "ms")
time.Sleep(10 * time.Second)
t.Log("clean...", len(list.itemMaps))
_ = list.CleanAll()
t.Log("cleanAll...", len(list.itemMaps))
before := time.Now()
//runtime.GC()
t.Log("gc cost:", time.Since(before).Seconds()*1000, "ms")
before = time.Now()
keys, err := list.FindKeysWithPrefix("http://www.teaos.cn/hello/5000")
if err != nil {
t.Fatal(err)
}
t.Log(len(keys))
t.Log(time.Since(before).Seconds()*1000, "ms")
timeout := time.NewTimer(2 * time.Minute)
<-timeout.C
t.Log("2 minutes passed")
time.Sleep(30 * time.Minute)
}

View File

@@ -12,6 +12,9 @@ type Reader interface {
// Status 状态码
Status() int
// LastModified 最后修改时间
LastModified() int64
// ReadHeader 读取Header
ReadHeader(buf []byte, callback ReaderFunc) error

View File

@@ -114,6 +114,14 @@ func (this *FileReader) Status() int {
return this.status
}
func (this *FileReader) LastModified() int64 {
stat, err := this.fp.Stat()
if err != nil {
return 0
}
return stat.ModTime().Unix()
}
func (this *FileReader) HeaderSize() int64 {
return int64(this.headerSize)
}

View File

@@ -24,6 +24,10 @@ func (this *MemoryReader) Status() int {
return this.item.Status
}
func (this *MemoryReader) LastModified() int64 {
return this.item.ModifiedAt
}
func (this *MemoryReader) HeaderSize() int64 {
return int64(len(this.item.HeaderValue))
}

View File

@@ -36,12 +36,6 @@ const (
SizeMeta = SizeExpiresAt + SizeStatus + SizeURLLength + SizeHeaderLength + SizeBodyLength
)
var (
ErrNotFound = errors.New("cache not found")
ErrFileIsWriting = errors.New("the file is writing")
ErrInvalidRange = errors.New("invalid range")
)
// FileStorage 文件缓存
// 文件结构:
// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data]
@@ -51,14 +45,16 @@ type FileStorage struct {
memoryStorage *MemoryStorage // 一级缓存
totalSize int64
list ListInterface
locker sync.RWMutex
ticker *utils.Ticker
list ListInterface
writingKeyMap map[string]bool // key => bool
locker sync.RWMutex
ticker *utils.Ticker
}
func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage {
return &FileStorage{
policy: policy,
policy: policy,
writingKeyMap: map[string]bool{},
}
}
@@ -195,9 +191,10 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
}
}
_, path := this.keyPath(key)
hash, path := this.keyPath(key)
// TODO 尝试使用mmap加快读取速度
var isOk = false
fp, err := os.OpenFile(path, os.O_RDONLY, 0444)
if err != nil {
if !os.IsNotExist(err) {
@@ -205,6 +202,21 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
}
return nil, ErrNotFound
}
defer func() {
if !isOk {
_ = fp.Close()
_ = os.Remove(path)
}
}()
// 检查文件记录是否已过期
exists, err := this.list.Exist(hash)
if err != nil {
return nil, err
}
if !exists {
return nil, ErrNotFound
}
reader := NewFileReader(fp)
if err != nil {
@@ -214,6 +226,8 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
if err != nil {
return nil, err
}
isOk = true
return reader, nil
}
@@ -227,17 +241,36 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
}
}
// 是否正在写入
var isWriting = false
this.locker.Lock()
_, ok := this.writingKeyMap[key]
this.locker.Unlock()
if ok {
return nil, ErrFileIsWriting
}
this.locker.Lock()
this.writingKeyMap[key] = true
this.locker.Unlock()
defer func() {
if !isWriting {
this.locker.Lock()
delete(this.writingKeyMap, key)
this.locker.Unlock()
}
}()
// 检查是否超出最大值
count, err := this.list.Count()
if err != nil {
return nil, err
}
if this.policy.MaxKeys > 0 && count > this.policy.MaxKeys {
return nil, errors.New("write file cache failed: too many keys in cache storage")
return nil, NewCapacityError("write file cache failed: too many keys in cache storage")
}
capacityBytes := this.diskCapacityBytes()
if capacityBytes > 0 && capacityBytes <= this.totalSize {
return nil, errors.New("write file cache failed: over disk size, current total size: " + strconv.FormatInt(this.totalSize, 10) + " bytes, capacity: " + strconv.FormatInt(capacityBytes, 10))
return nil, NewCapacityError("write file cache failed: over disk size, current total size: " + strconv.FormatInt(this.totalSize, 10) + " bytes, capacity: " + strconv.FormatInt(capacityBytes, 10))
}
hash := stringutil.Md5(key)
@@ -264,6 +297,7 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
if err != nil {
return nil, err
}
isWriting = true
isOk := false
removeOnFailure := true
@@ -348,7 +382,11 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
isOk = true
return NewFileWriter(writer, key, expiredAt), nil
return NewFileWriter(writer, key, expiredAt, func() {
this.locker.Lock()
delete(this.writingKeyMap, key)
this.locker.Unlock()
}), nil
}
// AddToList 添加到List
@@ -435,6 +473,7 @@ func (this *FileStorage) CleanAll() error {
return nil
}
// 改成待删除
subDirs, err := fp.Readdir(-1)
if err != nil {
return err
@@ -451,7 +490,33 @@ func (this *FileStorage) CleanAll() error {
continue
}
// 删除目录
// 修改目录
tmpDir := dir + "/" + subDir + "-deleted"
err = os.Rename(dir+"/"+subDir, tmpDir)
if err != nil {
return err
}
}
// 重新遍历待删除
fp2, err := os.Open(dir)
if err != nil {
return err
}
defer func() {
_ = fp2.Close()
}()
subDirs, err = fp2.Readdir(-1)
if err != nil {
return err
}
for _, info := range subDirs {
subDir := info.Name()
if !strings.HasSuffix(subDir, "-deleted") {
continue
}
// 删除
err = os.RemoveAll(dir + "/" + subDir)
if err != nil {
return err
@@ -473,33 +538,18 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
// 目录
if urlType == "dir" {
resultKeys := []string{}
for _, key := range keys {
subKeys, err := this.list.FindKeysWithPrefix(key)
err := this.list.CleanPrefix(key)
if err != nil {
return err
}
resultKeys = append(resultKeys, subKeys...)
}
keys = resultKeys
}
// 文件
for _, key := range keys {
hash, path := this.keyPath(key)
exists, err := this.list.Exist(hash)
if err != nil {
return err
}
if !exists {
err := os.Remove(path)
if err != nil && !os.IsNotExist(err) {
return err
}
continue
}
err = os.Remove(path)
err := os.Remove(path)
if err != nil && !os.IsNotExist(err) {
return err
}
@@ -525,6 +575,8 @@ func (this *FileStorage) Stop() {
if this.ticker != nil {
this.ticker.Stop()
}
_ = this.list.Close()
}
// TotalDiskSize 消耗的磁盘尺寸
@@ -695,7 +747,7 @@ func (this *FileStorage) decodeFile(path string) (*Item, error) {
// 清理任务
func (this *FileStorage) purgeLoop() {
_ = this.list.Purge(1000, func(hash string) error {
err := this.list.Purge(1000, func(hash string) error {
path := this.hashPath(hash)
err := os.Remove(path)
if err != nil && !os.IsNotExist(err) {
@@ -703,6 +755,9 @@ func (this *FileStorage) purgeLoop() {
}
return nil
})
if err != nil {
remotelogs.Warn("CACHE", "purge file storage failed: " + err.Error())
}
}
func (this *FileStorage) readToBuff(fp *os.File, buf []byte) (ok bool, err error) {

View File

@@ -3,7 +3,7 @@ package caches
import (
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/errors"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/cespare/xxhash"
"strconv"
@@ -18,6 +18,11 @@ type MemoryItem struct {
BodyValue []byte
Status int
IsDone bool
ModifiedAt int64
}
func (this *MemoryItem) IsExpired() bool {
return this.ExpiredAt < utils.UnixTime()
}
type MemoryStorage struct {
@@ -28,19 +33,23 @@ type MemoryStorage struct {
ticker *utils.Ticker
purgeDuration time.Duration
totalSize int64
writingKeyMap map[string]bool // key => bool
}
func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage {
return &MemoryStorage{
policy: policy,
list: NewMemoryList(),
locker: &sync.RWMutex{},
valuesMap: map[uint64]*MemoryItem{},
policy: policy,
list: NewMemoryList(),
locker: &sync.RWMutex{},
valuesMap: map[uint64]*MemoryItem{},
writingKeyMap: map[string]bool{},
}
}
// Init 初始化
func (this *MemoryStorage) Init() error {
_ = this.list.Init()
this.list.OnAdd(func(item *Item) {
atomic.AddInt64(&this.totalSize, item.TotalSize())
})
@@ -49,7 +58,7 @@ func (this *MemoryStorage) Init() error {
})
if this.purgeDuration <= 0 {
this.purgeDuration = 30 * time.Second
this.purgeDuration = 10 * time.Second
}
// 启动定时清理任务
@@ -91,26 +100,54 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) {
// OpenWriter 打开缓存写入器等待写入
func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
this.locker.Lock()
defer this.locker.Unlock()
// 是否正在写入
var isWriting = false
_, ok := this.writingKeyMap[key]
if ok {
return nil, ErrFileIsWriting
}
this.writingKeyMap[key] = true
defer func() {
if !isWriting {
delete(this.writingKeyMap, key)
}
}()
// 检查是否过期
hash := this.hash(key)
item, ok := this.valuesMap[hash]
if ok && !item.IsExpired() {
return nil, ErrFileIsWriting
}
// 检查是否超出最大值
totalKeys, err := this.list.Count()
if err != nil {
return nil, err
}
if this.policy.MaxKeys > 0 && totalKeys > this.policy.MaxKeys {
return nil, errors.New("write memory cache failed: too many keys in cache storage")
return nil, NewCapacityError("write memory cache failed: too many keys in cache storage")
}
capacityBytes := this.memoryCapacityBytes()
if capacityBytes > 0 && capacityBytes <= this.totalSize {
return nil, errors.New("write memory cache failed: over memory size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
return nil, NewCapacityError("write memory cache failed: over memory size: " + strconv.FormatInt(capacityBytes, 10) + ", current size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
}
// 先删除
err = this.Delete(key)
err = this.deleteWithoutKey(key)
if err != nil {
return nil, err
}
return NewMemoryWriter(this.valuesMap, key, expiredAt, status, this.locker), nil
isWriting = true
return NewMemoryWriter(this.valuesMap, key, expiredAt, status, this.locker, func() {
this.locker.Lock()
delete(this.writingKeyMap, key)
this.locker.Unlock()
}), nil
}
// Delete 删除某个键值对应的缓存
@@ -147,15 +184,12 @@ func (this *MemoryStorage) CleanAll() error {
func (this *MemoryStorage) Purge(keys []string, urlType string) error {
// 目录
if urlType == "dir" {
resultKeys := []string{}
for _, key := range keys {
subKeys, err := this.list.FindKeysWithPrefix(key)
err := this.list.CleanPrefix(key)
if err != nil {
return err
}
resultKeys = append(resultKeys, subKeys...)
}
keys = resultKeys
}
for _, key := range keys {
@@ -170,13 +204,19 @@ func (this *MemoryStorage) Purge(keys []string, urlType string) error {
// Stop 停止缓存策略
func (this *MemoryStorage) Stop() {
this.locker.Lock()
defer this.locker.Unlock()
this.valuesMap = map[uint64]*MemoryItem{}
this.writingKeyMap = map[string]bool{}
_ = this.list.Reset()
if this.ticker != nil {
this.ticker.Stop()
}
_ = this.list.Close()
this.locker.Unlock()
remotelogs.Println("CACHE", "close memory storage '"+strconv.FormatInt(this.policy.Id, 10)+"'")
}
// Policy 获取当前存储的Policy
@@ -235,3 +275,10 @@ func (this *MemoryStorage) memoryCapacityBytes() int64 {
}
return c1
}
func (this *MemoryStorage) deleteWithoutKey(key string) error {
hash := this.hash(key)
delete(this.valuesMap, hash)
_ = this.list.Remove(fmt.Sprintf("%d", hash))
return nil
}

View File

@@ -14,17 +14,19 @@ type FileWriter struct {
headerSize int64
bodySize int64
expiredAt int64
endFunc func()
}
func NewFileWriter(rawWriter *os.File, key string, expiredAt int64) *FileWriter {
func NewFileWriter(rawWriter *os.File, key string, expiredAt int64, endFunc func()) *FileWriter {
return &FileWriter{
key: key,
rawWriter: rawWriter,
expiredAt: expiredAt,
endFunc: endFunc,
}
}
// 写入数据
// WriteHeader 写入数据
func (this *FileWriter) WriteHeader(data []byte) (n int, err error) {
n, err = this.rawWriter.Write(data)
this.headerSize += int64(n)
@@ -34,7 +36,7 @@ func (this *FileWriter) WriteHeader(data []byte) (n int, err error) {
return
}
// 写入Header长度数据
// WriteHeaderLength 写入Header长度数据
func (this *FileWriter) WriteHeaderLength(headerLength int) error {
bytes4 := make([]byte, 4)
binary.BigEndian.PutUint32(bytes4, uint32(headerLength))
@@ -51,7 +53,7 @@ func (this *FileWriter) WriteHeaderLength(headerLength int) error {
return nil
}
// 写入数据
// Write 写入数据
func (this *FileWriter) Write(data []byte) (n int, err error) {
n, err = this.rawWriter.Write(data)
this.bodySize += int64(n)
@@ -61,7 +63,7 @@ func (this *FileWriter) Write(data []byte) (n int, err error) {
return
}
// 写入Body长度数据
// WriteBodyLength 写入Body长度数据
func (this *FileWriter) WriteBodyLength(bodyLength int64) error {
bytes8 := make([]byte, 8)
binary.BigEndian.PutUint64(bytes8, uint64(bodyLength))
@@ -78,8 +80,10 @@ func (this *FileWriter) WriteBodyLength(bodyLength int64) error {
return nil
}
// 关闭
// Close 关闭
func (this *FileWriter) Close() error {
defer this.endFunc()
err := this.WriteHeaderLength(types.Int(this.headerSize))
if err != nil {
return err
@@ -103,8 +107,10 @@ func (this *FileWriter) Close() error {
return err
}
// 丢弃
// Discard 丢弃
func (this *FileWriter) Discard() error {
defer this.endFunc()
_ = this.rawWriter.Close()
err := os.Remove(this.rawWriter.Name())
@@ -127,7 +133,7 @@ func (this *FileWriter) Key() string {
return this.key
}
// 内容类型
// ItemType 获取内容类型
func (this *FileWriter) ItemType() ItemType {
return ItemTypeFile
}

View File

@@ -3,6 +3,7 @@ package caches
import (
"github.com/cespare/xxhash"
"sync"
"time"
)
type MemoryWriter struct {
@@ -14,52 +15,59 @@ type MemoryWriter struct {
bodySize int64
status int
hash uint64
item *MemoryItem
hash uint64
item *MemoryItem
endFunc func()
}
func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, status int, locker *sync.RWMutex) *MemoryWriter {
func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, status int, locker *sync.RWMutex, endFunc func()) *MemoryWriter {
w := &MemoryWriter{
m: m,
key: key,
expiredAt: expiredAt,
locker: locker,
item: &MemoryItem{
ExpiredAt: expiredAt,
Status: status,
ExpiredAt: expiredAt,
ModifiedAt: time.Now().Unix(),
Status: status,
},
status: status,
status: status,
endFunc: endFunc,
}
w.hash = w.calculateHash(key)
return w
}
// 写入数据
// WriteHeader 写入数据
func (this *MemoryWriter) WriteHeader(data []byte) (n int, err error) {
this.headerSize += int64(len(data))
this.item.HeaderValue = append(this.item.HeaderValue, data...)
return len(data), nil
}
// 写入数据
// Write 写入数据
func (this *MemoryWriter) Write(data []byte) (n int, err error) {
this.bodySize += int64(len(data))
this.item.BodyValue = append(this.item.BodyValue, data...)
return len(data), nil
}
// 数据尺寸
// HeaderSize 数据尺寸
func (this *MemoryWriter) HeaderSize() int64 {
return this.headerSize
}
// BodySize 主体内容尺寸
func (this *MemoryWriter) BodySize() int64 {
return this.bodySize
}
// 关闭
// Close 关闭
func (this *MemoryWriter) Close() error {
// 需要在Locker之外
defer this.endFunc()
if this.item == nil {
return nil
}
@@ -72,25 +80,28 @@ func (this *MemoryWriter) Close() error {
return nil
}
// 丢弃
// Discard 丢弃
func (this *MemoryWriter) Discard() error {
// 需要在Locker之外
defer this.endFunc()
this.locker.Lock()
delete(this.m, this.hash)
this.locker.Unlock()
return nil
}
// Key
// Key 获取Key
func (this *MemoryWriter) Key() string {
return this.key
}
// 过期时间
// ExpiredAt 过期时间
func (this *MemoryWriter) ExpiredAt() int64 {
return this.expiredAt
}
// 内容类型
// ItemType 内容类型
func (this *MemoryWriter) ItemType() ItemType {
return ItemTypeMemory
}

View File

@@ -1,7 +1,7 @@
package teaconst
const (
Version = "0.1.1"
Version = "0.2.0"
ProductName = "Edge Node"
ProcessName = "edge-node"

View File

@@ -1,6 +1,7 @@
package nodes
import (
"context"
"encoding/json"
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs"
@@ -13,6 +14,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/logs"
"io"
"net/http"
"os/exec"
@@ -55,10 +57,16 @@ func (this *APIStream) loop() error {
return errors.Wrap(err)
}
isQuiting := false
ctx, cancelFunc := context.WithCancel(rpcClient.Context())
nodeStream, err := rpcClient.NodeRPC().NodeStream(ctx)
events.On(events.EventQuit, func() {
isQuiting = true
remotelogs.Println("API_STREAM", "quiting")
if nodeStream != nil {
cancelFunc()
}
})
nodeStream, err := rpcClient.NodeRPC().NodeStream(rpcClient.Context())
if err != nil {
if isQuiting {
return nil
@@ -69,12 +77,14 @@ func (this *APIStream) loop() error {
for {
if isQuiting {
logs.Println("API_STREAM", "quit")
break
}
message, err := nodeStream.Recv()
if err != nil {
if isQuiting {
remotelogs.Println("API_STREAM", "quit")
return nil
}
return errors.Wrap(err)

View File

@@ -77,7 +77,7 @@ Loop:
return err
}
_, err = client.HTTPAccessLogRPC().CreateHTTPAccessLogs(client.Context(), &pb.CreateHTTPAccessLogsRequest{AccessLogs: accessLogs})
_, err = client.HTTPAccessLogRPC().CreateHTTPAccessLogs(client.Context(), &pb.CreateHTTPAccessLogsRequest{HttpAccessLogs: accessLogs})
if err != nil {
return err
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/types"
"golang.org/x/net/http2"
"io"
"net"
"net/http"
"net/url"
@@ -61,6 +62,7 @@ type HTTPRequest struct {
rewriteIsExternalURL bool // 重写目标是否为外部URL
cacheRef *serverconfigs.HTTPCacheRef // 缓存设置
cacheKey string // 缓存使用的Key
isCached bool // 是否已经被缓存
// WAF相关
firewallPolicyId int64
@@ -231,7 +233,11 @@ func (this *HTTPRequest) doEnd() {
// 流量统计
// TODO 增加是否开启开关
if this.Server != nil {
stats.SharedTrafficStatManager.Add(this.Server.Id, this.writer.sentBodyBytes)
if this.isCached {
stats.SharedTrafficStatManager.Add(this.Server.Id, this.writer.sentBodyBytes, this.writer.sentBodyBytes, 1, 1)
} else {
stats.SharedTrafficStatManager.Add(this.Server.Id, this.writer.sentBodyBytes, 0, 1, 0)
}
}
}
@@ -483,7 +489,7 @@ func (this *HTTPRequest) Format(source string) string {
return this.rawURI
case "requestPath":
return this.requestPath()
case "requestPathExtension": // TODO 需要添加到文档中
case "requestPathExtension":
return filepath.Ext(this.requestPath())
case "requestLength":
return strconv.FormatInt(this.requestLength(), 10)
@@ -578,7 +584,6 @@ func (this *HTTPRequest) Format(source string) string {
}
// response.
// TODO 需要在文档中添加说明
if prefix == "response" {
switch suffix {
case "contentType":
@@ -1142,8 +1147,14 @@ func (this *HTTPRequest) canIgnore(err error) bool {
return true
}
// 网络错误
_, ok := err.(*net.OpError)
if ok {
return true
}
// 客户端主动取消
if err == context.Canceled {
if err == context.Canceled || err == io.ErrShortWrite {
return true
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/iwind/TeaGo/logs"
"net/http"
"strconv"
"time"
)
// 读取缓存
@@ -39,6 +40,9 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
continue
}
if cacheRef.Conds.MatchRequest(this.Format) {
if cacheRef.IsReverse {
return
}
this.cacheRef = cacheRef
refType = "server"
break
@@ -53,6 +57,9 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
continue
}
if cacheRef.Conds.MatchRequest(this.Format) {
if cacheRef.IsReverse {
return
}
this.cacheRef = cacheRef
refType = "policy"
break
@@ -108,7 +115,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
}
if !this.canIgnore(err) {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
}
return
}
@@ -142,7 +149,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
})
if err != nil {
if !this.canIgnore(err) {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
}
return
}
@@ -150,6 +157,47 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
if addStatusHeader {
this.writer.Header().Set("X-Cache", "HIT, "+refType+", "+reader.TypeName())
}
// ETag
var respHeader = this.writer.Header()
var eTag = respHeader.Get("ETag")
var lastModifiedAt = reader.LastModified()
if len(eTag) == 0 {
if lastModifiedAt > 0 {
eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "\""
respHeader["ETag"] = []string{eTag}
}
}
// 支持 Last-Modified
var modifiedTime = respHeader.Get("Last-Modified")
if len(modifiedTime) == 0 {
if lastModifiedAt > 0 {
modifiedTime = time.Unix(lastModifiedAt, 0).Format("Mon, 02 Jan 2006 15:04:05 GMT")
if len(respHeader.Get("Last-Modified")) == 0 {
respHeader.Set("Last-Modified", modifiedTime)
}
}
}
// 支持 If-None-Match
if len(eTag) > 0 && this.requestHeader("If-None-Match") == eTag {
// 自定义Header
this.processResponseHeaders(http.StatusNotModified)
this.writer.WriteHeader(http.StatusNotModified)
this.cacheRef = nil
return true
}
// 支持 If-Modified-Since
if len(modifiedTime) > 0 && this.requestHeader("If-Modified-Since") == modifiedTime {
// 自定义Header
this.processResponseHeaders(http.StatusNotModified)
this.writer.WriteHeader(http.StatusNotModified)
this.cacheRef = nil
return true
}
this.processResponseHeaders(reader.Status())
// 输出Body
@@ -234,7 +282,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
return true
}
if !this.canIgnore(err) {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
}
return
}
@@ -277,7 +325,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
})
if err != nil {
if !this.canIgnore(err) {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
}
return true
}
@@ -300,13 +348,14 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
})
if err != nil {
if !this.canIgnore(err) {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
}
return
}
}
}
this.cacheRef = nil // 终止读取不再往下传递
this.isCached = true
this.cacheRef = nil
return true
}

View File

@@ -12,7 +12,7 @@ import (
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
"github.com/iwind/gofcgi/pkg"
"github.com/iwind/gofcgi/pkg/fcgi"
"io"
"net"
"net/url"
@@ -78,7 +78,7 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
poolSize = 32
}
client, err := pkg.SharedPool(fastcgi.Network(), fastcgi.RealAddress(), uint(poolSize)).Client()
client, err := fcgi.SharedPool(fastcgi.Network(), fastcgi.RealAddress(), uint(poolSize)).Client()
if err != nil {
this.write500(err)
return
@@ -151,7 +151,7 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
params["HTTP_HOST"] = this.Host
}
fcgiReq := pkg.NewRequest()
fcgiReq := fcgi.NewRequest()
fcgiReq.SetTimeout(fastcgi.ReadTimeoutDuration())
fcgiReq.SetParams(params)
fcgiReq.SetBody(this.RawReq.Body, uint32(this.requestLength()))
@@ -200,14 +200,20 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
_, err = io.CopyBuffer(this.writer, resp.Body, buf)
pool.Put(buf)
err1 := resp.Body.Close()
if err1 != nil {
remotelogs.Warn("REQUEST_FASTCGI", err1.Error())
closeErr := resp.Body.Close()
if closeErr != nil {
remotelogs.Warn("HTTP_REQUEST_FASTCGI", closeErr.Error())
}
if err != nil && err != io.EOF {
remotelogs.Warn("REQUEST_FASTCGI", err.Error())
remotelogs.Warn("HTTP_REQUEST_FASTCGI", err.Error())
this.addError(err)
}
// 是否成功结束
if err == nil && closeErr == nil {
this.writer.SetOk()
}
return
}

View File

@@ -13,6 +13,9 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
if !u.IsOn {
continue
}
if !u.MatchRequest(this.Format) {
continue
}
if u.MatchPrefix { // 匹配前缀
if strings.HasPrefix(fullURL, u.BeforeURL) {
afterURL := u.AfterURL

View File

@@ -1,6 +1,7 @@
package nodes
import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs"
"io"
@@ -50,7 +51,11 @@ func (this *HTTPRequest) doPage(status int) (shouldStop bool) {
_, err = io.CopyBuffer(this.writer, fp, buf)
bytePool1k.Put(buf)
if err != nil {
logs.Error(err)
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_PAGE", "write to client failed: "+err.Error())
}
} else {
this.writer.SetOk()
}
err = fp.Close()
if err != nil {

View File

@@ -35,7 +35,7 @@ func (this *HTTPRequest) doReverseProxy() {
origin := this.reverseProxy.NextOrigin(requestCall)
if origin == nil {
err := errors.New(this.requestPath() + ": no available backends for reverse proxy")
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
remotelogs.Error("HTTP_REQUEST_REVERSE_PROXY", err.Error())
this.write502(err)
return
}
@@ -55,7 +55,7 @@ func (this *HTTPRequest) doReverseProxy() {
// 处理Scheme
if origin.Addr == nil {
err := errors.New(this.requestPath() + ": origin '" + strconv.FormatInt(origin.Id, 10) + "' does not has a address")
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
remotelogs.Error("HTTP_REQUEST_REVERSE_PROXY", err.Error())
this.write502(err)
return
}
@@ -142,7 +142,7 @@ func (this *HTTPRequest) doReverseProxy() {
// 获取请求客户端
client, err := SharedHTTPClientPool.Client(this.RawReq, origin, originAddr)
if err != nil {
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
remotelogs.Error("HTTP_REQUEST_REVERSE_PROXY", err.Error())
this.write502(err)
return
}
@@ -162,7 +162,7 @@ func (this *HTTPRequest) doReverseProxy() {
// TODO 如果超过最大失败次数,则下线
this.write502(err)
remotelogs.Println("REQUEST_REVERSE_PROXY", this.RawReq.URL.String()+"': "+err.Error())
remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", this.RawReq.URL.String()+"': "+err.Error())
} else {
// 是否为客户端方面的错误
isClientError := false
@@ -189,7 +189,7 @@ func (this *HTTPRequest) doReverseProxy() {
if this.doWAFResponse(resp) {
err = resp.Body.Close()
if err != nil {
remotelogs.Warn("REQUEST_REVERSE_PROXY", err.Error())
remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", err.Error())
}
return
}
@@ -201,7 +201,7 @@ func (this *HTTPRequest) doReverseProxy() {
if len(this.web.Pages) > 0 && this.doPage(resp.StatusCode) {
err = resp.Body.Close()
if err != nil {
remotelogs.Warn("REQUEST_REVERSE_PROXY", err.Error())
remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", err.Error())
}
return
}
@@ -254,17 +254,22 @@ func (this *HTTPRequest) doReverseProxy() {
}
pool.Put(buf)
err1 := resp.Body.Close()
if err1 != nil {
closeErr := resp.Body.Close()
if closeErr != nil {
if !this.canIgnore(err) {
remotelogs.Warn("REQUEST_REVERSE_PROXY", err1.Error())
remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", closeErr.Error())
}
}
if err != nil && err != io.EOF {
if !this.canIgnore(err) {
remotelogs.Warn("REQUEST_REVERSE_PROXY", err.Error())
remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", err.Error())
this.addError(err)
}
}
// 是否成功结束
if err == nil && closeErr == nil {
this.writer.SetOk()
}
}

View File

@@ -382,6 +382,9 @@ func (this *HTTPRequest) doRoot() (isBreak bool) {
}
}
// 设置成功
this.writer.SetOk()
return true
}

View File

@@ -1,6 +1,7 @@
package nodes
import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs"
"io"
@@ -65,8 +66,16 @@ func (this *HTTPRequest) doShutdown() {
buf := bytePool1k.Get()
_, err = io.CopyBuffer(this.writer, fp, buf)
bytePool1k.Put(buf)
if err != nil {
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_SHUTDOWN", "write to client failed: "+err.Error())
}
} else {
this.writer.SetOk()
}
err = fp.Close()
if err != nil {
logs.Error(err)
remotelogs.Warn("HTTP_REQUEST_SHUTDOWN", "close file failed: "+err.Error())
}
}

View File

@@ -2,6 +2,7 @@ package nodes
import (
"errors"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/logs"
"io"
@@ -68,4 +69,12 @@ func (this *HTTPRequest) doURL(method string, url string, host string, statusCod
buf := pool.Get()
_, err = io.CopyBuffer(this.writer, resp.Body, buf)
pool.Put(buf)
if err != nil {
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_URL", "write to client failed: "+err.Error())
}
} else {
this.writer.SetOk()
}
}

View File

@@ -70,7 +70,7 @@ func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFir
for _, action := range actions {
goNext, err := action.DoHTTP(this.RawReq, this.RawWriter)
if err != nil {
remotelogs.Error("REQUEST", "do action '"+err.Error()+"' failed: "+err.Error())
remotelogs.Error("HTTP_REQUEST_WAF", "do action '"+err.Error()+"' failed: "+err.Error())
return true, false
}
if !goNext {
@@ -101,7 +101,7 @@ func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFir
for _, remoteAddr := range remoteAddrs {
result, err := iplibrary.SharedLibrary.Lookup(remoteAddr)
if err != nil {
remotelogs.Error("REQUEST", "iplibrary lookup failed: "+err.Error())
remotelogs.Error("HTTP_REQUEST_WAF", "iplibrary lookup failed: "+err.Error())
} else if result != nil {
// 检查国家级别封禁
if len(regionConfig.DenyCountryIds) > 0 && len(result.Country) > 0 {
@@ -147,7 +147,7 @@ func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFir
}
goNext, ruleGroup, ruleSet, err := w.MatchRequest(this.RawReq, this.writer)
if err != nil {
remotelogs.Error("REQUEST", this.rawURI+": "+err.Error())
remotelogs.Error("HTTP_REQUEST_WAF", this.rawURI+": "+err.Error())
return
}
@@ -181,7 +181,7 @@ func (this *HTTPRequest) doWAFResponse(resp *http.Response) (blocked bool) {
goNext, ruleGroup, ruleSet, err := w.MatchResponse(this.RawReq, resp, this.writer)
if err != nil {
remotelogs.Error("REQUEST", this.rawURI+": "+err.Error())
remotelogs.Error("HTTP_REQUEST_WAF", this.rawURI+": "+err.Error())
return
}

View File

@@ -14,7 +14,7 @@ import (
"strings"
)
// 响应Writer
// HTTPWriter 响应Writer
type HTTPWriter struct {
req *HTTPRequest
writer http.ResponseWriter
@@ -32,9 +32,11 @@ type HTTPWriter struct {
cacheWriter caches.Writer // 缓存写入
cacheStorage caches.StorageInterface
isOk bool // 是否完全成功
}
// 包装对象
// NewHTTPWriter 包装对象
func NewHTTPWriter(req *HTTPRequest, httpResponseWriter http.ResponseWriter) *HTTPWriter {
return &HTTPWriter{
req: req,
@@ -42,7 +44,7 @@ func NewHTTPWriter(req *HTTPRequest, httpResponseWriter http.ResponseWriter) *HT
}
}
// 重置
// Reset 重置
func (this *HTTPWriter) Reset(httpResponseWriter http.ResponseWriter) {
this.writer = httpResponseWriter
@@ -58,12 +60,12 @@ func (this *HTTPWriter) Reset(httpResponseWriter http.ResponseWriter) {
this.gzipBodyWriter = nil
}
// 设置Gzip
// Gzip 设置Gzip
func (this *HTTPWriter) Gzip(config *serverconfigs.HTTPGzipConfig) {
this.gzipConfig = config
}
// 准备输出
// Prepare 准备输出
func (this *HTTPWriter) Prepare(size int64, status int) {
this.statusCode = status
@@ -71,12 +73,12 @@ func (this *HTTPWriter) Prepare(size int64, status int) {
this.prepareCache(size)
}
// 包装前的原始的Writer
// Raw 包装前的原始的Writer
func (this *HTTPWriter) Raw() http.ResponseWriter {
return this.writer
}
// 获取Header
// Header 获取Header
func (this *HTTPWriter) Header() http.Header {
if this.writer == nil {
return http.Header{}
@@ -84,7 +86,7 @@ func (this *HTTPWriter) Header() http.Header {
return this.writer.Header()
}
// 添加一组Header
// AddHeaders 添加一组Header
func (this *HTTPWriter) AddHeaders(header http.Header) {
if this.writer == nil {
return
@@ -99,7 +101,7 @@ func (this *HTTPWriter) AddHeaders(header http.Header) {
}
}
// 写入数据
// Write 写入数据
func (this *HTTPWriter) Write(data []byte) (n int, err error) {
if this.writer != nil {
if this.gzipWriter != nil {
@@ -115,8 +117,9 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) {
if this.cacheWriter != nil {
_, err = this.cacheWriter.Write(data)
if err != nil {
_ = this.cacheWriter.Discard()
this.cacheWriter = nil
remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error())
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
}
}
} else {
@@ -128,7 +131,7 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) {
if this.gzipBodyWriter != nil {
_, err := this.gzipBodyWriter.Write(data)
if err != nil {
remotelogs.Error("REQUEST_WRITER", err.Error())
remotelogs.Error("HTTP_WRITER", err.Error())
}
} else {
this.body = append(this.body, data...)
@@ -137,17 +140,17 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) {
return
}
// 写入字符串
// WriteString 写入字符串
func (this *HTTPWriter) WriteString(s string) (n int, err error) {
return this.Write([]byte(s))
}
// 读取发送的字节数
// SentBodyBytes 读取发送的字节数
func (this *HTTPWriter) SentBodyBytes() int64 {
return this.sentBodyBytes
}
// 写入状态码
// WriteHeader 写入状态码
func (this *HTTPWriter) WriteHeader(statusCode int) {
if this.writer != nil {
this.writer.WriteHeader(statusCode)
@@ -155,7 +158,7 @@ func (this *HTTPWriter) WriteHeader(statusCode int) {
this.statusCode = statusCode
}
// 读取状态码
// StatusCode 读取状态码
func (this *HTTPWriter) StatusCode() int {
if this.statusCode == 0 {
return http.StatusOK
@@ -163,22 +166,22 @@ func (this *HTTPWriter) StatusCode() int {
return this.statusCode
}
// 设置拷贝Body数据
// SetBodyCopying 设置拷贝Body数据
func (this *HTTPWriter) SetBodyCopying(b bool) {
this.bodyCopying = b
}
// 判断是否在拷贝Body数据
// BodyIsCopying 判断是否在拷贝Body数据
func (this *HTTPWriter) BodyIsCopying() bool {
return this.bodyCopying
}
// 读取拷贝的Body数据
// Body 读取拷贝的Body数据
func (this *HTTPWriter) Body() []byte {
return this.body
}
// 读取Header二进制数据
// HeaderData 读取Header二进制数据
func (this *HTTPWriter) HeaderData() []byte {
if this.writer == nil {
return nil
@@ -200,7 +203,12 @@ func (this *HTTPWriter) HeaderData() []byte {
return writer.Bytes()
}
// 关闭
// SetOk 设置成功
func (this *HTTPWriter) SetOk() {
this.isOk = true
}
// Close 关闭
func (this *HTTPWriter) Close() {
// gzip writer
if this.gzipWriter != nil {
@@ -214,20 +222,26 @@ func (this *HTTPWriter) Close() {
// cache writer
if this.cacheWriter != nil {
err := this.cacheWriter.Close()
if err == nil {
this.cacheStorage.AddToList(&caches.Item{
Type: this.cacheWriter.ItemType(),
Key: this.cacheWriter.Key(),
ExpiredAt: this.cacheWriter.ExpiredAt(),
HeaderSize: this.cacheWriter.HeaderSize(),
BodySize: this.cacheWriter.BodySize(),
})
if this.isOk {
err := this.cacheWriter.Close()
if err == nil {
this.cacheStorage.AddToList(&caches.Item{
Type: this.cacheWriter.ItemType(),
Key: this.cacheWriter.Key(),
ExpiredAt: this.cacheWriter.ExpiredAt(),
HeaderSize: this.cacheWriter.HeaderSize(),
BodySize: this.cacheWriter.BodySize(),
Host: this.req.Host,
ServerId: this.req.Server.Id,
})
}
} else {
_ = this.cacheWriter.Discard()
}
}
}
// Hijack
// Hijack Hijack
func (this *HTTPWriter) Hijack() (conn net.Conn, buf *bufio.ReadWriter, err error) {
hijack, ok := this.writer.(http.Hijacker)
if ok {
@@ -236,7 +250,7 @@ func (this *HTTPWriter) Hijack() (conn net.Conn, buf *bufio.ReadWriter, err erro
return
}
// Flush
// Flush Flush
func (this *HTTPWriter) Flush() {
flusher, ok := this.writer.(http.Flusher)
if ok {
@@ -284,7 +298,7 @@ func (this *HTTPWriter) prepareGzip(size int64) {
var err error = nil
this.gzipWriter, err = gzip.NewWriterLevel(this.writer, int(this.gzipConfig.Level))
if err != nil {
remotelogs.Error("REQUEST_WRITER", err.Error())
remotelogs.Error("HTTP_WRITER", err.Error())
return
}
@@ -293,7 +307,7 @@ func (this *HTTPWriter) prepareGzip(size int64) {
this.gzipBodyBuffer = bytes.NewBuffer([]byte{})
this.gzipBodyWriter, err = gzip.NewWriterLevel(this.gzipBodyBuffer, int(this.gzipConfig.Level))
if err != nil {
remotelogs.Error("REQUEST_WRITER", err.Error())
remotelogs.Error("HTTP_WRITER", err.Error())
}
}
@@ -375,8 +389,8 @@ func (this *HTTPWriter) prepareCache(size int64) {
expiredAt := utils.UnixTime() + life
cacheWriter, err := storage.OpenWriter(this.req.cacheKey, expiredAt, this.StatusCode())
if err != nil {
if err != caches.ErrFileIsWriting {
remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error())
if !caches.CanIgnoreErr(err) {
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
}
return
}
@@ -390,7 +404,8 @@ func (this *HTTPWriter) prepareCache(size int64) {
for _, v1 := range v {
_, err = cacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n"))
if err != nil {
remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error())
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
_ = this.cacheWriter.Discard()
this.cacheWriter = nil
return
}

View File

@@ -43,8 +43,19 @@ func (this *Listener) Listen() error {
return nil
}
protocol := this.group.Protocol()
if protocol.IsUDPFamily() {
return this.listenUDP()
}
return this.listenTCP()
}
netListener, err := this.createListener()
func (this *Listener) listenTCP() error {
if this.group == nil {
return nil
}
protocol := this.group.Protocol()
netListener, err := this.createTCPListener()
if err != nil {
return err
}
@@ -80,11 +91,6 @@ func (this *Listener) Listen() error {
BaseListener: BaseListener{Group: this.group},
Listener: netListener,
}
case serverconfigs.ProtocolUDP:
this.listener = &UDPListener{
BaseListener: BaseListener{Group: this.group},
Listener: netListener,
}
default:
return errors.New("unknown protocol '" + protocol.String() + "'")
}
@@ -108,6 +114,31 @@ func (this *Listener) Listen() error {
return nil
}
func (this *Listener) listenUDP() error {
listener, err := this.createUDPListener()
if err != nil {
return err
}
events.On(events.EventQuit, func() {
remotelogs.Println("LISTENER", "quit "+this.group.FullAddr())
_ = listener.Close()
})
this.listener = &UDPListener{
BaseListener: BaseListener{Group: this.group},
Listener: listener,
}
go func() {
err := this.listener.Serve()
if err != nil {
remotelogs.Error("LISTENER", err.Error())
}
}()
return nil
}
func (this *Listener) Close() error {
if this.listener == nil {
return nil
@@ -115,8 +146,8 @@ func (this *Listener) Close() error {
return this.listener.Close()
}
// 创建监听器
func (this *Listener) createListener() (net.Listener, error) {
// 创建TCP监听器
func (this *Listener) createTCPListener() (net.Listener, error) {
listenConfig := net.ListenConfig{
Control: nil,
KeepAlive: 0,
@@ -131,3 +162,13 @@ func (this *Listener) createListener() (net.Listener, error) {
return listenConfig.Listen(context.Background(), "tcp", this.group.Addr())
}
// 创建UDP监听器
func (this *Listener) createUDPListener() (*net.UDPConn, error) {
// TODO 将来支持udp4/udp6
addr, err := net.ResolveUDPAddr("udp", this.group.Addr())
if err != nil {
return nil, err
}
return net.ListenUDP("udp", addr)
}

View File

@@ -4,6 +4,8 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"golang.org/x/net/http2"
"io"
"log"
"net"
"net/http"
"strings"
@@ -11,6 +13,8 @@ import (
"time"
)
var httpErrorLogger = log.New(io.Discard, "", 0)
type HTTPListener struct {
BaseListener
@@ -37,6 +41,7 @@ func (this *HTTPListener) Serve() error {
Handler: handler,
ReadHeaderTimeout: 3 * time.Second, // TODO 改成可以配置
IdleTimeout: 2 * time.Minute, // TODO 改成可以配置
ErrorLog: httpErrorLogger,
ConnState: func(conn net.Conn, state http.ConnState) {
switch state {
case http.StateNew:

View File

@@ -2,20 +2,20 @@ package nodes
import "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
// 各协议监听器的接口
// ListenerInterface 各协议监听器的接口
type ListenerInterface interface {
// 初始化
// Init 初始化
Init()
// 监听
// Serve 监听
Serve() error
// 关闭
// Close 关闭
Close() error
// 重载配置
// Reload 重载配置
Reload(serverGroup *serverconfigs.ServerGroup)
// 获取当前活跃的连接数
// CountActiveListeners 获取当前活跃的连接数
CountActiveListeners() int
}

View File

@@ -4,10 +4,12 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/lists"
"net/url"
"regexp"
"sync"
"time"
)
var sharedListenerManager = NewListenerManager()
@@ -17,13 +19,31 @@ type ListenerManager struct {
listenersMap map[string]*Listener // addr => *Listener
locker sync.Mutex
lastConfig *nodeconfigs.NodeConfig
retryListenerMap map[string]*Listener // 需要重试的监听器 addr => Listener
ticker *time.Ticker
}
// NewListenerManager 获取新对象
func NewListenerManager() *ListenerManager {
return &ListenerManager{
listenersMap: map[string]*Listener{},
manager := &ListenerManager{
listenersMap: map[string]*Listener{},
retryListenerMap: map[string]*Listener{},
ticker: time.NewTicker(1 * time.Minute),
}
// 提升测试效率
if Tea.IsTesting() {
manager.ticker = time.NewTicker(5 * time.Second)
}
go func() {
for range manager.ticker.C {
manager.retryListeners()
}
}()
return manager
}
// Start 启动监听
@@ -31,6 +51,9 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
this.locker.Lock()
defer this.locker.Unlock()
// 重置数据
this.retryListenerMap = map[string]*Listener{}
// 检查是否有变化
/**if this.lastConfig != nil && this.lastConfig.Version == node.Version {
return nil
@@ -83,6 +106,9 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
listener.Reload(group)
err := listener.Listen()
if err != nil {
// 放入到重试队列中
this.retryListenerMap[addr] = listener
firstServer := group.FirstServer()
if firstServer == nil {
remotelogs.Error("LISTENER_MANAGER", err.Error())
@@ -122,3 +148,18 @@ func (this *ListenerManager) prettyAddress(addr string) string {
}
return u.String()
}
// 重试失败的Listener
func (this *ListenerManager) retryListeners() {
this.locker.Lock()
defer this.locker.Unlock()
for addr, listener := range this.retryListenerMap {
err := listener.Listen()
if err == nil {
delete(this.retryListenerMap, addr)
this.listenersMap[addr] = listener
remotelogs.ServerSuccess(listener.group.FirstServer().Id, "LISTENER_MANAGER", "retry to listen '"+addr+"' successfully")
}
}
}

View File

@@ -80,7 +80,7 @@ func (this *TCPListener) handleConn(conn net.Conn) error {
}
// 记录流量
stats.SharedTrafficStatManager.Add(firstServer.Id, int64(n))
stats.SharedTrafficStatManager.Add(firstServer.Id, int64(n), 0, 0, 0)
}
if err != nil {
closer()

View File

@@ -1,28 +1,201 @@
package nodes
import (
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/stats"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"net"
"sync"
"time"
)
type UDPListener struct {
BaseListener
Listener net.Listener
Listener *net.UDPConn
connMap map[string]*UDPConn
connLocker sync.Mutex
connTicker *utils.Ticker
}
func (this *UDPListener) Serve() error {
// TODO
// TODO 注意管理 CountActiveConnections
return nil
firstServer := this.Group.FirstServer()
if firstServer == nil {
return errors.New("no server available")
}
if firstServer.ReverseProxy == nil {
return errors.New("no ReverseProxy configured for the server")
}
this.connMap = map[string]*UDPConn{}
this.connTicker = utils.NewTicker(1 * time.Minute)
go func() {
for this.connTicker.Next() {
this.gcConns()
}
}()
var buffer = make([]byte, 4*1024)
for {
n, addr, _ := this.Listener.ReadFrom(buffer)
if n > 0 {
this.connLocker.Lock()
conn, ok := this.connMap[addr.String()]
this.connLocker.Unlock()
if ok && !conn.IsOk() {
_ = conn.Close()
ok = false
}
if !ok {
originConn, err := this.connectOrigin(firstServer.ReverseProxy, "")
if err != nil {
remotelogs.Error("UDP_LISTENER", "unable to connect to origin server: "+err.Error())
continue
}
if originConn == nil {
remotelogs.Error("UDP_LISTENER", "unable to find a origin server")
continue
}
conn = NewUDPConn(firstServer.Id, addr, this.Listener, originConn.(*net.UDPConn))
this.connLocker.Lock()
this.connMap[addr.String()] = conn
this.connLocker.Unlock()
}
_, _ = conn.Write(buffer[:n])
}
}
}
func (this *UDPListener) Close() error {
// TODO
return nil
if this.connTicker != nil {
this.connTicker.Stop()
}
// 关闭所有连接
this.connLocker.Lock()
for _, conn := range this.connMap {
_ = conn.Close()
}
this.connLocker.Unlock()
return this.Listener.Close()
}
func (this *UDPListener) Reload(group *serverconfigs.ServerGroup) {
this.Group = group
this.Reset()
}
func (this *UDPListener) connectOrigin(reverseProxy *serverconfigs.ReverseProxyConfig, remoteAddr string) (conn net.Conn, err error) {
if reverseProxy == nil {
return nil, errors.New("no reverse proxy config")
}
retries := 3
for i := 0; i < retries; i++ {
origin := reverseProxy.NextOrigin(nil)
if origin == nil {
continue
}
conn, err = OriginConnect(origin, remoteAddr)
if err != nil {
remotelogs.Error("UDP_LISTENER", "unable to connect origin: "+origin.Addr.Host+":"+origin.Addr.PortRange+": "+err.Error())
continue
} else {
return
}
}
err = errors.New("no origin can be used")
return
}
// 回收连接
func (this *UDPListener) gcConns() {
this.connLocker.Lock()
closingConns := []*UDPConn{}
for addr, conn := range this.connMap {
if !conn.IsOk() {
closingConns = append(closingConns, conn)
delete(this.connMap, addr)
}
}
this.connLocker.Unlock()
for _, conn := range closingConns {
_ = conn.Close()
}
}
// UDPConn 自定义的UDP连接管理
type UDPConn struct {
addr net.Addr
proxyConn net.Conn
serverConn net.Conn
activatedAt int64
isOk bool
isClosed bool
}
func NewUDPConn(serverId int64, addr net.Addr, proxyConn *net.UDPConn, serverConn *net.UDPConn) *UDPConn {
conn := &UDPConn{
addr: addr,
proxyConn: proxyConn,
serverConn: serverConn,
activatedAt: time.Now().Unix(),
isOk: true,
}
go func() {
buffer := bytePool32k.Get()
defer func() {
bytePool32k.Put(buffer)
}()
for {
n, err := serverConn.Read(buffer)
if n > 0 {
conn.activatedAt = time.Now().Unix()
_, writingErr := proxyConn.WriteTo(buffer[:n], addr)
if writingErr != nil {
conn.isOk = false
break
}
// 记录流量
stats.SharedTrafficStatManager.Add(serverId, int64(n), 0, 0, 0)
}
if err != nil {
conn.isOk = false
break
}
}
}()
return conn
}
func (this *UDPConn) Write(b []byte) (n int, err error) {
this.activatedAt = time.Now().Unix()
n, err = this.serverConn.Write(b)
if err != nil {
this.isOk = false
}
return
}
func (this *UDPConn) Close() error {
this.isOk = false
if this.isClosed {
return nil
}
this.isClosed = true
return this.serverConn.Close()
}
func (this *UDPConn) IsOk() bool {
if !this.isOk {
return false
}
return time.Now().Unix()-this.activatedAt < 30 // 如果超过 N 秒没有活动我们认为是超时
}

View File

@@ -33,8 +33,10 @@ import (
var sharedNodeConfig *nodeconfigs.NodeConfig
var nodeTaskNotify = make(chan bool, 8)
var DaemonIsOn = false
var DaemonPid = 0
// 节点
// Node 节点
type Node struct {
isLoaded bool
}
@@ -43,7 +45,7 @@ func NewNode() *Node {
return &Node{}
}
// 检查配置
// Test 检查配置
func (this *Node) Test() error {
// 检查是否能连接API
rpcClient, err := rpc.SharedRPC()
@@ -58,8 +60,15 @@ func (this *Node) Test() error {
return nil
}
// 启动
// Start 启动
func (this *Node) Start() {
_, ok := os.LookupEnv("EdgeDaemon")
if ok {
remotelogs.Println("NODE", "start from daemon")
DaemonIsOn = true
DaemonPid = os.Getppid()
}
// 启动事件
events.Notify(events.EventStart)
@@ -146,7 +155,7 @@ func (this *Node) Start() {
select {}
}
// 实现守护进程
// Daemon 实现守护进程
func (this *Node) Daemon() {
path := os.TempDir() + "/edge-node.sock"
isDebug := lists.ContainsString(os.Args, "debug")
@@ -164,6 +173,10 @@ func (this *Node) Daemon() {
if err != nil {
return err
}
// 可以标记当前是从守护进程启动的
_ = os.Setenv("EdgeDaemon", "on")
cmd := exec.Command(exe)
err = cmd.Start()
if err != nil {
@@ -191,7 +204,7 @@ func (this *Node) Daemon() {
}
}
// 安装系统服务
// InstallSystemService 安装系统服务
func (this *Node) InstallSystemService() error {
shortName := teaconst.SystemdServiceName
@@ -285,6 +298,8 @@ func (this *Node) loop() error {
if err != nil {
return err
}
case "nodeVersionChanged":
go sharedUpgradeManager.Start()
}
}

View File

@@ -9,7 +9,7 @@ import (
"strconv"
)
// 连接源站
// OriginConnect 连接源站
func OriginConnect(origin *serverconfigs.OriginConfig, remoteAddr string) (net.Conn, error) {
if origin.Addr == nil {
return nil, errors.New("origin server address should not be empty")
@@ -70,9 +70,15 @@ func OriginConnect(origin *serverconfigs.OriginConfig, remoteAddr string) (net.C
return tls.Dial("tcp", origin.Addr.Host+":"+origin.Addr.PortRange, &tls.Config{
InsecureSkipVerify: true,
})
case serverconfigs.ProtocolUDP:
addr, err := net.ResolveUDPAddr("udp", origin.Addr.Host+":"+origin.Addr.PortRange)
if err != nil {
return nil, err
}
return net.DialUDP("udp", nil, addr)
}
// TODO 支持从Unix、Pipe、HTTP、HTTPS中读取数据
return nil, errors.New("invalid scheme '" + origin.Addr.Protocol.String() + "'")
return nil, errors.New("invalid origin scheme '" + origin.Addr.Protocol.String() + "'")
}

View File

@@ -0,0 +1,252 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nodes
import (
"crypto/md5"
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs"
stringutil "github.com/iwind/TeaGo/utils/string"
"os"
"os/exec"
"runtime"
"time"
)
var sharedUpgradeManager = NewUpgradeManager()
// UpgradeManager 节点升级管理器
// TODO 需要在集群中设置是否自动更新
type UpgradeManager struct {
isInstalling bool
lastFile string
}
// NewUpgradeManager 获取新对象
func NewUpgradeManager() *UpgradeManager {
return &UpgradeManager{}
}
// Start 启动升级
func (this *UpgradeManager) Start() {
// 测试环境下不更新
if Tea.IsTesting() {
return
}
if this.isInstalling {
return
}
this.isInstalling = true
// 还原安装状态
defer func() {
this.isInstalling = false
}()
remotelogs.Println("UPGRADE_MANAGER", "upgrading node ...")
err := this.install()
if err != nil {
remotelogs.Error("UPGRADE_MANAGER", "download failed: "+err.Error())
return
}
remotelogs.Println("UPGRADE_MANAGER", "upgrade successfully")
go func() {
err = this.restart()
if err != nil {
logs.Println("UPGRADE_MANAGER", err.Error())
}
}()
}
func (this *UpgradeManager) install() error {
// 检查是否有已下载但未安装成功的
if len(this.lastFile) > 0 {
_, err := os.Stat(this.lastFile)
if err == nil {
err = this.unzip(this.lastFile)
if err != nil {
return err
}
this.lastFile = ""
return nil
}
}
// 创建临时文件
dir := Tea.Root + "/tmp"
_, err := os.Stat(dir)
if err != nil {
if os.IsNotExist(err) {
err = os.Mkdir(dir, 0777)
if err != nil {
return err
}
} else {
return err
}
}
remotelogs.Println("UPGRADE_MANAGER", "downloading new node ...")
path := dir + "/edge-node" + ".tmp"
fp, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0777)
if err != nil {
return err
}
isClosed := false
defer func() {
if !isClosed {
_ = fp.Close()
}
}()
client, err := rpc.SharedRPC()
if err != nil {
return err
}
var offset int64
var h = md5.New()
var sum = ""
var filename = ""
for {
resp, err := client.NodeRPC().DownloadNodeInstallationFile(client.Context(), &pb.DownloadNodeInstallationFileRequest{
Os: runtime.GOOS,
Arch: runtime.GOARCH,
ChunkOffset: offset,
})
if err != nil {
return err
}
if len(resp.Sum) == 0 {
return nil
}
sum = resp.Sum
filename = resp.Filename
if stringutil.VersionCompare(resp.Version, teaconst.Version) <= 0 {
return nil
}
if len(resp.ChunkData) == 0 {
break
}
// 写入文件
_, err = fp.Write(resp.ChunkData)
if err != nil {
return err
}
_, err = h.Write(resp.ChunkData)
if err != nil {
return err
}
offset = resp.Offset
}
if len(filename) == 0 {
return nil
}
isClosed = true
err = fp.Close()
if err != nil {
return err
}
if fmt.Sprintf("%x", h.Sum(nil)) != sum {
_ = os.Remove(path)
return nil
}
// 改成zip
zipPath := dir + "/" + filename
err = os.Rename(path, zipPath)
if err != nil {
return err
}
this.lastFile = zipPath
// 解压
err = this.unzip(zipPath)
if err != nil {
return err
}
return nil
}
// 解压
func (this *UpgradeManager) unzip(zipPath string) error {
var isOk = false
defer func() {
if isOk {
// 只有解压并覆盖成功后才会删除
_ = os.Remove(zipPath)
}
}()
// 解压
var target = Tea.Root
if Tea.IsTesting() {
// 测试环境下只解压在tmp目录
target = Tea.Root + "/tmp"
}
// 先改先前的可执行文件
err := os.Rename(target+"/bin/edge-node", target+"/bin/.edge-node.old")
hasBackup := err == nil
defer func() {
if !isOk && hasBackup {
// 失败时还原
_ = os.Rename(target+"/bin/.edge-node.old", target+"/bin/edge-node")
}
}()
unzip := utils.NewUnzip(zipPath, target, "edge-node/")
err = unzip.Run()
if err != nil {
return err
}
isOk = true
return nil
}
// 重启
func (this *UpgradeManager) restart() error {
// 重新启动
if DaemonIsOn && DaemonPid == os.Getppid() {
os.Exit(0) // TODO 试着更优雅重启
} else {
exe, err := os.Executable()
if err != nil {
return err
}
// quit
events.Notify(events.EventQuit)
// 启动
cmd := exec.Command(exe, "start")
err = cmd.Start()
if err != nil {
return err
}
// 退出当前进程
time.Sleep(1 * time.Second)
os.Exit(0)
}
return nil
}

View File

@@ -0,0 +1,16 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nodes
import (
_ "github.com/iwind/TeaGo/bootstrap"
"testing"
)
func TestUpgradeManager_install(t *testing.T) {
err := NewUpgradeManager().install()
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}

View File

@@ -93,7 +93,7 @@ func Error(tag string, description string) {
}
}
// ServerError 打印错误信息
// ServerError 打印服务相关错误信息
func ServerError(serverId int64, tag string, description string) {
logs.Println("[" + tag + "]" + description)
@@ -117,6 +117,30 @@ func ServerError(serverId int64, tag string, description string) {
}
}
// ServerSuccess 打印服务相关成功信息
func ServerSuccess(serverId int64, tag string, description string) {
logs.Println("[" + tag + "]" + description)
nodeConfig, _ := nodeconfigs.SharedNodeConfig()
if nodeConfig == nil {
return
}
select {
case logChan <- &pb.NodeLog{
Role: teaconst.Role,
Tag: tag,
Description: description,
Level: "success",
NodeId: nodeConfig.Id,
ServerId: serverId,
CreatedAt: time.Now().Unix(),
}:
default:
}
}
// 上传日志
func uploadLogs() error {
logList := []*pb.NodeLog{}

View File

@@ -15,9 +15,16 @@ import (
var SharedTrafficStatManager = NewTrafficStatManager()
type TrafficItem struct {
Bytes int64
CachedBytes int64
CountRequests int64
CountCachedRequests int64
}
// TrafficStatManager 区域流量统计
type TrafficStatManager struct {
m map[string]int64 // [timestamp serverId] => bytes
itemMap map[string]*TrafficItem // [timestamp serverId] => bytes
locker sync.Mutex
configFunc func() *nodeconfigs.NodeConfig
}
@@ -25,7 +32,7 @@ type TrafficStatManager struct {
// NewTrafficStatManager 获取新对象
func NewTrafficStatManager() *TrafficStatManager {
manager := &TrafficStatManager{
m: map[string]int64{},
itemMap: map[string]*TrafficItem{},
}
return manager
@@ -55,7 +62,7 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig)
}
// Add 添加流量
func (this *TrafficStatManager) Add(serverId int64, bytes int64) {
func (this *TrafficStatManager) Add(serverId int64, bytes int64, cachedBytes int64, countRequests int64, countCachedRequests int64) {
if bytes == 0 {
return
}
@@ -64,7 +71,15 @@ func (this *TrafficStatManager) Add(serverId int64, bytes int64) {
key := strconv.FormatInt(timestamp, 10) + strconv.FormatInt(serverId, 10)
this.locker.Lock()
this.m[key] += bytes
item, ok := this.itemMap[key]
if !ok {
item = &TrafficItem{}
this.itemMap[key] = item
}
item.Bytes += bytes
item.CachedBytes += cachedBytes
item.CountRequests += countRequests
item.CountCachedRequests += countCachedRequests
this.locker.Unlock()
}
@@ -81,12 +96,12 @@ func (this *TrafficStatManager) Upload() error {
}
this.locker.Lock()
m := this.m
this.m = map[string]int64{}
m := this.itemMap
this.itemMap = map[string]*TrafficItem{}
this.locker.Unlock()
pbStats := []*pb.ServerDailyStat{}
for key, bytes := range m {
for key, item := range m {
timestamp, err := strconv.ParseInt(key[:10], 10, 64)
if err != nil {
return err
@@ -97,10 +112,13 @@ func (this *TrafficStatManager) Upload() error {
}
pbStats = append(pbStats, &pb.ServerDailyStat{
ServerId: serverId,
RegionId: config.RegionId,
Bytes: bytes,
CreatedAt: timestamp,
ServerId: serverId,
RegionId: config.RegionId,
Bytes: item.Bytes,
CachedBytes: item.CachedBytes,
CountRequests: item.CountRequests,
CountCachedRequests: item.CountCachedRequests,
CreatedAt: timestamp,
})
}
if len(pbStats) == 0 {

View File

@@ -8,15 +8,15 @@ import (
func TestTrafficStatManager_Add(t *testing.T) {
manager := NewTrafficStatManager()
for i := 0; i < 100; i++ {
manager.Add(1, 10)
manager.Add(1, 10, 1, 0)
}
t.Log(manager.m)
t.Log(manager.itemMap)
}
func TestTrafficStatManager_Upload(t *testing.T) {
manager := NewTrafficStatManager()
for i := 0; i < 100; i++ {
manager.Add(1, 10)
manager.Add(1, 10, 1, 0)
}
err := manager.Upload()
if err != nil {
@@ -30,6 +30,6 @@ func BenchmarkTrafficStatManager_Add(b *testing.B) {
manager := NewTrafficStatManager()
for i := 0; i < b.N; i++ {
manager.Add(1, 1024)
manager.Add(1, 1024, 1, 0)
}
}

98
internal/utils/unzip.go Normal file
View File

@@ -0,0 +1,98 @@
package utils
import (
"archive/zip"
"errors"
"io"
"os"
"strings"
)
type Unzip struct {
zipFile string
targetDir string
stripPrefix string
}
func NewUnzip(zipFile string, targetDir string, stripPrefix string) *Unzip {
return &Unzip{
zipFile: zipFile,
targetDir: targetDir,
stripPrefix: stripPrefix,
}
}
func (this *Unzip) Run() error {
if len(this.zipFile) == 0 {
return errors.New("zip file should not be empty")
}
if len(this.targetDir) == 0 {
return errors.New("target dir should not be empty")
}
reader, err := zip.OpenReader(this.zipFile)
if err != nil {
return err
}
defer func() {
_ = reader.Close()
}()
for _, file := range reader.File {
info := file.FileInfo()
filename := file.Name
if len(this.stripPrefix) > 0 {
filename = strings.TrimPrefix(filename, this.stripPrefix)
}
target := this.targetDir + "/" + filename
// 目录
if info.IsDir() {
stat, err := os.Stat(target)
if err != nil {
if !os.IsNotExist(err) {
return err
} else {
err = os.MkdirAll(target, info.Mode())
if err != nil {
return err
}
}
} else if !stat.IsDir() {
err = os.MkdirAll(target, info.Mode())
if err != nil {
return err
}
}
continue
}
// 文件
err := func(file *zip.File, target string) error {
fileReader, err := file.Open()
if err != nil {
return err
}
defer func() {
_ = fileReader.Close()
}()
fileWriter, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, file.FileInfo().Mode())
if err != nil {
return err
}
defer func() {
_ = fileWriter.Close()
}()
_, err = io.Copy(fileWriter, fileReader)
return err
}(file, target)
if err != nil {
return err
}
}
return nil
}