Compare commits

..

42 Commits

Author SHA1 Message Date
刘祥超
d4cca10301 修复一处无法编译的问题 2023-09-21 16:20:29 +08:00
刘祥超
8597884ec5 修复URL域名跳转设置可能不生效的问题 2023-09-20 08:21:22 +08:00
刘祥超
9f469f5b77 访客IP设置中支持多个请求报头/X-Real-IP也可以从变量中读取 2023-09-17 19:15:10 +08:00
刘祥超
5bc4f5b359 优化Ln请求 2023-09-17 18:17:34 +08:00
刘祥超
494ff5b5bb 智能调节清理缓存阈值 2023-09-17 12:05:06 +08:00
刘祥超
bac0060a74 edge-node cache.garbage命令执行时检查Key列表是否已加载完毕 2023-09-17 11:43:46 +08:00
刘祥超
062ca1cfcd 缓存内容报头时忽略X-Cache 2023-09-17 10:54:14 +08:00
刘祥超
8a4373e984 修复节点缓存磁盘容量设置不生效的问题 2023-09-16 09:36:04 +08:00
刘祥超
99670e46a5 增加edge-node cache.garbage命令用于清理垃圾缓存 2023-09-15 18:14:58 +08:00
刘祥超
64642c6680 优化单次清理LFU缓存数量逻辑 2023-09-15 14:46:31 +08:00
刘祥超
f7a7b50eda 优化缓存自动清理 2023-09-14 20:17:48 +08:00
刘祥超
f5265f1832 优化缓存LFU逻辑 2023-09-14 18:30:11 +08:00
刘祥超
2d6f7522fc 优化删除IP名单时操作 2023-09-13 17:17:05 +08:00
刘祥超
cfb15e764d 将版本号修改为1.2.9 2023-09-12 17:17:56 +08:00
刘祥超
406d8de999 本地数据库启动时在必要的情况下尝试恢复数据 2023-09-12 16:53:41 +08:00
刘祥超
2ca79953b9 优化一处测试用例 2023-09-12 16:51:15 +08:00
刘祥超
d067cd8437 IP名单同步时增加调试信息 2023-09-12 16:05:18 +08:00
刘祥超
e4937685bc 系统在进行健康检查时不进行指标统计 2023-09-12 10:45:44 +08:00
刘祥超
6ac2343aa7 更新依赖 2023-09-11 17:15:10 +08:00
刘祥超
1d80bc640d WebP转换增加宽度和高度限制 2023-09-11 16:05:59 +08:00
刘祥超
b19c431b89 优化代码 2023-09-07 15:10:05 +08:00
刘祥超
a1e1b5ef98 修复集群启用“允许使用节点IP访问”时无法访问IPv6的问题 2023-09-07 14:48:27 +08:00
刘祥超
e93275ac5c 重新实现套餐 2023-09-06 16:34:11 +08:00
刘祥超
a6059ab070 修复状未绑定域名中${status}和${statusMessage}变量值 2023-08-27 21:38:35 +08:00
刘祥超
5b0f94f317 优化过时缓存时长(从600秒改为1200秒) 2023-08-27 14:49:28 +08:00
刘祥超
0b4ac55aa6 源站返回50X时,也可以尝试使用过时缓存 2023-08-26 15:46:45 +08:00
刘祥超
9080c78b13 优化内置的消息提示:增加连接信息方便诊断 2023-08-25 15:43:12 +08:00
刘祥超
aa7d67e387 网站设置增加是否支持${serverAddr}选项/增强${serverAddr}安全性 2023-08-25 15:30:59 +08:00
刘祥超
405b3615fe 增加${serverAddr}变量 2023-08-25 15:04:18 +08:00
刘祥超
7534af09ed 优化编译脚本 2023-08-23 18:21:32 +08:00
刘祥超
cd3bf0cad4 优化编译脚本 2023-08-23 17:50:33 +08:00
刘祥超
41ebdfb7d2 优化响应报头 2023-08-23 16:33:19 +08:00
刘祥超
fa7d4963cb 反向代理增加是否重试50X选项,默认为启用 2023-08-20 15:50:31 +08:00
刘祥超
627d9721b3 优化反向代理错误处理代码 2023-08-20 11:45:39 +08:00
刘祥超
8c3cd53dc3 检查硬盘是否已满时同时检测缓存策略中定义的容量 2023-08-20 11:02:09 +08:00
刘祥超
5995be8489 修复访问日志无法记录自定义跳转状态码的问题 2023-08-20 10:10:23 +08:00
刘祥超
eabaa84252 修复磁盘空间可能为0的情形 2023-08-18 15:56:45 +08:00
刘祥超
40e137e69e 优化指标统计性能 2023-08-15 20:12:09 +08:00
刘祥超
6f51fe52f8 优化代码 2023-08-15 15:49:23 +08:00
刘祥超
dd4071e7dc 优化api_node.yaml生成 2023-08-15 10:33:01 +08:00
刘祥超
7e5b3254eb 修改版本号为1.2.8 2023-08-14 12:24:03 +08:00
刘祥超
ad1947379d 自动生成新的配置文件(api_node.yaml) 2023-08-14 12:23:55 +08:00
60 changed files with 1106 additions and 606 deletions

View File

@@ -6,8 +6,10 @@ function build() {
VERSION=$(lookup-version "$ROOT"/../internal/const/const.go)
DIST=$ROOT/"../dist/${NAME}"
MUSL_DIR="/usr/local/opt/musl-cross/bin"
GCC_X86_64_DIR="/usr/local/Cellar/x86_64-unknown-linux-gnu/10.3.0/bin"
GCC_ARM64_DIR="/usr/local/Cellar/aarch64-unknown-linux-gnu/10.3.0/bin"
# for macOS users: precompiled gcc can be downloaded from https://github.com/messense/homebrew-macos-cross-toolchains
GCC_X86_64_DIR="/usr/local/gcc/x86_64-unknown-linux-gnu/bin"
GCC_ARM64_DIR="//usr/local/gcc/aarch64-unknown-linux-gnu/bin"
OS=${1}
ARCH=${2}
@@ -57,7 +59,10 @@ function build() {
# we support TOA on linux only
if [ "$OS" == "linux" ] && [ -f "${ROOT}/edge-toa/edge-toa-${ARCH}" ]
then
mkdir "$DIST/edge-toa"
if [ ! -d "$DIST/edge-toa" ]
then
mkdir "$DIST/edge-toa"
fi
cp "${ROOT}/edge-toa/edge-toa-${ARCH}" "$DIST/edge-toa/edge-toa"
fi
@@ -121,6 +126,11 @@ function build() {
env GOOS="${OS}" GOARCH="${ARCH}" CGO_ENABLED=1 go build -trimpath -tags $BUILD_TAG -o "$DIST"/bin/${NAME} -ldflags="-s -w" "$ROOT"/../cmd/edge-node/main.go
fi
if [ ! -f "${DIST}/bin/${NAME}" ]; then
echo "build failed!"
exit
fi
# delete hidden files
find "$DIST" -name ".DS_Store" -delete
find "$DIST" -name ".gitignore" -delete

View File

@@ -22,6 +22,7 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"time"
)
@@ -30,7 +31,7 @@ func main() {
Version(teaconst.Version).
Product(teaconst.ProductName).
Usage(teaconst.ProcessName + " [-v|start|stop|restart|status|quit|test|reload|service|daemon|pprof|accesslog|uninstall]").
Usage(teaconst.ProcessName + " [trackers|goman|conns|gc|bandwidth|disk]").
Usage(teaconst.ProcessName + " [trackers|goman|conns|gc|bandwidth|disk|cache.garbage]").
Usage(teaconst.ProcessName + " [ip.drop|ip.reject|ip.remove|ip.close] IP")
app.On("start:before", func() {
@@ -464,6 +465,45 @@ func main() {
fmt.Println("Usage: edge-node disk [speed]")
}
})
app.On("cache.garbage", func() {
fmt.Println("scanning ...")
var shouldDelete bool
for _, arg := range os.Args {
if strings.TrimLeft(arg, "-") == "delete" {
shouldDelete = true
}
}
var sock = gosock.NewTmpSock(teaconst.ProcessName)
reply, err := sock.Send(&gosock.Command{
Code: "cache.garbage",
Params: map[string]any{"delete": shouldDelete},
})
if err != nil {
fmt.Println("[ERROR]" + err.Error())
return
}
var params = maps.NewMap(reply.Params)
if params.GetBool("isOk") {
var count = params.GetInt("count")
fmt.Println("found", count, "bad caches")
if count > 0 {
fmt.Println("======")
var sampleFiles = params.GetSlice("sampleFiles")
for _, file := range sampleFiles {
fmt.Println(types.String(file))
}
if count > len(sampleFiles) {
fmt.Println("... more files")
}
}
} else {
fmt.Println("[ERROR]" + params.GetString("error"))
}
})
app.Run(func() {
var node = nodes.NewNode()
node.Start()

30
go.mod
View File

@@ -13,32 +13,33 @@ require (
github.com/aliyun/aliyun-oss-go-sdk v2.2.7+incompatible
github.com/andybalholm/brotli v1.0.5
github.com/aws/aws-sdk-go v1.44.279
github.com/baidubce/bce-sdk-go v0.9.153
github.com/biessek/golang-ico v0.0.0-20180326222316-d348d9ea4670
github.com/cespare/xxhash v1.1.0
github.com/dchest/captcha v0.0.0-20200903113550-03f5f0333e1f
github.com/fsnotify/fsnotify v1.6.0
github.com/go-redis/redis/v8 v8.11.5
github.com/golang/protobuf v1.5.3
github.com/google/nftables v0.1.0
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.4+incompatible
github.com/iwind/TeaGo v0.0.0-20230630104525-161f0b32996d
github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11
github.com/iwind/gosock v0.0.0-20211103081026-ee4652210ca4
github.com/iwind/gowebp v0.0.0-20230615040911-5013dbb9d508
github.com/iwind/gowebp v0.0.0-20230911074406-2e4e7fd0b59f
github.com/klauspost/compress v1.16.5
github.com/mattn/go-sqlite3 v1.14.9
github.com/mattn/go-sqlite3 v1.14.17
github.com/mdlayher/netlink v1.7.1
github.com/miekg/dns v1.1.43
github.com/mssola/useragent v1.0.0
github.com/pires/go-proxyproto v0.6.1
github.com/qiniu/go-sdk/v7 v7.16.0
github.com/quic-go/quic-go v0.36.1
github.com/quic-go/quic-go v0.38.1
github.com/shirou/gopsutil/v3 v3.22.2
github.com/tencentyun/cos-go-sdk-v5 v0.7.41
golang.org/x/image v0.7.0
golang.org/x/net v0.12.0
golang.org/x/sys v0.10.0
golang.org/x/net v0.14.0
golang.org/x/sys v0.11.0
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.30.0
gopkg.in/yaml.v3 v3.0.1
)
@@ -50,9 +51,10 @@ require (
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/go-querystring v1.0.0 // indirect
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect
github.com/google/pprof v0.0.0-20230821062121-407c9e7a662f // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/native v1.0.0 // indirect
github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e // indirect
@@ -60,23 +62,21 @@ require (
github.com/mdlayher/socket v0.4.0 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/mozillazg/go-httpheader v0.2.1 // indirect
github.com/onsi/ginkgo/v2 v2.11.0 // indirect
github.com/onsi/ginkgo/v2 v2.12.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/qtls-go1-19 v0.3.2 // indirect
github.com/quic-go/qtls-go1-20 v0.2.2 // indirect
github.com/quic-go/qtls-go1-20 v0.3.3 // indirect
github.com/tdewolff/minify/v2 v2.12.7 // indirect
github.com/tdewolff/parse/v2 v2.6.6 // indirect
github.com/tklauser/go-sysconf v0.3.9 // indirect
github.com/tklauser/numcpus v0.3.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.11.0 // indirect
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/protobuf v1.30.0 // indirect
)

57
go.sum
View File

@@ -7,6 +7,8 @@ github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/aws/aws-sdk-go v1.44.279 h1:g23dxnYjIiPlQo0gIKNR0zVPsSvo1bj5frWln+5sfhk=
github.com/aws/aws-sdk-go v1.44.279/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/baidubce/bce-sdk-go v0.9.153 h1:h5l2EXehe4C4/bdlAPBaULrbnEDgIu5HOYgniN7bjGM=
github.com/baidubce/bce-sdk-go v0.9.153/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg=
github.com/biessek/golang-ico v0.0.0-20180326222316-d348d9ea4670 h1:FQPKKjDhzG0T4ew6dm6MGrXb4PRAi8ZmTuYuxcF62BM=
github.com/biessek/golang-ico v0.0.0-20180326222316-d348d9ea4670/go.mod h1:iRWAFbKXMMkVQyxZ1PfGlkBr1TjATx1zy2MRprV7A3Q=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
@@ -53,8 +55,10 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 h1:n6vlPhxsA+BW/XsS5+uqi7GyzaLa5MH7qlSLBZtRdiA=
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8/go.mod h1:Jh3hGz2jkYak8qXPD19ryItVnUgpgeqzdkY/D0EaeuA=
github.com/google/pprof v0.0.0-20230817174616-7a8ec2ada47b h1:h9U78+dx9a4BKdQkBBos92HalKpaGKHrp+3Uo6yTodo=
github.com/google/pprof v0.0.0-20230817174616-7a8ec2ada47b/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik=
github.com/google/pprof v0.0.0-20230821062121-407c9e7a662f h1:pDhu5sgp8yJlEF/g6osliIIpF9K4F5jvkULXa4daRDQ=
github.com/google/pprof v0.0.0-20230821062121-407c9e7a662f/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.4+incompatible h1:XRAk4HBDLCYEdPLWtKf5iZhOi7lfx17aY0oSO9+mcg8=
@@ -69,6 +73,8 @@ github.com/iwind/gosock v0.0.0-20211103081026-ee4652210ca4 h1:VWGsCqTzObdlbf7UUE
github.com/iwind/gosock v0.0.0-20211103081026-ee4652210ca4/go.mod h1:H5Q7SXwbx3a97ecJkaS2sD77gspzE7HFUafBO0peEyA=
github.com/iwind/gowebp v0.0.0-20230615040911-5013dbb9d508 h1:fjKiHAyPQmdwuw1DQ2BI1JTbhUWAtI3Kr9wIZQBdRgQ=
github.com/iwind/gowebp v0.0.0-20230615040911-5013dbb9d508/go.mod h1:Re7TEhwL+ygnxFg52fC0PWy01ULAIZp2QR0q5WwEOQA=
github.com/iwind/gowebp v0.0.0-20230911074406-2e4e7fd0b59f h1:b+YNSK4PgRU4u5YuYW8W4dHO3LNsG7XvX2dJQK0jOf8=
github.com/iwind/gowebp v0.0.0-20230911074406-2e4e7fd0b59f/go.mod h1:Re7TEhwL+ygnxFg52fC0PWy01ULAIZp2QR0q5WwEOQA=
github.com/iwind/nftables v0.0.0-20230419014751-9f023a644ad4 h1:RPAH9Sj9l/20zH5zU5/iJGszfwPq6eLjoiC/n/asulA=
github.com/iwind/nftables v0.0.0-20230419014751-9f023a644ad4/go.mod h1:7OLL+86wZKfBnAJxNxmdcZ0ebbgdp/A28fcagx9oJqA=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
@@ -93,8 +99,8 @@ github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ic
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/matryer/try v0.0.0-20161228173917-9ac251b645a2/go.mod h1:0KeJpeMD6o+O4hW7qJOT7vyQPKrWmj26uf5wMc/IiIs=
github.com/mattn/go-sqlite3 v1.14.9 h1:10HX2Td0ocZpYEjhilsuo6WWtUqttj2Kb0KtD86/KYA=
github.com/mattn/go-sqlite3 v1.14.9/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM=
github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/mdlayher/netlink v1.7.1 h1:FdUaT/e33HjEXagwELR8R3/KL1Fq5x3G5jgHLp/BTmg=
github.com/mdlayher/netlink v1.7.1/go.mod h1:nKO5CSjE/DJjVhk/TNp6vCE1ktVxEA8VEh8drhZzxsQ=
github.com/mdlayher/socket v0.4.0 h1:280wsy40IC9M9q1uPGcLBwXpcTQDtoGwVt+BNoITxIw=
@@ -111,7 +117,10 @@ github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU=
github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM=
github.com/onsi/ginkgo/v2 v2.12.0 h1:UIVDowFPwpg6yMUpPjGkYvf06K3RAiJXUhCxEwQVHRI=
github.com/onsi/ginkgo/v2 v2.12.0/go.mod h1:ZNEzXISYlqpb8S36iN71ifqLi3vVD1rVJGvWRCJOUpQ=
github.com/onsi/gomega v1.27.8 h1:gegWiwZjBsf2DgiSbf5hpokZ98JVDMcWkUiigk6/KXc=
github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
github.com/pires/go-proxyproto v0.6.1 h1:EBupykFmo22SDjv4fQVQd2J9NOoLPmyZA/15ldOGkPw=
github.com/pires/go-proxyproto v0.6.1/go.mod h1:Odh9VFOZJCf9G8cLW5o435Xf1J95Jw9Gw5rnCjcwzAY=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
@@ -126,12 +135,14 @@ github.com/qiniu/go-sdk/v7 v7.16.0/go.mod h1:nqoYCNo53ZlGA521RvRethvxUDvXKt4gtYX
github.com/qiniu/x v1.10.5/go.mod h1:03Ni9tj+N2h2aKnAz+6N0Xfl8FwMEDRC2PAlxekASDs=
github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
github.com/quic-go/qtls-go1-19 v0.3.2 h1:tFxjCFcTQzK+oMxG6Zcvp4Dq8dx4yD3dDiIiyc86Z5U=
github.com/quic-go/qtls-go1-19 v0.3.2/go.mod h1:ySOI96ew8lnoKPtSqx2BlI5wCpUVPT05RMAlajtnyOI=
github.com/quic-go/qtls-go1-20 v0.2.2 h1:WLOPx6OY/hxtTxKV1Zrq20FtXtDEkeY00CGQm8GEa3E=
github.com/quic-go/qtls-go1-20 v0.2.2/go.mod h1:JKtK6mjbAVcUTN/9jZpvLbGxvdWIKS8uT7EiStoU1SM=
github.com/quic-go/quic-go v0.36.1 h1:WsG73nVtnDy1TiACxFxhQ3TqaW+DipmqzLEtNlAwZyY=
github.com/quic-go/quic-go v0.36.1/go.mod h1:zPetvwDlILVxt15n3hr3Gf/I3mDf7LpLKPhR4Ez0AZQ=
github.com/quic-go/qtls-go1-20 v0.3.2 h1:rRgN3WfnKbyik4dBV8A6girlJVxGand/d+jVKbQq5GI=
github.com/quic-go/qtls-go1-20 v0.3.2/go.mod h1:X9Nh97ZL80Z+bX/gUXMbipO6OxdiDi58b/fMC9mAL+k=
github.com/quic-go/qtls-go1-20 v0.3.3 h1:17/glZSLI9P9fDAeyCHBFSWSqJcwx1byhLwP5eUIDCM=
github.com/quic-go/qtls-go1-20 v0.3.3/go.mod h1:X9Nh97ZL80Z+bX/gUXMbipO6OxdiDi58b/fMC9mAL+k=
github.com/quic-go/quic-go v0.38.0 h1:T45lASr5q/TrVwt+jrVccmqHhPL2XuSyoCLVCpfOSLc=
github.com/quic-go/quic-go v0.38.0/go.mod h1:MPCuRq7KBK2hNcfKj/1iD1BGuN3eAYMeNxp3T42LRUg=
github.com/quic-go/quic-go v0.38.1 h1:M36YWA5dEhEeT+slOu/SwMEucbYd0YFidxG3KlGPZaE=
github.com/quic-go/quic-go v0.38.1/go.mod h1:ijnZM7JsFIkp4cRyjxJNIzdSfCLmUMg9wdyhGmg+SN4=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
@@ -170,10 +181,10 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 h1:m64FZMko/V45gv0bNmrNYoDEq8U5YUhetc9cBWKS1TQ=
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63/go.mod h1:0v4NqG35kSWCMzLaMeX+IQrlSnVE/bqGSyC2cz/9Le8=
golang.org/x/image v0.7.0 h1:gzS29xtG1J5ybQlv0PuyfE3nmc6R4qB73m6LUUmvFuw=
golang.org/x/image v0.7.0/go.mod h1:nd/q4ef1AKKYl/4kft7g+6UyGbdiqWqTP1ZAbRoV7Rg=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@@ -188,8 +199,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50=
golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -214,8 +225,8 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@@ -228,8 +239,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -237,8 +248,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.11.0 h1:EMCa6U9S2LtZXLAMoWiR/R8dAQFRqbAitmbJ2UKhoi8=
golang.org/x/tools v0.11.0/go.mod h1:anzJrxPjNtfgiYQYirP2CPGzGLxrH2u2QBhn6Bf3qY8=
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 h1:Vve/L0v7CXXuxUmaMGIEK/dEeq7uiqb5qBgQrZzIE7E=
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846/go.mod h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -263,3 +274,5 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
rogchap.com/v8go v0.9.0 h1:wYbUCO4h6fjTamziHrzyrPnpFNuzPpjZY+nfmZjNaew=
rogchap.com/v8go v0.9.0/go.mod h1:MxgP3pL2MW4dpme/72QRs8sgNMmM0pRc8DPhcuLWPAs=

View File

@@ -3,7 +3,6 @@ package caches
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"strings"
"time"
)
type ItemType = int
@@ -16,7 +15,7 @@ const (
// 计算当前周
// 不要用YW因为需要计算两周是否临近
func currentWeek() int32 {
return int32(time.Now().Unix() / 86400)
return int32(fasttime.Now().Unix() / 86400)
}
type Item struct {
@@ -29,10 +28,7 @@ type Item struct {
MetaSize int64 `json:"metaSize"`
Host string `json:"host"` // 主机名
ServerId int64 `json:"serverId"` // 服务ID
Week1Hits int64 `json:"week1Hits"`
Week2Hits int64 `json:"week2Hits"`
Week int32 `json:"week"`
Week int32 `json:"week"`
}
func (this *Item) IsExpired() bool {
@@ -47,20 +43,6 @@ func (this *Item) Size() int64 {
return this.HeaderSize + this.BodySize
}
func (this *Item) IncreaseHit(week int32) {
if this.Week == week {
this.Week2Hits++
} else {
if week-this.Week == 1 {
this.Week1Hits = this.Week2Hits
} else {
this.Week1Hits = 0
}
this.Week2Hits = 1
this.Week = week
}
}
func (this *Item) RequestURI() string {
var schemeIndex = strings.Index(this.Key, "://")
if schemeIndex <= 0 {

View File

@@ -1,8 +1,9 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
package caches_test
import (
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
@@ -11,27 +12,14 @@ import (
"time"
)
func TestItem_IncreaseHit(t *testing.T) {
var week = currentWeek()
var item = &Item{}
//item.Week = 2704
item.Week2Hits = 100
item.IncreaseHit(week)
t.Log("week:", item.Week, "week1:", item.Week1Hits, "week2:", item.Week2Hits)
item.IncreaseHit(week)
t.Log("week:", item.Week, "week1:", item.Week1Hits, "week2:", item.Week2Hits)
}
func TestItems_Memory(t *testing.T) {
var stat = &runtime.MemStats{}
runtime.ReadMemStats(stat)
var memory1 = stat.HeapInuse
var items = []*Item{}
var items = []*caches.Item{}
for i := 0; i < 10_000_000; i++ {
items = append(items, &Item{
items = append(items, &caches.Item{
Key: types.String(i),
})
}
@@ -41,18 +29,11 @@ func TestItems_Memory(t *testing.T) {
t.Log(memory1, memory2, (memory2-memory1)/1024/1024, "M")
var weekItems = make(map[string]*Item, 10_000_000)
for _, item := range items {
weekItems[item.Key] = item
}
runtime.ReadMemStats(stat)
var memory3 = stat.HeapInuse
t.Log(memory2, memory3, (memory3-memory2)/1024/1024, "M")
time.Sleep(1 * time.Second)
t.Log(len(items), len(weekItems))
}
func TestItems_Memory2(t *testing.T) {
@@ -88,7 +69,7 @@ func TestItem_RequestURI(t *testing.T) {
"https://goedge.cn:8080/hello/world",
"https://goedge.cn/hello/world?v=1&t=123",
} {
var item = &Item{Key: u}
var item = &caches.Item{Key: u}
t.Log(u, "=>", item.RequestURI())
}
}

View File

@@ -4,6 +4,7 @@ package caches
import (
"database/sql"
"errors"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
@@ -134,7 +135,7 @@ func (this *FileList) Exist(hash string) (bool, error) {
var expiredAt int64
err := row.Scan(&expiredAt)
if err != nil {
if err == sql.ErrNoRows {
if errors.Is(err, sql.ErrNoRows) {
err = nil
}
return false, err
@@ -143,6 +144,18 @@ func (this *FileList) Exist(hash string) (bool, error) {
return true, nil
}
func (this *FileList) ExistQuick(hash string) (isReady bool, found bool) {
var db = this.GetDB(hash)
if !db.IsReady() || !db.HashMapIsLoaded() {
return
}
isReady = true
found = db.hashMap.Exist(hash)
return
}
// CleanPrefix 清理某个前缀的缓存数据
func (this *FileList) CleanPrefix(prefix string) error {
if len(prefix) == 0 {
@@ -256,16 +269,10 @@ func (this *FileList) PurgeLFU(count int, callback func(hash string) error) erro
// 不在 rows.Next() 循环中操作是为了避免死锁
for _, hash := range hashStrings {
notFound, err := this.remove(hash)
_, err = this.remove(hash)
if err != nil {
return err
}
if notFound {
err = db.DeleteHitAsync(hash)
if err != nil {
return db.WrapError(err)
}
}
err = callback(hash)
if err != nil {
@@ -372,6 +379,15 @@ func (this *FileList) GetDB(hash string) *FileListDB {
return this.dbList[fnv.HashString(hash)%CountFileDB]
}
func (this *FileList) HashMapIsLoaded() bool {
for _, db := range this.dbList {
if !db.HashMapIsLoaded() {
return false
}
}
return true
}
func (this *FileList) remove(hash string) (notFound bool, err error) {
var db = this.GetDB(hash)
@@ -393,11 +409,6 @@ func (this *FileList) remove(hash string) (notFound bool, err error) {
return false, db.WrapError(err)
}
err = db.DeleteHitAsync(hash)
if err != nil {
return false, db.WrapError(err)
}
if this.onRemove != nil {
// when remove file item, no any extra information needed
this.onRemove(nil)
@@ -493,9 +504,6 @@ func (this *FileList) UpgradeV3(oldDir string, brokenOnError bool) error {
MetaSize: metaSize,
Host: host,
ServerId: serverId,
Week1Hits: 0,
Week2Hits: 0,
Week: 0,
})
if err != nil {
if brokenOnError {

View File

@@ -34,10 +34,10 @@ type FileListDB struct {
hashMap *FileListHashMap
itemsTableName string
hitsTableName string
isClosed bool
isReady bool
isClosed bool // 是否已关闭
isReady bool // 是否已完成初始化
hashMapIsLoaded bool // Hash是否已加载
// cacheItems
existsByHashStmt *dbs.Stmt // 根据hash检查是否存在
@@ -57,11 +57,6 @@ type FileListDB struct {
deleteAllStmt *dbs.Stmt // 删除所有数据
listOlderItemsStmt *dbs.Stmt // 读取较早存储的缓存
updateAccessWeekSQL string // 修改访问日期
// hits
insertHitSQL string // 写入数据
increaseHitSQL string // 增加点击量
deleteHitByHashSQL string // 根据hash删除数据
}
func NewFileListDB() *FileListDB {
@@ -142,7 +137,6 @@ func (this *FileListDB) Open(dbPath string) error {
func (this *FileListDB) Init() error {
this.itemsTableName = "cacheItems"
this.hitsTableName = "hits"
// 创建
var err = this.initTables(1)
@@ -200,35 +194,10 @@ func (this *FileListDB) Init() error {
this.updateAccessWeekSQL = `UPDATE "` + this.itemsTableName + `" SET "accessWeek"=? WHERE "hash"=?`
this.insertHitSQL = `INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?)`
this.increaseHitSQL = `INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?) ON CONFLICT("hash") DO UPDATE SET "week1Hits"=IIF("week"=?, "week1Hits", "week2Hits"), "week2Hits"=IIF("week"=?, "week2Hits"+1, 1), "week"=?`
this.deleteHitByHashSQL = `DELETE FROM "` + this.hitsTableName + `" WHERE "hash"=?`
this.isReady = true
// 加载HashMap
go func() {
err := this.hashMap.Load(this)
if err != nil {
remotelogs.Error("LIST_FILE_DB", "load hash map failed: "+err.Error()+"(file: "+this.dbPath+")")
// 自动修复错误
// TODO 将来希望能尽可能恢复以往数据库中的内容
if strings.Contains(err.Error(), "database is closed") || strings.Contains(err.Error(), "database disk image is malformed") {
_ = this.Close()
this.deleteDB()
remotelogs.Println("LIST_FILE_DB", "recreating the database (file:"+this.dbPath+") ...")
err = this.Open(this.dbPath)
if err != nil {
remotelogs.Error("LIST_FILE_DB", "recreate the database failed: "+err.Error()+" (file:"+this.dbPath+")")
} else {
_ = this.Init()
}
}
}
}()
go this.loadHashMap()
return nil
}
@@ -352,25 +321,18 @@ func (this *FileListDB) ListHashes(lastId int64) (hashList []string, maxId int64
func (this *FileListDB) IncreaseHitAsync(hash string) error {
var week = timeutil.Format("YW")
this.writeBatch.Add(this.increaseHitSQL, hash, week, week, week, week)
this.writeBatch.Add(this.updateAccessWeekSQL, week, hash)
return nil
}
func (this *FileListDB) DeleteHitAsync(hash string) error {
this.writeBatch.Add(this.deleteHitByHashSQL, hash)
return nil
}
func (this *FileListDB) CleanPrefix(prefix string) error {
if !this.isReady {
return nil
}
var count = int64(10000)
var staleLife = 600 // TODO 需要可以设置
var unixTime = fasttime.Now().Unix() // 只删除当前的,不删除新的
for {
result, err := this.writeDB.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0,staleAt=? WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND createdAt<=? AND INSTR("key", ?)=1 LIMIT `+types.String(count)+`)`, unixTime+int64(staleLife), unixTime, prefix)
result, err := this.writeDB.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0,staleAt=? WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND createdAt<=? AND INSTR("key", ?)=1 LIMIT `+types.String(count)+`)`, unixTime+DefaultStaleCacheSeconds, unixTime, prefix)
if err != nil {
return this.WrapError(err)
}
@@ -414,15 +376,14 @@ func (this *FileListDB) CleanMatchKey(key string) error {
queryKey = strings.Replace(queryKey, "*", "%", 1)
// TODO 检查大批量数据下的操作性能
var staleLife = 600 // TODO 需要可以设置
var unixTime = fasttime.Now().Unix() // 只删除当前的,不删除新的
_, err = this.writeDB.Exec(`UPDATE "`+this.itemsTableName+`" SET "expiredAt"=0, "staleAt"=? WHERE "host" GLOB ? AND "host" NOT GLOB ? AND "key" LIKE ? ESCAPE '\'`, unixTime+int64(staleLife), host, "*."+host, queryKey)
_, err = this.writeDB.Exec(`UPDATE "`+this.itemsTableName+`" SET "expiredAt"=0, "staleAt"=? WHERE "host" GLOB ? AND "host" NOT GLOB ? AND "key" LIKE ? ESCAPE '\'`, unixTime+DefaultStaleCacheSeconds, host, "*."+host, queryKey)
if err != nil {
return err
}
_, err = this.writeDB.Exec(`UPDATE "`+this.itemsTableName+`" SET "expiredAt"=0, "staleAt"=? WHERE "host" GLOB ? AND "host" NOT GLOB ? AND "key" LIKE ? ESCAPE '\'`, unixTime+int64(staleLife), host, "*."+host, queryKey+SuffixAll+"%")
_, err = this.writeDB.Exec(`UPDATE "`+this.itemsTableName+`" SET "expiredAt"=0, "staleAt"=? WHERE "host" GLOB ? AND "host" NOT GLOB ? AND "key" LIKE ? ESCAPE '\'`, unixTime+DefaultStaleCacheSeconds, host, "*."+host, queryKey+SuffixAll+"%")
if err != nil {
return err
}
@@ -456,10 +417,9 @@ func (this *FileListDB) CleanMatchPrefix(prefix string) error {
queryPrefix += "%"
// TODO 检查大批量数据下的操作性能
var staleLife = 600 // TODO 需要可以设置
var unixTime = fasttime.Now().Unix() // 只删除当前的,不删除新的
_, err = this.writeDB.Exec(`UPDATE "`+this.itemsTableName+`" SET "expiredAt"=0, "staleAt"=? WHERE "host" GLOB ? AND "host" NOT GLOB ? AND "key" LIKE ? ESCAPE '\'`, unixTime+int64(staleLife), host, "*."+host, queryPrefix)
_, err = this.writeDB.Exec(`UPDATE "`+this.itemsTableName+`" SET "expiredAt"=0, "staleAt"=? WHERE "host" GLOB ? AND "host" NOT GLOB ? AND "key" LIKE ? ESCAPE '\'`, unixTime+DefaultStaleCacheSeconds, host, "*."+host, queryPrefix)
return err
}
@@ -543,6 +503,10 @@ func (this *FileListDB) WrapError(err error) error {
return fmt.Errorf("%w (file: %s)", err, this.dbPath)
}
func (this *FileListDB) HashMapIsLoaded() bool {
return this.hashMapIsLoaded
}
// 初始化
func (this *FileListDB) initTables(times int) error {
{
@@ -603,32 +567,9 @@ ALTER TABLE "cacheItems" ADD "accessWeek" varchar(6);
}
}
// 删除hits表
{
_, err := this.writeDB.Exec(`CREATE TABLE IF NOT EXISTS "` + this.hitsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
"week1Hits" integer DEFAULT 0,
"week2Hits" integer DEFAULT 0,
"week" varchar(6)
);
CREATE UNIQUE INDEX IF NOT EXISTS "hits_hash"
ON "` + this.hitsTableName + `" (
"hash" ASC
);
`)
if err != nil {
// 尝试删除重建
if times < 3 {
_, dropErr := this.writeDB.Exec(`DROP TABLE "` + this.hitsTableName + `"`)
if dropErr == nil {
return this.initTables(times + 1)
}
return this.WrapError(err)
}
return this.WrapError(err)
}
_, _ = this.writeDB.Exec(`DROP TABLE "hits"`)
}
return nil
@@ -678,3 +619,30 @@ func (this *FileListDB) deleteDB() {
_ = os.Remove(this.dbPath + "-shm")
_ = os.Remove(this.dbPath + "-wal")
}
// 加载Hash列表
func (this *FileListDB) loadHashMap() {
this.hashMapIsLoaded = false
err := this.hashMap.Load(this)
if err != nil {
remotelogs.Error("LIST_FILE_DB", "load hash map failed: "+err.Error()+"(file: "+this.dbPath+")")
// 自动修复错误
// TODO 将来希望能尽可能恢复以往数据库中的内容
if strings.Contains(err.Error(), "database is closed") || strings.Contains(err.Error(), "database disk image is malformed") {
_ = this.Close()
this.deleteDB()
remotelogs.Println("LIST_FILE_DB", "recreating the database (file:"+this.dbPath+") ...")
err = this.Open(this.dbPath)
if err != nil {
remotelogs.Error("LIST_FILE_DB", "recreate the database failed: "+err.Error()+" (file:"+this.dbPath+")")
} else {
_ = this.Init()
}
}
return
}
this.hashMapIsLoaded = true
}

View File

@@ -7,7 +7,6 @@ import (
"github.com/iwind/TeaGo/Tea"
_ "github.com/iwind/TeaGo/bootstrap"
"testing"
"time"
)
func TestFileListDB_ListLFUItems(t *testing.T) {
@@ -34,32 +33,6 @@ func TestFileListDB_ListLFUItems(t *testing.T) {
t.Log("[", len(hashList), "]", hashList)
}
func TestFileListDB_IncreaseHitAsync(t *testing.T) {
var db = caches.NewFileListDB()
defer func() {
_ = db.Close()
}()
err := db.Open(Tea.Root + "/data/cache-db-large.db")
if err != nil {
t.Fatal(err)
}
err = db.Init()
if err != nil {
t.Fatal(err)
}
err = db.IncreaseHitAsync("4598e5231ba47d6ec7aa9ea640ff2eaf")
if err != nil {
t.Fatal(err)
}
// wait transaction
time.Sleep(1 * time.Second)
}
func TestFileListDB_CleanMatchKey(t *testing.T) {
var db = caches.NewFileListDB()

View File

@@ -281,11 +281,6 @@ func TestFileList_PurgeLFU(t *testing.T) {
t.Fatal(err)
}
err = list.IncreaseHit(stringutil.Md5("123456"))
if err != nil {
t.Fatal(err)
}
var count = 0
err = list.PurgeLFU(caches.CountFileDB*2, func(hash string) error {
t.Log(hash)
@@ -356,41 +351,6 @@ func TestFileList_CleanAll(t *testing.T) {
t.Log(list.Count())
}
func TestFileList_IncreaseHit(t *testing.T) {
var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1")
defer func() {
_ = list.Close()
}()
err := list.Init()
if err != nil {
t.Fatal(err)
}
defer func() {
_ = list.Close()
}()
var before = time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
var count = 1_000_000
if !testutils.IsSingleTesting() {
count = 10
}
for i := 0; i < count; i++ {
err = list.IncreaseHit(stringutil.Md5("abc" + types.String(i)))
}
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}
func TestFileList_UpgradeV3(t *testing.T) {
var list = caches.NewFileList(Tea.Root + "/data/cache-index/p43").(*caches.FileList)

View File

@@ -2,7 +2,6 @@ package caches
import (
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/iwind/TeaGo/logs"
"net"
"net/url"
@@ -19,9 +18,6 @@ type MemoryList struct {
itemMaps map[string]map[string]*Item // prefix => { hash => item }
weekItemMaps map[int32]map[string]zero.Zero // week => { hash => Zero }
minWeek int32
prefixes []string
locker sync.RWMutex
onAdd func(item *Item)
@@ -32,9 +28,7 @@ type MemoryList struct {
func NewMemoryList() ListInterface {
return &MemoryList{
itemMaps: map[string]map[string]*Item{},
weekItemMaps: map[int32]map[string]zero.Zero{},
minWeek: currentWeek(),
itemMaps: map[string]map[string]*Item{},
}
}
@@ -56,7 +50,6 @@ func (this *MemoryList) Reset() error {
for key := range this.itemMaps {
this.itemMaps[key] = map[string]*Item{}
}
this.weekItemMaps = map[int32]map[string]zero.Zero{}
this.locker.Unlock()
atomic.StoreInt64(&this.count, 0)
@@ -65,10 +58,6 @@ func (this *MemoryList) Reset() error {
}
func (this *MemoryList) Add(hash string, item *Item) error {
if item.Week == 0 {
item.Week = currentWeek()
}
this.locker.Lock()
prefix := this.prefix(hash)
@@ -81,14 +70,6 @@ func (this *MemoryList) Add(hash string, item *Item) error {
// 先删除,为了可以正确触发统计
oldItem, ok := itemMap[hash]
if ok {
// 从week map中删除
if oldItem.Week > 0 {
wm, ok := this.weekItemMaps[oldItem.Week]
if ok {
delete(wm, hash)
}
}
// 回调
if this.onRemove != nil {
this.onRemove(oldItem)
@@ -104,14 +85,6 @@ func (this *MemoryList) Add(hash string, item *Item) error {
itemMap[hash] = item
// week map
wm, ok := this.weekItemMaps[item.Week]
if ok {
wm[hash] = zero.New()
} else {
this.weekItemMaps[item.Week] = map[string]zero.Zero{hash: zero.New()}
}
this.locker.Unlock()
return nil
}
@@ -242,14 +215,6 @@ func (this *MemoryList) Remove(hash string) error {
atomic.AddInt64(&this.count, -1)
delete(itemMap, hash)
// week map
if item.Week > 0 {
wm, ok := this.weekItemMaps[item.Week]
if ok {
delete(wm, hash)
}
}
}
this.locker.Unlock()
@@ -261,12 +226,12 @@ func (this *MemoryList) Remove(hash string) error {
// callback 每次发现过期key的调用
func (this *MemoryList) Purge(count int, callback func(hash string) error) (int, error) {
this.locker.Lock()
deletedHashList := []string{}
var deletedHashList = []string{}
if this.purgeIndex >= len(this.prefixes) {
this.purgeIndex = 0
}
prefix := this.prefixes[this.purgeIndex]
var prefix = this.prefixes[this.purgeIndex]
this.purgeIndex++
@@ -290,14 +255,6 @@ func (this *MemoryList) Purge(count int, callback func(hash string) error) (int,
delete(itemMap, hash)
deletedHashList = append(deletedHashList, hash)
// week map
if item.Week > 0 {
wm, ok := this.weekItemMaps[item.Week]
if ok {
delete(wm, hash)
}
}
countFound++
}
@@ -322,63 +279,48 @@ func (this *MemoryList) PurgeLFU(count int, callback func(hash string) error) er
return nil
}
var week = currentWeek()
if this.minWeek > week {
this.minWeek = week
}
var deletedHashList = []string{}
var week = currentWeek()
var round = 0
this.locker.Lock()
Loop:
for w := this.minWeek; w <= week; w++ {
this.minWeek = w
for {
var found = false
round++
for _, itemMap := range this.itemMaps {
for hash, item := range itemMap {
found = true
this.locker.Lock()
wm, ok := this.weekItemMaps[w]
if ok {
var wc = len(wm)
if wc == 0 {
delete(this.weekItemMaps, w)
} else {
if wc <= count {
delete(this.weekItemMaps, w)
if week-item.Week <= 1 /** 最近有在使用 **/ && round <= 3 /** 查找轮数过多还不满足数量要求的就不再限制 **/ {
continue
}
// TODO 未来支持按照点击量排序
for hash := range wm {
count--
if count < 0 {
this.locker.Unlock()
break Loop
}
delete(wm, hash)
itemMap, ok := this.itemMaps[this.prefix(hash)]
if !ok {
continue
}
item, ok := itemMap[hash]
if !ok {
continue
}
if this.onRemove != nil {
this.onRemove(item)
}
atomic.AddInt64(&this.count, -1)
delete(itemMap, hash)
deletedHashList = append(deletedHashList, hash)
if this.onRemove != nil {
this.onRemove(item)
}
atomic.AddInt64(&this.count, -1)
delete(itemMap, hash)
deletedHashList = append(deletedHashList, hash)
count--
if count <= 0 {
break Loop
}
break
}
} else {
delete(this.weekItemMaps, w)
}
this.locker.Unlock()
if !found {
break
}
}
this.locker.Unlock()
// 执行外部操作
for _, hash := range deletedHashList {
if callback != nil {
@@ -451,23 +393,7 @@ func (this *MemoryList) IncreaseHit(hash string) error {
item, ok := itemMap[hash]
if ok {
var week = currentWeek()
// 交换位置
if item.Week > 0 && item.Week != week {
wm, ok := this.weekItemMaps[item.Week]
if ok {
delete(wm, hash)
}
wm, ok = this.weekItemMaps[week]
if ok {
wm[hash] = zero.New()
} else {
this.weekItemMaps[week] = map[string]zero.Zero{hash: zero.New()}
}
}
item.IncreaseHit(week)
item.Week = currentWeek()
}
this.locker.Unlock()

View File

@@ -6,7 +6,9 @@ import (
"github.com/cespare/xxhash"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
"math/rand"
"sort"
"strconv"
"testing"
"time"
@@ -32,7 +34,6 @@ func TestMemoryList_Add(t *testing.T) {
})
t.Log(list.prefixes)
logs.PrintAsJSON(list.itemMaps, t)
logs.PrintAsJSON(list.weekItemMaps, t)
t.Log(list.Count())
}
@@ -51,7 +52,6 @@ func TestMemoryList_Remove(t *testing.T) {
})
_ = list.Remove("b")
list.print(t)
logs.PrintAsJSON(list.weekItemMaps, t)
t.Log(list.Count())
}
@@ -83,7 +83,6 @@ func TestMemoryList_Purge(t *testing.T) {
return nil
})
list.print(t)
logs.PrintAsJSON(list.weekItemMaps, t)
for i := 0; i < 1000; i++ {
_, _ = list.Purge(100, func(hash string) error {
@@ -172,27 +171,64 @@ func TestMemoryList_CleanPrefix(t *testing.T) {
t.Log(time.Since(before).Seconds()*1000, "ms")
}
func TestMapRandomDelete(t *testing.T) {
var countMap = map[int]int{} // k => count
for j := 0; j < 1_000_000; j++ {
var m = map[int]bool{}
for i := 0; i < 100; i++ {
m[i] = true
}
var count = 0
for k := range m {
delete(m, k)
count++
if count >= 10 {
break
}
}
for k := range m {
countMap[k]++
}
}
var counts = []int{}
for _, count := range countMap {
counts = append(counts, count)
}
sort.Ints(counts)
t.Log("["+types.String(len(counts))+"]", counts)
}
func TestMemoryList_PurgeLFU(t *testing.T) {
var list = NewMemoryList().(*MemoryList)
list.minWeek = 2704
var before = time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
t.Log("current week:", currentWeek())
_ = list.Add("1", &Item{})
_ = list.Add("2", &Item{})
_ = list.Add("3", &Item{})
_ = list.Add("4", &Item{})
_ = list.Add("5", &Item{})
_ = list.Add("6", &Item{Week: 2704})
_ = list.Add("7", &Item{Week: 2704})
_ = list.Add("8", &Item{Week: 2705})
err := list.PurgeLFU(2, func(hash string) error {
//_ = list.IncreaseHit("1")
//_ = list.IncreaseHit("2")
//_ = list.IncreaseHit("3")
//_ = list.IncreaseHit("4")
//_ = list.IncreaseHit("5")
count, err := list.Count()
if err != nil {
t.Fatal(err)
}
t.Log("count items before purge:", count)
err = list.PurgeLFU(5, func(hash string) error {
t.Log("purge lfu:", hash)
return nil
})
@@ -201,40 +237,18 @@ func TestMemoryList_PurgeLFU(t *testing.T) {
}
t.Log("ok")
logs.PrintAsJSON(list.weekItemMaps, t)
t.Log(list.Count())
}
func TestMemoryList_IncreaseHit(t *testing.T) {
var list = NewMemoryList().(*MemoryList)
var item = &Item{}
item.Week = 2705
item.Week2Hits = 100
_ = list.Add("a", &Item{})
_ = list.Add("a", item)
t.Log("hits1:", item.Week1Hits, "hits2:", item.Week2Hits, "week:", item.Week)
logs.PrintAsJSON(list.weekItemMaps, t)
_ = list.IncreaseHit("a")
t.Log("hits1:", item.Week1Hits, "hits2:", item.Week2Hits, "week:", item.Week)
logs.PrintAsJSON(list.weekItemMaps, t)
_ = list.IncreaseHit("a")
t.Log("hits1:", item.Week1Hits, "hits2:", item.Week2Hits, "week:", item.Week)
logs.PrintAsJSON(list.weekItemMaps, t)
count, err = list.Count()
if err != nil {
t.Fatal(err)
}
t.Log("count items left:", count)
}
func TestMemoryList_CleanAll(t *testing.T) {
var list = NewMemoryList().(*MemoryList)
var item = &Item{}
item.Week = 2705
item.Week2Hits = 100
_ = list.Add("a", &Item{})
_ = list.CleanAll()
logs.PrintAsJSON(list.itemMaps, t)
logs.PrintAsJSON(list.weekItemMaps, t)
t.Log(list.Count())
}

View File

@@ -256,3 +256,19 @@ func (this *Manager) FindAllStorages() []StorageInterface {
}
return storages
}
// ScanGarbageCaches 清理目录中“失联”的缓存文件
func (this *Manager) ScanGarbageCaches(callback func(path string) error) error {
var storages = this.FindAllStorages()
for _, storage := range storages {
fileStorage, ok := storage.(*FileStorage)
if !ok {
continue
}
err := fileStorage.ScanGarbageCaches(callback)
if err != nil {
return err
}
}
return nil
}

View File

@@ -20,7 +20,7 @@ type PartialFileReader struct {
func NewPartialFileReader(fp *os.File) *PartialFileReader {
return &PartialFileReader{
FileReader: NewFileReader(fp),
rangePath: partialRangesFilePath(fp.Name()),
rangePath: PartialRangesFilePath(fp.Name()),
}
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets"
"github.com/TeaOSLab/EdgeNode/internal/utils/sizes"
@@ -58,6 +59,8 @@ const (
FileToMemoryMaxSize = 32 * sizes.M // 可以从文件写入到内存的最大文件尺寸
FileTmpSuffix = ".tmp"
DefaultMinDiskFreeSpace uint64 = 5 << 30 // 当前磁盘最小剩余空间
DefaultStaleCacheSeconds = 1200 // 过时缓存留存时间
HashKeyLength = 32
)
var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool
@@ -890,7 +893,7 @@ func (this *FileStorage) Stop() {
// TotalDiskSize 消耗的磁盘尺寸
func (this *FileStorage) TotalDiskSize() int64 {
stat, err := fsutils.StatCache(this.options.Dir)
stat, err := fsutils.StatDeviceCache(this.options.Dir)
if err == nil {
return int64(stat.UsedSize())
}
@@ -930,7 +933,7 @@ func (this *FileStorage) keyPath(key string) (hash string, path string, diskIsFu
// 获取Hash对应的文件路径
func (this *FileStorage) hashPath(hash string) (path string, diskIsFull bool) {
if len(hash) != 32 {
if len(hash) != HashKeyLength {
return "", false
}
var dir string
@@ -994,28 +997,23 @@ func (this *FileStorage) initList() error {
// 清理任务
// TODO purge每个分区
func (this *FileStorage) purgeLoop() {
// 检查磁盘剩余空间
this.checkDiskSpace()
// 计算是否应该开启LFU清理
var capacityBytes = this.diskCapacityBytes()
var startLFU = false
var lfuFreePercent = this.policy.PersistenceLFUFreePercent
if lfuFreePercent <= 0 {
lfuFreePercent = 5
}
var hasFullDisk = this.mainDiskIsFull
if !hasFullDisk {
var subDirs = this.subDirs // copy slice
for _, subDir := range subDirs {
if subDir.IsFull {
hasFullDisk = true
break
// 2TB级别以上
if capacityBytes>>30 > 2000 {
lfuFreePercent = 100 /** GB **/ / float32(capacityBytes>>30) * 100 /** % **/
if lfuFreePercent > 3 {
lfuFreePercent = 3
}
}
}
var hasFullDisk = this.hasFullDisk()
if hasFullDisk {
startLFU = true
} else {
@@ -1079,13 +1077,32 @@ func (this *FileStorage) purgeLoop() {
// 磁盘空间不足时,清除老旧的缓存
if startLFU {
var maxCount = 2000
var maxLoops = 5
if fsutils.DiskIsFast() {
maxCount = 5000
} else if fsutils.DiskIsExtremelyFast() {
maxCount = 10000
}
var total, _ = this.list.Count()
if total > 0 {
var count = types.Int(math.Ceil(float64(total) * float64(lfuFreePercent*2) / 100))
if count > 0 {
for {
maxLoops--
if maxLoops <= 0 {
break
}
// 开始清理
var count = types.Int(math.Ceil(float64(total) * float64(lfuFreePercent*2) / 100))
if count <= 0 {
break
}
// 限制单次清理的条数,防止占用太多系统资源
if count > 2000 {
count = 2000
if count > maxCount {
count = maxCount
}
remotelogs.Println("CACHE", "LFU purge policy '"+this.policy.Name+"' id: "+types.String(this.policy.Id)+", count: "+types.String(count))
@@ -1101,6 +1118,11 @@ func (this *FileStorage) purgeLoop() {
if err != nil {
remotelogs.Warn("CACHE", "purge file storage in LFU failed: "+err.Error())
}
// 检查硬盘空间状态
if !this.hasFullDisk() {
break
}
}
}
}
@@ -1227,8 +1249,9 @@ func (this *FileStorage) hotLoop() {
func (this *FileStorage) diskCapacityBytes() int64 {
var c1 = this.policy.CapacityBytes()
if SharedManager.MaxDiskCapacity != nil {
var c2 = SharedManager.MaxDiskCapacity.Bytes()
var nodeCapacity = SharedManager.MaxDiskCapacity // copy
if nodeCapacity != nil {
var c2 = nodeCapacity.Bytes()
if c2 > 0 {
return c2
}
@@ -1335,7 +1358,7 @@ func (this *FileStorage) removeCacheFile(path string) error {
err = nil
// 删除Partial相关
var partialPath = partialRangesFilePath(path)
var partialPath = PartialRangesFilePath(path)
if openFileCache != nil {
openFileCache.Close(partialPath)
}
@@ -1418,7 +1441,7 @@ func (this *FileStorage) initOpenFileCache() {
}
func (this *FileStorage) runMemoryStorageSafety(f func(memoryStorage *MemoryStorage)) {
var memoryStorage = this.memoryStorage
var memoryStorage = this.memoryStorage // copy
if memoryStorage != nil {
f(memoryStorage)
}
@@ -1434,20 +1457,55 @@ func (this *FileStorage) checkDiskSpace() {
}
if options != nil && len(options.Dir) > 0 {
stat, err := fsutils.Stat(options.Dir)
stat, err := fsutils.StatDevice(options.Dir)
if err == nil {
this.mainDiskIsFull = stat.FreeSize() < minFreeSize
// check capacity (only on main directory) when node capacity had not been set
if !this.mainDiskIsFull {
var capacityBytes int64
var maxDiskCapacity = SharedManager.MaxDiskCapacity // copy
if maxDiskCapacity != nil && maxDiskCapacity.Bytes() > 0 {
capacityBytes = SharedManager.MaxDiskCapacity.Bytes()
} else {
var policy = this.policy // copy
if policy != nil {
capacityBytes = policy.CapacityBytes() // copy
}
}
if capacityBytes > 0 && stat.UsedSize() >= uint64(capacityBytes) {
this.mainDiskIsFull = true
}
}
}
}
var subDirs = this.subDirs // copy slice
for _, subDir := range subDirs {
stat, err := fsutils.Stat(subDir.Path)
stat, err := fsutils.StatDevice(subDir.Path)
if err == nil {
subDir.IsFull = stat.FreeSize() < minFreeSize
}
}
}
// 检查是否有已满的磁盘分区
func (this *FileStorage) hasFullDisk() bool {
this.checkDiskSpace()
var hasFullDisk = this.mainDiskIsFull
if !hasFullDisk {
var subDirs = this.subDirs // copy slice
for _, subDir := range subDirs {
if subDir.IsFull {
hasFullDisk = true
break
}
}
}
return hasFullDisk
}
// 获取目录
func (this *FileStorage) subDir(hash string) (dirPath string, dirIsFull bool) {
var suffix = "/p" + types.String(this.policy.Id) + "/" + hash[:2] + "/" + hash[2:4]
@@ -1477,6 +1535,89 @@ func (this *FileStorage) subDir(hash string) (dirPath string, dirIsFull bool) {
return subDir.Path + suffix, subDir.IsFull
}
// ScanGarbageCaches 清理目录中“失联”的缓存文件
// “失联”为不在HashMap中的文件
func (this *FileStorage) ScanGarbageCaches(fileCallback func(path string) error) error {
if !this.list.(*FileList).HashMapIsLoaded() {
return errors.New("cache list is loading")
}
var mainDir = this.options.Dir
var allDirs = []string{mainDir}
var subDirs = this.subDirs // copy
for _, subDir := range subDirs {
allDirs = append(allDirs, subDir.Path)
}
for _, subDir := range allDirs {
var dir0 = subDir + "/p" + types.String(this.policy.Id)
dir1Matches, err := filepath.Glob(dir0 + "/*")
if err != nil {
// ignore error
continue
}
for _, dir1 := range dir1Matches {
if len(filepath.Base(dir1)) != 2 {
continue
}
dir2Matches, err := filepath.Glob(dir1 + "/*")
if err != nil {
// ignore error
continue
}
for _, dir2 := range dir2Matches {
if len(filepath.Base(dir2)) != 2 {
continue
}
fileMatches, err := filepath.Glob(dir2 + "/*.cache")
if err != nil {
// ignore error
continue
}
for _, file := range fileMatches {
var filename = filepath.Base(file)
var hash = strings.TrimSuffix(filename, ".cache")
if len(hash) != HashKeyLength {
continue
}
isReady, found := this.list.(*FileList).ExistQuick(hash)
if !isReady {
continue
}
if found {
continue
}
// 检查文件正在被写入
stat, err := os.Stat(file)
if err != nil {
continue
}
if fasttime.Now().Unix()-stat.ModTime().Unix() < 300 /** 5 minutes **/ {
continue
}
if fileCallback != nil {
err = fileCallback(file)
if err != nil {
return err
}
}
}
}
}
}
return nil
}
// 计算字节数字代号
func (this *FileStorage) charCode(r byte) uint8 {
if r >= '0' && r <= '9' {
return r - '0'

View File

@@ -5,6 +5,7 @@ import (
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
"github.com/iwind/TeaGo/Tea"
_ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/logs"
@@ -577,6 +578,31 @@ func TestFileStorage_RemoveCacheFile(t *testing.T) {
t.Log(storage.removeCacheFile("/Users/WorkSpace/EdgeProject/EdgeCache/p43/15/7e/157eba0dfc6dfb6fbbf20b1f9e584674.cache"))
}
func TestFileStorage_ScanGarbageCaches(t *testing.T) {
if !testutils.IsSingleTesting() {
return
}
var storage = NewFileStorage(&serverconfigs.HTTPCachePolicy{
Id: 43,
Options: map[string]any{"dir": "/Users/WorkSpace/EdgeProject/EdgeCache"},
})
err := storage.Init()
if err != nil {
t.Fatal(err)
}
err = storage.ScanGarbageCaches(func(path string) {
t.Log(path)
}, func(path string) error {
t.Log(path, PartialRangesFilePath(path))
return nil
})
if err != nil {
t.Fatal(err)
}
}
func BenchmarkFileStorage_Read(b *testing.B) {
runtime.GOMAXPROCS(1)

View File

@@ -12,7 +12,6 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils/sizes"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/cespare/xxhash"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
"github.com/shirou/gopsutil/v3/load"
"math"
@@ -137,17 +136,6 @@ func (this *MemoryStorage) OpenReader(key string, useStale bool, isPartial bool)
}
this.locker.RUnlock()
// 增加点击量
// 1/1000采样
// TODO 考虑是否在缓存策略里设置
if rands.Int(0, 1000) == 0 {
var hitErr = this.list.IncreaseHit(types.String(hash))
if hitErr != nil {
// 此错误可以忽略
remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error())
}
}
return reader, nil
}
this.locker.RUnlock()

View File

@@ -4,8 +4,8 @@ package caches
import "strings"
// 获取 ranges 文件路径
func partialRangesFilePath(path string) string {
// PartialRangesFilePath 获取 ranges 文件路径
func PartialRangesFilePath(path string) string {
// ranges路径
var dotIndex = strings.LastIndex(path, ".")
var rangePath string

View File

@@ -43,7 +43,7 @@ func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, metaH
isPartial: isPartial,
bodyOffset: bodyOffset,
ranges: ranges,
rangePath: partialRangesFilePath(rawWriter.Name()),
rangePath: PartialRangesFilePath(rawWriter.Name()),
metaHeaderSize: metaHeaderSize,
metaBodySize: metaBodySize,
}

View File

@@ -67,6 +67,12 @@ func LoadAPIConfig() (*APIConfig, error) {
return nil, errors.New("init error: " + err.Error())
}
// 自动生成新的配置文件
if filename == oldConfigFileName {
config.OldRPC.Endpoints = nil
_ = config.WriteFile(Tea.ConfigFile(ConfigFileName))
}
return config, nil
}
return nil, errors.New("no config file '" + ConfigFileName + "' found")

View File

@@ -1,7 +1,7 @@
package teaconst
const (
Version = "1.2.7"
Version = "1.2.9"
ProductName = "Edge Node"
ProcessName = "edge-node"

View File

@@ -4,6 +4,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/expires"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"github.com/iwind/TeaGo/logs"
"sort"
"sync"
)
@@ -14,6 +15,8 @@ var GlobalWhiteIPList = NewIPList()
// IPList IP名单
// TODO IP名单可以分片关闭这样让每一片的数据量减少查询更快
type IPList struct {
isDeleted bool
itemsMap map[uint64]*IPItem // id => item
sortedItems []*IPItem
allItemsMap map[uint64]*IPItem // id => item
@@ -38,11 +41,19 @@ func NewIPList() *IPList {
}
func (this *IPList) Add(item *IPItem) {
if this.isDeleted {
return
}
this.addItem(item, true)
}
// AddDelay 延迟添加需要手工调用Sort()函数
func (this *IPList) AddDelay(item *IPItem) {
if this.isDeleted {
return
}
this.addItem(item, false)
}
@@ -60,6 +71,10 @@ func (this *IPList) Delete(itemId uint64) {
// Contains 判断是否包含某个IP
func (this *IPList) Contains(ip uint64) bool {
if this.isDeleted {
return false
}
this.locker.RLock()
if len(this.allItemsMap) > 0 {
this.locker.RUnlock()
@@ -75,6 +90,10 @@ func (this *IPList) Contains(ip uint64) bool {
// ContainsExpires 判断是否包含某个IP
func (this *IPList) ContainsExpires(ip uint64) (expiresAt int64, ok bool) {
if this.isDeleted {
return
}
this.locker.RLock()
if len(this.allItemsMap) > 0 {
this.locker.RUnlock()
@@ -94,6 +113,10 @@ func (this *IPList) ContainsExpires(ip uint64) (expiresAt int64, ok bool) {
// ContainsIPStrings 是否包含一组IP中的任意一个并返回匹配的第一个Item
func (this *IPList) ContainsIPStrings(ipStrings []string) (item *IPItem, found bool) {
if this.isDeleted {
return
}
if len(ipStrings) == 0 {
return
}
@@ -125,6 +148,11 @@ func (this *IPList) ContainsIPStrings(ipStrings []string) (item *IPItem, found b
return
}
func (this *IPList) SetDeleted() {
this.isDeleted = true
logs.Println("set deleted:", this.isDeleted) // TODO
}
func (this *IPList) addItem(item *IPItem, sortable bool) {
if item == nil {
return

View File

@@ -22,10 +22,10 @@ func TestIPListDB_AddItem(t *testing.T) {
IpFrom: "192.168.1.101",
IpTo: "",
Version: 1024,
ExpiredAt: time.Now().Unix(),
ExpiredAt: time.Now().Unix() + 3600,
Reason: "",
ListId: 2,
IsDeleted: true,
IsDeleted: false,
Type: "ipv4",
EventLevel: "error",
ListType: "black",

View File

@@ -229,6 +229,11 @@ func (this *IPListManager) DeleteExpiredItems() {
func (this *IPListManager) processItems(items []*pb.IPItem, fromRemote bool) {
var changedLists = map[*IPList]zero.Zero{}
for _, item := range items {
// 调试
if Tea.IsTesting() {
this.debugItem(item)
}
var list *IPList
// TODO 实现节点专有List
if item.ServerId > 0 { // 服务专有List
@@ -300,3 +305,12 @@ func (this *IPListManager) processItems(items []*pb.IPItem, fromRemote bool) {
}
}
}
// 调试IP信息
func (this *IPListManager) debugItem(item *pb.IPItem) {
if item.IsDeleted {
remotelogs.Debug("IP_ITEM_DEBUG", "delete '"+item.IpFrom+"'")
} else {
remotelogs.Debug("IP_ITEM_DEBUG", "add '"+item.IpFrom+"'")
}
}

View File

@@ -26,9 +26,9 @@ func init() {
type Manager struct {
isQuiting bool
tasks map[int64]*Task // itemId => *Task
categoryTasks map[string][]*Task // category => []*Task
locker sync.RWMutex
taskMap map[int64]*Task // itemId => *Task
categoryTaskMap map[string][]*Task // category => []*Task
locker sync.RWMutex
hasHTTPMetrics bool
hasTCPMetrics bool
@@ -37,8 +37,8 @@ type Manager struct {
func NewManager() *Manager {
return &Manager{
tasks: map[int64]*Task{},
categoryTasks: map[string][]*Task{},
taskMap: map[int64]*Task{},
categoryTaskMap: map[string][]*Task{},
}
}
@@ -56,7 +56,7 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) {
}
// 停用以前的 或 修改现在的
for itemId, task := range this.tasks {
for itemId, task := range this.taskMap {
newItem, ok := newMap[itemId]
if !ok || !newItem.IsOn { // 停用以前的
remotelogs.Println("METRIC_MANAGER", "stop task '"+strconv.FormatInt(itemId, 10)+"'")
@@ -64,7 +64,7 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) {
if err != nil {
remotelogs.Error("METRIC_MANAGER", "stop task '"+strconv.FormatInt(itemId, 10)+"' failed: "+err.Error())
}
delete(this.tasks, itemId)
delete(this.taskMap, itemId)
} else { // 更新已存在的
if newItem.Version != task.item.Version {
remotelogs.Println("METRIC_MANAGER", "update task '"+strconv.FormatInt(itemId, 10)+"'")
@@ -78,7 +78,7 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) {
if !newItem.IsOn {
continue
}
_, ok := this.tasks[newItem.Id]
_, ok := this.taskMap[newItem.Id]
if !ok {
remotelogs.Println("METRIC_MANAGER", "start task '"+strconv.FormatInt(newItem.Id, 10)+"'")
task := NewTask(newItem)
@@ -92,7 +92,7 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) {
remotelogs.Error("METRIC_MANAGER", "start task failed: "+err.Error())
continue
}
this.tasks[newItem.Id] = task
this.taskMap[newItem.Id] = task
}
}
@@ -100,11 +100,11 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) {
this.hasHTTPMetrics = false
this.hasTCPMetrics = false
this.hasUDPMetrics = false
this.categoryTasks = map[string][]*Task{}
for _, task := range this.tasks {
tasks := this.categoryTasks[task.item.Category]
this.categoryTaskMap = map[string][]*Task{}
for _, task := range this.taskMap {
var tasks = this.categoryTaskMap[task.item.Category]
tasks = append(tasks, task)
this.categoryTasks[task.item.Category] = tasks
this.categoryTaskMap[task.item.Category] = tasks
switch task.item.Category {
case serverconfigs.MetricItemCategoryHTTP:
@@ -124,10 +124,12 @@ func (this *Manager) Add(obj MetricInterface) {
}
this.locker.RLock()
for _, task := range this.categoryTasks[obj.MetricCategory()] {
var tasks = this.categoryTaskMap[obj.MetricCategory()]
this.locker.RUnlock()
for _, task := range tasks {
task.Add(obj)
}
this.locker.RUnlock()
}
func (this *Manager) HasHTTPMetrics() bool {
@@ -149,9 +151,9 @@ func (this *Manager) Quit() {
remotelogs.Println("METRIC_MANAGER", "quit")
this.locker.Lock()
for _, task := range this.tasks {
for _, task := range this.taskMap {
_ = task.Stop()
}
this.tasks = map[int64]*Task{}
this.taskMap = map[int64]*Task{}
this.locker.Unlock()
}

View File

@@ -11,7 +11,7 @@ func TestNewManager(t *testing.T) {
var manager = NewManager()
{
manager.Update([]*serverconfigs.MetricItemConfig{})
for _, task := range manager.tasks {
for _, task := range manager.taskMap {
t.Log(task.item.Id)
}
}
@@ -28,7 +28,7 @@ func TestNewManager(t *testing.T) {
Id: 3,
},
})
for _, task := range manager.tasks {
for _, task := range manager.taskMap {
t.Log("task:", task.item.Id)
}
}
@@ -43,7 +43,7 @@ func TestNewManager(t *testing.T) {
Id: 2,
},
})
for _, task := range manager.tasks {
for _, task := range manager.taskMap {
t.Log("task:", task.item.Id)
}
}
@@ -56,7 +56,7 @@ func TestNewManager(t *testing.T) {
Version: 1,
},
})
for _, task := range manager.tasks {
for _, task := range manager.taskMap {
t.Log("task:", task.item.Id)
}
}

View File

@@ -17,6 +17,6 @@ type Stat struct {
}
func SumStat(serverId int64, keys []string, time string, version int32, itemId int64) string {
keysData := strings.Join(keys, "$EDGE$")
var keysData = strings.Join(keys, "$EDGE$")
return strconv.FormatUint(fnv.HashString(strconv.FormatInt(serverId, 10)+"@"+keysData+"@"+time+"@"+strconv.Itoa(int(version))+"@"+strconv.FormatInt(itemId, 10)), 10)
}

View File

@@ -0,0 +1,20 @@
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package metrics_test
import (
"github.com/TeaOSLab/EdgeNode/internal/metrics"
timeutil "github.com/iwind/TeaGo/utils/time"
"runtime"
"testing"
)
func BenchmarkSumStat(b *testing.B) {
runtime.GOMAXPROCS(2)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
metrics.SumStat(1, []string{"1.2.3.4"}, timeutil.Format("Ymd"), 1, 1)
}
})
}

View File

@@ -19,6 +19,7 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"
"time"
)
@@ -59,7 +60,7 @@ type Task struct {
serverIdMapLocker sync.Mutex
statsMap map[string]*Stat // 待写入队列hash => *Stat
statsLocker sync.Mutex
statsLocker sync.RWMutex
statsTicker *utils.Ticker
}
@@ -237,7 +238,7 @@ func (this *Task) Add(obj MetricInterface) {
var keys = []string{}
for _, key := range this.item.Keys {
k := obj.MetricKey(key)
var k = obj.MetricKey(key)
// 忽略499状态
if key == "${status}" && k == "499" {
@@ -253,14 +254,19 @@ func (this *Task) Add(obj MetricInterface) {
}
var hash = SumStat(obj.MetricServerId(), keys, this.item.CurrentTime(), this.item.Version, this.item.Id)
this.statsLocker.Lock()
var countItems int
this.statsLocker.RLock()
oldStat, ok := this.statsMap[hash]
if !ok {
countItems = len(this.statsMap)
}
this.statsLocker.RUnlock()
if ok {
oldStat.Value += v
oldStat.Hash = hash
atomic.AddInt64(&oldStat.Value, 1)
} else {
// 防止过载
if len(this.statsMap) < MaxQueueSize {
if countItems < MaxQueueSize {
this.statsLocker.Lock()
this.statsMap[hash] = &Stat{
ServerId: obj.MetricServerId(),
Keys: keys,
@@ -268,9 +274,9 @@ func (this *Task) Add(obj MetricInterface) {
Time: this.item.CurrentTime(),
Hash: hash,
}
this.statsLocker.Unlock()
}
}
this.statsLocker.Unlock()
}
// Stop 停止任务

View File

@@ -4,11 +4,15 @@ package metrics_test
import (
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/metrics"
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
_ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/rands"
"log"
"runtime"
"sync"
"testing"
"time"
)
@@ -213,3 +217,65 @@ func TestTask_Upload(t *testing.T) {
}
t.Log("ok")
}
var testingTask *metrics.Task
var testingTaskInitOnce = &sync.Once{}
func initTestingTask() {
testingTask = metrics.NewTask(&serverconfigs.MetricItemConfig{
Id: 1,
IsOn: false,
Category: "tcp",
Period: 1,
PeriodUnit: serverconfigs.MetricItemPeriodUnitDay,
Keys: []string{"${remoteAddr}"},
Value: "${countRequest}",
})
err := testingTask.Init()
if err != nil {
log.Fatal(err)
}
err = testingTask.Start()
if err != nil {
log.Fatal(err)
}
}
func BenchmarkTask_Add(b *testing.B) {
runtime.GOMAXPROCS(1)
testingTaskInitOnce.Do(func() {
initTestingTask()
})
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
testingTask.Add(&taskRequest{})
}
})
}
type taskRequest struct {
}
func (this *taskRequest) MetricKey(key string) string {
return configutils.ParseVariables(key, func(varName string) (value string) {
return "1.2.3.4"
})
}
func (this *taskRequest) MetricValue(value string) (result int64, ok bool) {
return 1, true
}
func (this *taskRequest) MetricServerId() int64 {
return 1
}
func (this *taskRequest) MetricCategory() string {
return "http"
}

View File

@@ -198,9 +198,9 @@ func (this *ClientConn) Write(b []byte) (n int, err error) {
var cost = time.Since(before).Seconds()
if cost > 1 {
stats.SharedBandwidthStatManager.AddBandwidth(this.userId, this.serverId, int64(float64(n)/cost), int64(n))
stats.SharedBandwidthStatManager.AddBandwidth(this.userId, this.userPlanId, this.serverId, int64(float64(n)/cost), int64(n))
} else {
stats.SharedBandwidthStatManager.AddBandwidth(this.userId, this.serverId, int64(n), int64(n))
stats.SharedBandwidthStatManager.AddBandwidth(this.userId, this.userPlanId, this.serverId, int64(n), int64(n))
}
}
}

View File

@@ -16,6 +16,7 @@ type BaseClientConn struct {
isBound bool
userId int64
userPlanId int64
serverId int64
remoteAddr string
hasLimit bool
@@ -106,11 +107,31 @@ func (this *BaseClientConn) SetUserId(userId int64) {
}
}
func (this *BaseClientConn) SetUserPlanId(userPlanId int64) {
this.userPlanId = userPlanId
// 设置包装前连接
switch conn := this.rawConn.(type) {
case *tls.Conn:
nativeConn, ok := conn.NetConn().(ClientConnInterface)
if ok {
nativeConn.SetUserPlanId(userPlanId)
}
case *ClientConn:
conn.SetUserPlanId(userPlanId)
}
}
// UserId 获取当前连接所属服务的用户ID
func (this *BaseClientConn) UserId() int64 {
return this.userId
}
// UserPlanId 用户套餐ID
func (this *BaseClientConn) UserPlanId() int64 {
return this.userPlanId
}
// RawIP 原本IP
func (this *BaseClientConn) RawIP() string {
if len(this.rawIP) > 0 {

View File

@@ -18,9 +18,12 @@ type ClientConnInterface interface {
// SetServerId 设置服务ID
SetServerId(serverId int64) (goNext bool)
// SetUserId 设置所属服务的用户ID
// SetUserId 设置所属网站的用户ID
SetUserId(userId int64)
// SetUserPlanId 设置
SetUserPlanId(userPlanId int64)
// UserId 获取当前连接所属服务的用户ID
UserId() int64

View File

@@ -38,15 +38,16 @@ type HTTPRequest struct {
requestId string
// 外部参数
RawReq *http.Request
RawWriter http.ResponseWriter
ReqServer *serverconfigs.ServerConfig
ReqHost string // 请求的Host
ServerName string // 实际匹配到的Host
ServerAddr string // 实际启动的服务器监听地址
IsHTTP bool
IsHTTPS bool
IsHTTP3 bool
RawReq *http.Request
RawWriter http.ResponseWriter
ReqServer *serverconfigs.ServerConfig
ReqHost string // 请求的Host
ServerName string // 实际匹配到的Host
ServerAddr string // 实际启动的服务器监听地址
IsHTTP bool
IsHTTPS bool
IsHTTP3 bool
isHealthCheck bool
// 共享参数
nodeConfig *nodeconfigs.NodeConfig
@@ -168,10 +169,9 @@ func (this *HTTPRequest) Do() {
}
// 处理健康检查
var isHealthCheck = false
var healthCheckKey = this.RawReq.Header.Get(serverconfigs.HealthCheckHeaderName)
if len(healthCheckKey) > 0 {
if this.doHealthCheck(healthCheckKey, &isHealthCheck) {
if this.doHealthCheck(healthCheckKey, &this.isHealthCheck) {
this.doEnd()
return
}
@@ -198,14 +198,14 @@ func (this *HTTPRequest) Do() {
}
// 流量限制
if this.ReqServer.TrafficLimit != nil && this.ReqServer.TrafficLimit.IsOn && !this.ReqServer.TrafficLimit.IsEmpty() && this.ReqServer.TrafficLimitStatus != nil && this.ReqServer.TrafficLimitStatus.IsValid() {
if this.ReqServer.TrafficLimitStatus != nil && this.ReqServer.TrafficLimitStatus.IsValid() {
this.doTrafficLimit()
this.doEnd()
return
}
// UAM
if !isHealthCheck {
if !this.isHealthCheck {
if this.web.UAM != nil {
if this.web.UAM.IsOn {
if this.doUAM() {
@@ -223,7 +223,7 @@ func (this *HTTPRequest) Do() {
}
// CC
if !isHealthCheck {
if !this.isHealthCheck {
if this.web.CC != nil {
if this.web.CC.IsOn {
if this.doCC() {
@@ -388,7 +388,7 @@ func (this *HTTPRequest) doEnd() {
// 流量统计
// TODO 增加是否开启开关
if this.ReqServer != nil && this.ReqServer.Id > 0 {
if this.ReqServer != nil && this.ReqServer.Id > 0 && !this.isHealthCheck /** 健康检查时不统计 **/ {
var totalBytes int64 = 0
var requestConn = this.RawReq.Context().Value(HTTPConnContextKey)
@@ -854,6 +854,24 @@ func (this *HTTPRequest) Format(source string) string {
return this.requestHeadersString()
case "serverName":
return this.ServerName
case "serverAddr":
var nodeConfig = this.nodeConfig
if nodeConfig != nil && nodeConfig.GlobalServerConfig != nil && nodeConfig.GlobalServerConfig.HTTPAll.EnableServerAddrVariable {
if len(this.requestRemoteAddrs()) > 1 {
return "" // hidden for security
}
var requestConn = this.RawReq.Context().Value(HTTPConnContextKey)
if requestConn != nil {
conn, ok := requestConn.(net.Conn)
if ok {
host, _, _ := net.SplitHostPort(conn.LocalAddr().String())
if len(host) > 0 {
return host
}
}
}
}
return ""
case "serverPort":
return strconv.Itoa(this.requestServerPort())
case "hostname":
@@ -1153,10 +1171,32 @@ func (this *HTTPRequest) requestRemoteAddr(supportVar bool) string {
this.web.RemoteAddr != nil &&
this.web.RemoteAddr.IsOn &&
!this.web.RemoteAddr.IsEmpty() {
var remoteAddr = this.Format(this.web.RemoteAddr.Value)
if net.ParseIP(remoteAddr) != nil {
this.remoteAddr = remoteAddr
return remoteAddr
if this.web.RemoteAddr.HasValues() { // multiple values
for _, value := range this.web.RemoteAddr.Values() {
var remoteAddr = this.Format(value)
if len(remoteAddr) > 0 && net.ParseIP(remoteAddr) != nil {
this.remoteAddr = remoteAddr
return remoteAddr
}
}
} else { // single value
var remoteAddr = this.Format(this.web.RemoteAddr.Value)
if len(remoteAddr) > 0 && net.ParseIP(remoteAddr) != nil {
this.remoteAddr = remoteAddr
return remoteAddr
}
}
// 如果是从Header中读取则直接返回原始IP
if this.web.RemoteAddr.Type == serverconfigs.HTTPRemoteAddrTypeRequestHeader {
var remoteAddr = this.RawReq.RemoteAddr
host, _, err := net.SplitHostPort(remoteAddr)
if err == nil {
this.remoteAddr = host
return host
} else {
return remoteAddr
}
}
}
@@ -1222,7 +1262,7 @@ func (this *HTTPRequest) requestRemoteAddrs() (result []string) {
var forwardedFor = this.RawReq.Header.Get("X-Forwarded-For")
if len(forwardedFor) > 0 {
commaIndex := strings.Index(forwardedFor, ",")
if commaIndex > 0 {
if commaIndex > 0 && !lists.ContainsString(result, forwardedFor[:commaIndex]) {
result = append(result, forwardedFor[:commaIndex])
}
}
@@ -1230,7 +1270,7 @@ func (this *HTTPRequest) requestRemoteAddrs() (result []string) {
// Real-IP
{
realIP, ok := this.RawReq.Header["X-Real-IP"]
if ok && len(realIP) > 0 {
if ok && len(realIP) > 0 && !lists.ContainsString(result, realIP[0]) {
result = append(result, realIP[0])
}
}
@@ -1238,7 +1278,7 @@ func (this *HTTPRequest) requestRemoteAddrs() (result []string) {
// Real-Ip
{
realIP, ok := this.RawReq.Header["X-Real-Ip"]
if ok && len(realIP) > 0 {
if ok && len(realIP) > 0 && !lists.ContainsString(result, realIP[0]) {
result = append(result, realIP[0])
}
}
@@ -1248,7 +1288,9 @@ func (this *HTTPRequest) requestRemoteAddrs() (result []string) {
var remoteAddr = this.RawReq.RemoteAddr
host, _, err := net.SplitHostPort(remoteAddr)
if err == nil {
result = append(result, host)
if !lists.ContainsString(result, host) {
result = append(result, host)
}
} else {
result = append(result, remoteAddr)
}
@@ -1565,10 +1607,10 @@ func (this *HTTPRequest) setForwardHeaders(header http.Header) {
this.RawReq.Header.Set("Connection", "keep-alive")
}
remoteAddr := this.RawReq.RemoteAddr
host, _, err := net.SplitHostPort(remoteAddr)
var rawRemoteAddr = this.RawReq.RemoteAddr
host, _, err := net.SplitHostPort(rawRemoteAddr)
if err == nil {
remoteAddr = host
rawRemoteAddr = host
}
// x-real-ip
@@ -1576,24 +1618,24 @@ func (this *HTTPRequest) setForwardHeaders(header http.Header) {
_, ok1 := header["X-Real-IP"]
_, ok2 := header["X-Real-Ip"]
if !ok1 && !ok2 {
header["X-Real-IP"] = []string{remoteAddr}
header["X-Real-IP"] = []string{this.requestRemoteAddr(true)}
}
}
// X-Forwarded-For
if this.reverseProxy != nil && this.reverseProxy.ShouldAddXForwardedForHeader() {
forwardedFor, ok := header["X-Forwarded-For"]
if ok {
if ok && len(forwardedFor) > 0 { // already exists
_, hasForwardHeader := this.RawReq.Header["X-Forwarded-For"]
if hasForwardHeader {
header["X-Forwarded-For"] = []string{strings.Join(forwardedFor, ", ") + ", " + remoteAddr}
header["X-Forwarded-For"] = []string{strings.Join(forwardedFor, ", ") + ", " + rawRemoteAddr}
}
} else {
var clientRemoteAddr = this.requestRemoteAddr(true)
if len(clientRemoteAddr) > 0 && clientRemoteAddr != remoteAddr {
header["X-Forwarded-For"] = []string{clientRemoteAddr + ", " + remoteAddr}
if len(clientRemoteAddr) > 0 && clientRemoteAddr != rawRemoteAddr {
header["X-Forwarded-For"] = []string{clientRemoteAddr + ", " + rawRemoteAddr}
} else {
header["X-Forwarded-For"] = []string{remoteAddr}
header["X-Forwarded-For"] = []string{rawRemoteAddr}
}
}
}

View File

@@ -9,16 +9,20 @@ import (
)
const httpStatusPageTemplate = `<!DOCTYPE html>
<html>
<html lang="en">
<head>
<title>${status} ${statusMessage}</title>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
<style>
address { line-height: 1.8; }
</style>
</head>
<body>
<h1>${status} ${statusMessage}</h1>
<p>${message}</p>
<address>Connection: ${remoteAddr} (Client) -&gt; ${serverAddr} (Server)</address>
<address>Request ID: ${requestId}.</address>
</body>
@@ -39,8 +43,6 @@ func (this *HTTPRequest) writeCode(statusCode int, enMessage string, zhMessage s
return types.String(statusCode)
case "statusMessage":
return http.StatusText(statusCode)
case "requestId":
return this.requestId
case "message":
var acceptLanguages = this.RawReq.Header.Get("Accept-Language")
if len(acceptLanguages) > 0 {
@@ -54,7 +56,7 @@ func (this *HTTPRequest) writeCode(statusCode int, enMessage string, zhMessage s
}
return enMessage
}
return "${" + varName + "}"
return this.Format("${" + varName + "}")
})
this.ProcessResponseHeaders(this.writer.Header(), statusCode)
@@ -107,7 +109,7 @@ func (this *HTTPRequest) write50x(err error, statusCode int, enMessage string, z
}
return "The site is unavailable now, cause: " + enMessage + "."
}
return "${" + varName + "}"
return this.Format("${" + varName + "}")
})
this.ProcessResponseHeaders(this.writer.Header(), statusCode)

View File

@@ -55,7 +55,7 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
}
this.ProcessResponseHeaders(this.writer.Header(), status)
http.Redirect(this.RawWriter, this.RawReq, afterURL, status)
httpRedirect(this.writer, this.RawReq, afterURL, status)
return true
}
} else if u.MatchRegexp { // 正则匹配
@@ -97,7 +97,7 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
}
this.ProcessResponseHeaders(this.writer.Header(), status)
http.Redirect(this.RawWriter, this.RawReq, afterURL, status)
httpRedirect(this.writer, this.RawReq, afterURL, status)
return true
} else { // 精准匹配
if fullURL == u.RealBeforeURL() {
@@ -120,7 +120,7 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
}
this.ProcessResponseHeaders(this.writer.Header(), status)
http.Redirect(this.RawWriter, this.RawReq, afterURL, status)
httpRedirect(this.writer, this.RawReq, afterURL, status)
return true
}
}
@@ -139,11 +139,6 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
}
}
// 如果跳转前后域名一致,则终止
if u.DomainAfter == reqHost {
return false
}
var scheme = u.DomainAfterScheme
if len(scheme) == 0 {
scheme = this.requestScheme()
@@ -155,6 +150,11 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
return false
}
// 如果跳转前后域名一致,则终止
if u.DomainAfter == reqHost {
return false
}
this.ProcessResponseHeaders(this.writer.Header(), status)
// 参数
@@ -163,7 +163,7 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
afterURL += this.uri[qIndex:]
}
http.Redirect(this.RawWriter, this.RawReq, afterURL, status)
httpRedirect(this.writer, this.RawReq, afterURL, status)
return true
}
} else if u.Type == serverconfigs.HTTPHostRedirectTypePort {
@@ -212,7 +212,7 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
}
this.ProcessResponseHeaders(this.writer.Header(), status)
http.Redirect(this.RawWriter, this.RawReq, afterURL, status)
httpRedirect(this.writer, this.RawReq, afterURL, status)
return true
}
}

View File

@@ -20,6 +20,6 @@ func (this *HTTPRequest) checkLnRequest() bool {
return false
}
func (this *HTTPRequest) getLnOrigin(excludingNodeIds []int64) (originConfig *serverconfigs.OriginConfig, lnNodeId int64, hasMultipleNodes bool) {
func (this *HTTPRequest) getLnOrigin(excludingNodeIds []int64, urlHash uint64) (originConfig *serverconfigs.OriginConfig, lnNodeId int64, hasMultipleNodes bool) {
return nil, 0, false
}

View File

@@ -7,9 +7,9 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/waf"
"github.com/iwind/TeaGo/types"
"net"
"net/http"
"time"
)
@@ -48,7 +48,8 @@ func (this *HTTPRequest) doMismatch() {
}
// 是否正在访问IP
if globalServerConfig.HTTPAll.NodeIPShowPage && net.ParseIP(this.ReqHost) != nil {
if globalServerConfig.HTTPAll.NodeIPShowPage && utils.IsWildIP(this.ReqHost) {
this.writer.statusCode = statusCode
var contentHTML = this.Format(globalServerConfig.HTTPAll.NodeIPPageHTML)
this.writer.Header().Set("Content-Type", "text/html; charset=utf-8")
this.writer.Header().Set("Content-Length", types.String(len(contentHTML)))
@@ -71,8 +72,9 @@ func (this *HTTPRequest) doMismatch() {
}
// 处理当前连接
if mismatchAction != nil && mismatchAction.Code == "page" {
if mismatchAction != nil && mismatchAction.Code == serverconfigs.DomainMismatchActionPage {
if mismatchAction.Options != nil {
this.writer.statusCode = statusCode
var contentHTML = this.Format(mismatchAction.Options.GetString("contentHTML"))
this.writer.Header().Set("Content-Type", "text/html; charset=utf-8")
this.writer.Header().Set("Content-Length", types.String(len(contentHTML)))

View File

@@ -43,7 +43,7 @@ func (this *HTTPRequest) doRedirectToHTTPS(redirectToHTTPSConfig *serverconfigs.
var newURL = "https://" + host + this.RawReq.RequestURI
this.ProcessResponseHeaders(this.writer.Header(), statusCode)
http.Redirect(this.writer, this.RawReq, newURL, statusCode)
httpRedirect(this.writer, this.RawReq, newURL, statusCode)
return true
}

View File

@@ -7,6 +7,8 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/types"
"io"
"net/http"
@@ -65,7 +67,7 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
// 二级节点
var hasMultipleLnNodes = false
if this.cacheRef != nil || (this.nodeConfig != nil && this.nodeConfig.GlobalServerConfig != nil && this.nodeConfig.GlobalServerConfig.HTTPAll.ForceLnRequest) {
origin, lnNodeId, hasMultipleLnNodes = this.getLnOrigin(failedLnNodeIds)
origin, lnNodeId, hasMultipleLnNodes = this.getLnOrigin(failedLnNodeIds, fnv.HashString(this.URL()))
if origin != nil {
// 强制变更原来访问的域名
requestHost = this.ReqHost
@@ -306,7 +308,8 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
if requestErr != nil {
// 客户端取消请求,则不提示
httpErr, ok := requestErr.(*url.Error)
var httpErr *url.Error
ok := errors.As(requestErr, &httpErr)
if !ok {
if isHTTPOrigin {
SharedOriginStateManager.Fail(origin, requestHost, this.reverseProxy, func() {
@@ -320,7 +323,7 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
this.write50x(requestErr, http.StatusBadGateway, "Failed to read origin site", "源站读取失败", true)
}
remotelogs.WarnServer("HTTP_REQUEST_REVERSE_PROXY", this.RawReq.URL.String()+": Request origin server failed: "+requestErr.Error())
} else if httpErr.Err != context.Canceled {
} else if !errors.Is(httpErr, context.Canceled) {
if isHTTPOrigin {
SharedOriginStateManager.Fail(origin, requestHost, this.reverseProxy, func() {
this.reverseProxy.ResetScheduling()
@@ -353,7 +356,7 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
// 是否为客户端方面的错误
var isClientError = false
if ok {
if httpErr.Err == context.Canceled {
if errors.Is(httpErr, context.Canceled) {
// 如果是服务器端主动关闭,则无需提示
if this.isConnClosed() {
this.disableLog = true
@@ -373,6 +376,35 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
return
}
// 50x
if resp != nil &&
resp.StatusCode >= 500 &&
resp.StatusCode < 510 &&
this.reverseProxy.Retry50X &&
(originId > 0 || (lnNodeId > 0 && hasMultipleLnNodes)) &&
!isLastRetry {
if resp.Body != nil {
_ = resp.Body.Close()
}
shouldRetry = true
return
}
// 尝试从缓存中恢复
if resp != nil &&
resp.StatusCode >= 500 && // support 50X only
resp.StatusCode < 510 &&
this.cacheCanTryStale &&
this.web.Cache.Stale != nil &&
this.web.Cache.Stale.IsOn &&
(len(this.web.Cache.Stale.Status) == 0 || lists.ContainsInt(this.web.Cache.Stale.Status, resp.StatusCode)) {
var ok = this.doCacheRead(true)
if ok {
return
}
}
// 记录相关数据
this.originStatus = int32(resp.StatusCode)

View File

@@ -31,10 +31,10 @@ func (this *HTTPRequest) doRewrite() (shouldShop bool) {
if this.rewriteRule.Mode == serverconfigs.HTTPRewriteModeRedirect {
if this.rewriteRule.RedirectStatus > 0 {
this.ProcessResponseHeaders(this.writer.Header(), this.rewriteRule.RedirectStatus)
http.Redirect(this.writer, this.RawReq, this.rewriteReplace, this.rewriteRule.RedirectStatus)
httpRedirect(this.writer, this.RawReq, this.rewriteReplace, this.rewriteRule.RedirectStatus)
} else {
this.ProcessResponseHeaders(this.writer.Header(), http.StatusTemporaryRedirect)
http.Redirect(this.writer, this.RawReq, this.rewriteReplace, http.StatusTemporaryRedirect)
httpRedirect(this.writer, this.RawReq, this.rewriteReplace, http.StatusTemporaryRedirect)
}
return true
}

View File

@@ -8,15 +8,17 @@ import (
// 流量限制
func (this *HTTPRequest) doTrafficLimit() {
var config = this.ReqServer.TrafficLimit
this.tags = append(this.tags, "bandwidth")
this.tags = append(this.tags, "trafficLimit")
var statusCode = 509
this.writer.statusCode = statusCode
this.ProcessResponseHeaders(this.writer.Header(), statusCode)
this.writer.Header().Set("Content-Type", "text/html; charset=utf-8")
this.writer.WriteHeader(statusCode)
if len(config.NoticePageBody) != 0 {
var config = this.ReqServer.TrafficLimit
if config != nil && len(config.NoticePageBody) != 0 {
_, _ = this.writer.WriteString(this.Format(config.NoticePageBody))
} else {
_, _ = this.writer.WriteString(this.Format(serverconfigs.DefaultTrafficLimitNoticePageBody))

View File

@@ -16,6 +16,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"github.com/TeaOSLab/EdgeNode/internal/utils/readers"
setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets"
"github.com/TeaOSLab/EdgeNode/internal/utils/writers"
_ "github.com/biessek/golang-ico"
"github.com/iwind/TeaGo/lists"
@@ -39,6 +40,7 @@ import (
var webpMaxBufferSize int64 = 1_000_000_000
var webpTotalBufferSize int64 = 0
var webpIgnoreURLSet = setutils.NewFixedSet(131072)
func init() {
if !teaconst.IsMain {
@@ -47,7 +49,7 @@ func init() {
var systemMemory = utils.SystemMemoryGB() / 8
if systemMemory > 0 {
webpMaxBufferSize = int64(systemMemory) * 1024 * 1024 * 1024
webpMaxBufferSize = int64(systemMemory) << 30
}
}
@@ -547,6 +549,11 @@ func (this *HTTPWriter) PrepareWebP(resp *http.Response, size int64) {
this.req.web.WebP.IsOn &&
this.req.web.WebP.MatchResponse(contentType, size, filepath.Ext(this.req.Path()), this.req.Format) &&
this.req.web.WebP.MatchAccept(this.req.requestHeader("Accept")) {
// 检查是否已经因为尺寸过大而忽略
if webpIgnoreURLSet.Has(this.req.URL()) {
return
}
// 如果已经是WebP不再重复处理
// TODO 考虑是否需要很严格的匹配
if strings.Contains(contentType, "image/webp") {
@@ -787,12 +794,10 @@ func (this *HTTPWriter) AddHeaders(header http.Header) {
continue
}
switch key {
case "ETag":
case "Accept-CH", "ETag", "Content-MD5", "IM", "P3P", "WWW-Authenticate", "X-Request-ID":
newHeaders[key] = value
default:
for _, v := range value {
newHeaders.Add(key, v)
}
newHeaders[http.CanonicalHeaderKey(key)] = value
}
}
}
@@ -908,7 +913,7 @@ func (this *HTTPWriter) SendResp(resp *http.Response) (int64, error) {
// Redirect 跳转
func (this *HTTPWriter) Redirect(status int, url string) {
http.Redirect(this.rawWriter, this.req.RawReq, url, status)
httpRedirect(this, this.req.RawReq, url, status)
this.isFinished = true
}
@@ -984,7 +989,7 @@ func (this *HTTPWriter) DelayRead() bool {
// 计算stale时长
func (this *HTTPWriter) calculateStaleLife() int {
var staleLife = 600 // TODO 可以在缓存策略里设置此时间
var staleLife = caches.DefaultStaleCacheSeconds
var staleConfig = this.req.web.Cache.Stale
if staleConfig != nil && staleConfig.IsOn {
// 从Header中读取stale-if-error
@@ -1074,12 +1079,24 @@ func (this *HTTPWriter) finishWebP() {
var err error
if isGif {
gifImage, err = gif.DecodeAll(reader)
if gifImage != nil && (gifImage.Config.Width > gowebp.WebPMaxDimension || gifImage.Config.Height > gowebp.WebPMaxDimension) {
webpIgnoreURLSet.Push(this.req.URL())
return
}
} else {
imageData, _, err = image.Decode(reader)
if imageData != nil {
var bound = imageData.Bounds()
if bound.Max.X > gowebp.WebPMaxDimension || bound.Max.Y > gowebp.WebPMaxDimension {
webpIgnoreURLSet.Push(this.req.URL())
return
}
}
}
if err != nil {
// 发生了错误终止处理
webpIgnoreURLSet.Push(this.req.URL())
return
}
@@ -1267,7 +1284,7 @@ func (this *HTTPWriter) calculateHeaderLength() (result int) {
func (this *HTTPWriter) shouldIgnoreHeader(name string) bool {
switch name {
case "Set-Cookie", "Strict-Transport-Security", "Alt-Svc", "Upgrade":
case "Set-Cookie", "Strict-Transport-Security", "Alt-Svc", "Upgrade", "X-Cache":
return true
default:
return (this.isPartial && name == "Content-Range")

View File

@@ -7,6 +7,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/sslconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/types"
"net"
)
@@ -179,10 +180,10 @@ func (this *BaseListener) findNamedServer(name string) (serverConfig *serverconf
if globalServerConfig != nil &&
len(globalServerConfig.HTTPAll.DefaultDomain) > 0 &&
(!matchDomainStrictly || configutils.MatchDomains(globalServerConfig.HTTPAll.AllowMismatchDomains, name) || (globalServerConfig.HTTPAll.AllowNodeIP && net.ParseIP(name) != nil)) {
(!matchDomainStrictly || configutils.MatchDomains(globalServerConfig.HTTPAll.AllowMismatchDomains, name) || (globalServerConfig.HTTPAll.AllowNodeIP && utils.IsWildIP(name))) {
if globalServerConfig.HTTPAll.AllowNodeIP &&
globalServerConfig.HTTPAll.NodeIPShowPage &&
net.ParseIP(name) != nil {
utils.IsWildIP(name) {
return
} else {
var defaultDomain = globalServerConfig.HTTPAll.DefaultDomain
@@ -193,7 +194,7 @@ func (this *BaseListener) findNamedServer(name string) (serverConfig *serverconf
}
}
if matchDomainStrictly && !configutils.MatchDomains(globalServerConfig.HTTPAll.AllowMismatchDomains, name) && (!globalServerConfig.HTTPAll.AllowNodeIP || net.ParseIP(name) == nil) {
if matchDomainStrictly && !configutils.MatchDomains(globalServerConfig.HTTPAll.AllowMismatchDomains, name) && (!globalServerConfig.HTTPAll.AllowNodeIP || !utils.IsWildIP(name)) {
return
}

View File

@@ -177,6 +177,12 @@ func (this *HTTPListener) ServeHTTP(rawWriter http.ResponseWriter, rawReq *http.
return
}
clientConn.SetUserId(server.UserId)
var userPlanId int64
if server.UserPlan != nil && server.UserPlan.Id > 0 {
userPlanId = server.UserPlan.Id
}
clientConn.SetUserPlanId(userPlanId)
}
}
}

View File

@@ -80,6 +80,12 @@ func (this *TCPListener) handleConn(conn net.Conn) error {
return nil
}
clientConn.SetUserId(server.UserId)
var userPlanId int64
if server.UserPlan != nil && server.UserPlan.Id > 0 {
userPlanId = server.UserPlan.Id
}
clientConn.SetUserPlanId(userPlanId)
} else {
tlsConn, ok := conn.(*tls.Conn)
if ok {
@@ -92,6 +98,12 @@ func (this *TCPListener) handleConn(conn net.Conn) error {
return nil
}
clientConn.SetUserId(server.UserId)
var userPlanId int64
if server.UserPlan != nil && server.UserPlan.Id > 0 {
userPlanId = server.UserPlan.Id
}
clientConn.SetUserPlanId(userPlanId)
}
}
}

View File

@@ -404,7 +404,11 @@ func NewUDPConn(server *serverconfigs.ServerConfig, addr net.Addr, proxyListener
stats.SharedTrafficStatManager.Add(server.UserId, server.Id, "", int64(n), 0, 0, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId())
// 带宽
stats.SharedBandwidthStatManager.AddBandwidth(server.UserId, server.Id, int64(n), int64(n))
var userPlanId int64
if server.UserPlan != nil && server.UserPlan.Id > 0 {
userPlanId = server.UserPlan.Id
}
stats.SharedBandwidthStatManager.AddBandwidth(server.UserId, userPlanId, server.Id, int64(n), int64(n))
}
}
if err != nil {

View File

@@ -808,6 +808,36 @@ func (this *Node) listenSock() error {
_ = cmd.Reply(&gosock.Command{Params: maps.Map{
"stats": m,
}})
case "cache.garbage":
var shouldDelete = maps.NewMap(cmd.Params).GetBool("delete")
var count = 0
var sampleFiles = []string{}
err := caches.SharedManager.ScanGarbageCaches(func(path string) error {
count++
if len(sampleFiles) < 10 {
sampleFiles = append(sampleFiles, path)
}
if shouldDelete {
_ = os.Remove(path) // .cache
_ = os.Remove(caches.PartialRangesFilePath(path)) // @range.cache
}
return nil
})
if err != nil {
_ = cmd.Reply(&gosock.Command{Params: maps.Map{
"isOk": false,
"error": err.Error(),
}})
} else {
_ = cmd.Reply(&gosock.Command{Params: maps.Map{
"isOk": true,
"count": count,
"sampleFiles": sampleFiles,
}})
}
}
})

View File

@@ -264,7 +264,9 @@ func (this *NodeStatusExecutor) updateDisk(status *nodeconfigs.NodeStatus) {
}
}
status.DiskTotal = total
status.DiskUsage = float64(totalUsage) / float64(total)
if total > 0 {
status.DiskUsage = float64(totalUsage) / float64(total)
}
status.DiskMaxUsage = maxUsage / 100
// 记录监控数据
@@ -280,7 +282,7 @@ func (this *NodeStatusExecutor) updateCacheSpace(status *nodeconfigs.NodeStatus)
var result = []maps.Map{}
var cachePaths = caches.SharedManager.FindAllCachePaths()
for _, path := range cachePaths {
stat, err := fsutils.Stat(path)
stat, err := fsutils.StatDevice(path)
if err != nil {
return
}

View File

@@ -4,6 +4,7 @@ package nodes
import (
"encoding/json"
"errors"
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
@@ -16,9 +17,12 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/waf"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/types"
"os"
"strings"
"time"
)
@@ -94,7 +98,12 @@ func (this *Node) execTask(rpcClient *rpc.RPCClient, task *pb.NodeTask) error {
case "toaChanged":
err = this.execTOAChangedTask()
default:
remotelogs.Error("NODE", "task '"+types.String(task.Id)+"', type '"+task.Type+"' has not been handled")
// 特殊任务
if strings.HasPrefix(task.Type, "ipListDeleted") { // 删除IP名单
err = this.execDeleteIPList(task.Type)
} else { // 未处理的任务
remotelogs.Error("NODE", "task '"+types.String(task.Id)+"', type '"+task.Type+"' has not been handled")
}
}
return err
@@ -285,6 +294,34 @@ func (this *Node) execUpdatingServersTask(rpcClient *rpc.RPCClient) error {
return nil
}
// 删除IP名单
func (this *Node) execDeleteIPList(taskType string) error {
optionsString, ok := strings.CutPrefix(taskType, "ipListDeleted@")
if !ok {
return errors.New("invalid task type '" + taskType + "'")
}
var optionMap = maps.Map{}
err := json.Unmarshal([]byte(optionsString), &optionMap)
if err != nil {
return fmt.Errorf("decode options failed: %w, options: %s", err, optionsString)
}
var listId = optionMap.GetInt64("listId")
if listId <= 0 {
return nil
}
// 标记已被删除
waf.AddDeletedIPList(listId)
var list = iplibrary.SharedIPListManager.FindList(listId)
if list != nil {
list.SetDeleted()
}
return nil
}
// 标记任务完成
func (this *Node) finishTask(taskId int64, taskVersion int64, taskErr error) (success bool) {
if taskId <= 0 {

View File

@@ -62,6 +62,7 @@ type BandwidthStat struct {
CountRequests int64 `json:"countRequests"`
CountCachedRequests int64 `json:"countCachedRequests"`
CountAttackRequests int64 `json:"countAttackRequests"`
UserPlanId int64 `json:"userPlanId"`
}
// BandwidthStatManager 服务带宽统计
@@ -153,6 +154,7 @@ func (this *BandwidthStatManager) Loop() error {
CountRequests: stat.CountRequests,
CountCachedRequests: stat.CountCachedRequests,
CountAttackRequests: stat.CountAttackRequests,
UserPlanId: stat.UserPlanId,
NodeRegionId: regionId,
})
delete(this.m, key)
@@ -178,7 +180,7 @@ func (this *BandwidthStatManager) Loop() error {
}
// AddBandwidth 添加带宽数据
func (this *BandwidthStatManager) AddBandwidth(userId int64, serverId int64, peekBytes int64, totalBytes int64) {
func (this *BandwidthStatManager) AddBandwidth(userId int64, userPlanId int64, serverId int64, peekBytes int64, totalBytes int64) {
if serverId <= 0 || (peekBytes == 0 && totalBytes == 0) {
return
}
@@ -217,6 +219,7 @@ func (this *BandwidthStatManager) AddBandwidth(userId int64, serverId int64, pee
Day: day,
TimeAt: timeAt,
UserId: userId,
UserPlanId: userPlanId,
ServerId: serverId,
CurrentBytes: peekBytes,
MaxBytes: peekBytes,

View File

@@ -12,22 +12,22 @@ import (
func TestBandwidthStatManager_Add(t *testing.T) {
var manager = stats.NewBandwidthStatManager()
manager.AddBandwidth(1, 1, 10, 10)
manager.AddBandwidth(1, 1, 10, 10)
manager.AddBandwidth(1, 1, 10, 10)
manager.AddBandwidth(1, 0, 1, 10, 10)
manager.AddBandwidth(1, 0, 1, 10, 10)
manager.AddBandwidth(1, 0, 1, 10, 10)
time.Sleep(1 * time.Second)
manager.AddBandwidth(1, 1, 85, 85)
manager.AddBandwidth(1, 0, 1, 85, 85)
time.Sleep(1 * time.Second)
manager.AddBandwidth(1, 1, 25, 25)
manager.AddBandwidth(1, 1, 75, 75)
manager.AddBandwidth(1, 0, 1, 25, 25)
manager.AddBandwidth(1, 0, 1, 75, 75)
manager.Inspect()
}
func TestBandwidthStatManager_Loop(t *testing.T) {
var manager = stats.NewBandwidthStatManager()
manager.AddBandwidth(1, 1, 10, 10)
manager.AddBandwidth(1, 1, 10, 10)
manager.AddBandwidth(1, 1, 10, 10)
manager.AddBandwidth(1, 0, 1, 10, 10)
manager.AddBandwidth(1, 0, 1, 10, 10)
manager.AddBandwidth(1, 0, 1, 10, 10)
err := manager.Loop()
if err != nil {
t.Fatal(err)
@@ -40,7 +40,7 @@ func BenchmarkBandwidthStatManager_Add(b *testing.B) {
var i int
for pb.Next() {
i++
manager.AddBandwidth(1, int64(i%100), 10, 10)
manager.AddBandwidth(1, 0, int64(i%100), 10, 10)
}
})
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fs"
_ "github.com/mattn/go-sqlite3"
"os"
"strings"
"sync"
"time"
@@ -47,15 +48,17 @@ func open(dsn string, lock bool) (*DB, error) {
return nil, errors.New("can not open database when process is quiting")
}
// decode path
var path = dsn
var queryIndex = strings.Index(dsn, "?")
if queryIndex >= 0 {
path = path[:queryIndex]
}
path = strings.TrimSpace(strings.TrimPrefix(path, "file:"))
// locker
var locker *fsutils.Locker
if lock {
var path = dsn
var queryIndex = strings.Index(dsn, "?")
if queryIndex >= 0 {
path = path[:queryIndex]
}
path = strings.TrimSpace(strings.TrimPrefix(path, "file:"))
locker = fsutils.NewLocker(path)
err := locker.Lock()
if err != nil {
@@ -64,12 +67,30 @@ func open(dsn string, lock bool) (*DB, error) {
}
}
// check if closed successfully last time, if not we recover it
var walPath = path + "-wal"
_, statWalErr := os.Stat(walPath)
var shouldRecover = statWalErr == nil
// open
rawDB, err := sql.Open("sqlite3", dsn)
if err != nil {
return nil, err
}
if shouldRecover {
err = rawDB.Close()
if err != nil {
return nil, err
}
// open again
rawDB, err = sql.Open("sqlite3", dsn)
if err != nil {
return nil, err
}
}
var db = NewDB(rawDB, dsn)
db.locker = locker
return db, nil

View File

@@ -14,3 +14,11 @@ func TestHash(t *testing.T) {
t.Log(key + " => " + types.String(h))
}
}
func BenchmarkHashString(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
fnv.HashString("abcdefh")
}
})
}

View File

@@ -8,8 +8,8 @@ import (
"sync"
)
// Stat device contains the path
func Stat(path string) (*StatResult, error) {
// StatDevice device contains the path
func StatDevice(path string) (*StatResult, error) {
var stat = &unix.Statfs_t{}
err := unix.Statfs(path, stat)
if err != nil {
@@ -23,8 +23,8 @@ var cacheMap = map[string]*StatResult{} // path => StatResult
const cacheLife = 3 // seconds
// StatCache stat device with cache
func StatCache(path string) (*StatResult, error) {
// StatDeviceCache stat device with cache
func StatDeviceCache(path string) (*StatResult, error) {
locker.RLock()
stat, ok := cacheMap[path]
if ok && stat.updatedAt >= fasttime.Now().Unix()-cacheLife {
@@ -36,7 +36,7 @@ func StatCache(path string) (*StatResult, error) {
locker.Lock()
defer locker.Unlock()
stat, err := Stat(path)
stat, err := StatDevice(path)
if err != nil {
return nil, err
}

View File

@@ -10,7 +10,7 @@ import (
)
func TestStat(t *testing.T) {
stat, err := fsutils.Stat("/usr/local")
stat, err := fsutils.StatDevice("/usr/local")
if err != nil {
t.Fatal(err)
}
@@ -19,7 +19,7 @@ func TestStat(t *testing.T) {
func TestStatCache(t *testing.T) {
for i := 0; i < 10; i++ {
stat, err := fsutils.StatCache("/usr/local")
stat, err := fsutils.StatDeviceCache("/usr/local")
if err != nil {
t.Fatal(err)
}
@@ -40,16 +40,16 @@ func TestConcurrent(t *testing.T) {
go func() {
defer wg.Done()
_, _ = fsutils.Stat("/usr/local")
_, _ = fsutils.StatDevice("/usr/local")
}()
}
wg.Wait()
}
func BenchmarkStat(b *testing.B) {
func BenchmarkStatDevice(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := fsutils.Stat("/usr/local")
_, err := fsutils.StatDevice("/usr/local")
if err != nil {
b.Fatal(err)
}
@@ -57,10 +57,10 @@ func BenchmarkStat(b *testing.B) {
})
}
func BenchmarkStatCache(b *testing.B) {
func BenchmarkStatCacheDevice(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := fsutils.StatCache("/usr/local")
_, err := fsutils.StatDeviceCache("/usr/local")
if err != nil {
b.Fatal(err)
}

View File

@@ -72,3 +72,18 @@ func IsIPv6(ip string) bool {
}
return !IsIPv4(ip)
}
// IsWildIP 宽泛地判断一个数据是否为IP
func IsWildIP(v string) bool {
var l = len(v)
if l == 0 {
return false
}
// for [IPv6]
if v[0] == '[' && v[l-1] == ']' {
return IsWildIP(v[1 : l-1])
}
return net.ParseIP(v) != nil
}

View File

@@ -1,51 +1,65 @@
package utils
package utils_test
import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/assert"
"testing"
)
func TestIP2Long(t *testing.T) {
t.Log(IP2Long("0.0.0.0"))
t.Log(IP2Long("1.0.0.0"))
t.Log(IP2Long("0.0.0.0.0"))
t.Log(IP2Long("2001:db8:0:1::101"))
t.Log(IP2Long("2001:db8:0:1::102"))
t.Log(IP2Long("::1"))
t.Log(utils.IP2Long("0.0.0.0"))
t.Log(utils.IP2Long("1.0.0.0"))
t.Log(utils.IP2Long("0.0.0.0.0"))
t.Log(utils.IP2Long("2001:db8:0:1::101"))
t.Log(utils.IP2Long("2001:db8:0:1::102"))
t.Log(utils.IP2Long("::1"))
}
func TestIsLocalIP(t *testing.T) {
var a = assert.NewAssertion(t)
a.IsFalse(IsLocalIP("a"))
a.IsFalse(IsLocalIP("1.2.3"))
a.IsTrue(IsLocalIP("127.0.0.1"))
a.IsTrue(IsLocalIP("192.168.0.1"))
a.IsTrue(IsLocalIP("10.0.0.1"))
a.IsTrue(IsLocalIP("172.16.0.1"))
a.IsTrue(IsLocalIP("::1"))
a.IsFalse(IsLocalIP("::1:2:3"))
a.IsFalse(IsLocalIP("8.8.8.8"))
a.IsFalse(utils.IsLocalIP("a"))
a.IsFalse(utils.IsLocalIP("1.2.3"))
a.IsTrue(utils.IsLocalIP("127.0.0.1"))
a.IsTrue(utils.IsLocalIP("192.168.0.1"))
a.IsTrue(utils.IsLocalIP("10.0.0.1"))
a.IsTrue(utils.IsLocalIP("172.16.0.1"))
a.IsTrue(utils.IsLocalIP("::1"))
a.IsFalse(utils.IsLocalIP("::1:2:3"))
a.IsFalse(utils.IsLocalIP("8.8.8.8"))
}
func TestIsIPv4(t *testing.T) {
var a = assert.NewAssertion(t)
a.IsTrue(IsIPv4("192.168.1.1"))
a.IsTrue(IsIPv4("0.0.0.0"))
a.IsFalse(IsIPv4("192.168.1.256"))
a.IsFalse(IsIPv4("192.168.1"))
a.IsFalse(IsIPv4("::1"))
a.IsFalse(IsIPv4("2001:0db8:85a3:0000:0000:8a2e:0370:7334"))
a.IsFalse(IsIPv4("::ffff:192.168.0.1"))
a.IsTrue(utils.IsIPv4("192.168.1.1"))
a.IsTrue(utils.IsIPv4("0.0.0.0"))
a.IsFalse(utils.IsIPv4("192.168.1.256"))
a.IsFalse(utils.IsIPv4("192.168.1"))
a.IsFalse(utils.IsIPv4("::1"))
a.IsFalse(utils.IsIPv4("2001:0db8:85a3:0000:0000:8a2e:0370:7334"))
a.IsFalse(utils.IsIPv4("::ffff:192.168.0.1"))
}
func TestIsIPv6(t *testing.T) {
var a = assert.NewAssertion(t)
a.IsFalse(IsIPv6("192.168.1.1"))
a.IsFloat32(IsIPv6("0.0.0.0"))
a.IsFalse(IsIPv6("192.168.1.256"))
a.IsFalse(IsIPv6("192.168.1"))
a.IsTrue(IsIPv6("::1"))
a.IsTrue(IsIPv6("2001:0db8:85a3:0000:0000:8a2e:0370:7334"))
a.IsTrue(IsIPv4("::ffff:192.168.0.1"))
a.IsTrue(IsIPv6("::ffff:192.168.0.1"))
a.IsFalse(utils.IsIPv6("192.168.1.1"))
a.IsFloat32(utils.IsIPv6("0.0.0.0"))
a.IsFalse(utils.IsIPv6("192.168.1.256"))
a.IsFalse(utils.IsIPv6("192.168.1"))
a.IsTrue(utils.IsIPv6("::1"))
a.IsTrue(utils.IsIPv6("2001:0db8:85a3:0000:0000:8a2e:0370:7334"))
a.IsTrue(utils.IsIPv4("::ffff:192.168.0.1"))
a.IsTrue(utils.IsIPv6("::ffff:192.168.0.1"))
}
func TestIsWildIP(t *testing.T) {
var a = assert.NewAssertion(t)
a.IsTrue(utils.IsWildIP("192.168.1.100"))
a.IsTrue(utils.IsWildIP("::1"))
a.IsTrue(utils.IsWildIP("2001:0db8:85a3:0000:0000:8a2e:0370:7334"))
a.IsTrue(utils.IsWildIP("[2001:0db8:85a3:0000:0000:8a2e:0370:7334]"))
a.IsFalse(utils.IsWildIP(""))
a.IsFalse(utils.IsWildIP("[]"))
a.IsFalse(utils.IsWildIP("[1]"))
a.IsFalse(utils.IsWildIP("192.168.2.256"))
a.IsFalse(utils.IsWildIP("192.168.2"))
}

View File

@@ -108,11 +108,12 @@ func init() {
type RecordIPAction struct {
BaseAction
Type string `yaml:"type" json:"type"`
IPListId int64 `yaml:"ipListId" json:"ipListId"`
Level string `yaml:"level" json:"level"`
Timeout int32 `yaml:"timeout" json:"timeout"`
Scope string `yaml:"scope" json:"scope"`
Type string `yaml:"type" json:"type"`
IPListId int64 `yaml:"ipListId" json:"ipListId"`
IPListIsDeleted bool `yaml:"ipListIsDeleted" json:"ipListIsDeleted"`
Level string `yaml:"level" json:"level"`
Timeout int32 `yaml:"timeout" json:"timeout"`
Scope string `yaml:"scope" json:"scope"`
}
func (this *RecordIPAction) Init(waf *WAF) error {
@@ -132,6 +133,9 @@ func (this *RecordIPAction) WillChange() bool {
}
func (this *RecordIPAction) Perform(waf *WAF, group *RuleGroup, set *RuleSet, request requests.Request, writer http.ResponseWriter) (continueRequest bool, goNextSet bool) {
// 是否已删除
var ipListIsAvailable = this.IPListId > 0 && !this.IPListIsDeleted && !ExistDeletedIPList(this.IPListId)
// 是否在本地白名单中
if SharedIPWhiteList.Contains("set:"+types.String(set.Id), this.Scope, request.WAFServerId(), request.WAFRemoteIP()) {
return true, false
@@ -152,14 +156,18 @@ func (this *RecordIPAction) Perform(waf *WAF, group *RuleGroup, set *RuleSet, re
request.WAFClose()
// 先加入本地的黑名单
SharedIPBlackList.Add(IPTypeAll, this.Scope, request.WAFServerId(), request.WAFRemoteIP(), expiresAt)
if ipListIsAvailable {
SharedIPBlackList.Add(IPTypeAll, this.Scope, request.WAFServerId(), request.WAFRemoteIP(), expiresAt)
}
} else {
// 加入本地白名单
SharedIPWhiteList.Add("set:"+types.String(set.Id), this.Scope, request.WAFServerId(), request.WAFRemoteIP(), expiresAt)
if ipListIsAvailable {
SharedIPWhiteList.Add("set:"+types.String(set.Id), this.Scope, request.WAFServerId(), request.WAFRemoteIP(), expiresAt)
}
}
// 上报
if this.IPListId > 0 {
if this.IPListId > 0 && ipListIsAvailable {
var serverId int64
if this.Scope == firewallconfigs.FirewallScopeService {
serverId = request.WAFServerId()

View File

@@ -0,0 +1,30 @@
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package waf
import (
"github.com/TeaOSLab/EdgeNode/internal/zero"
"sync"
)
var deletedIPListIdMap = map[int64]zero.Zero{} // listId => Zero
var deletedIPListLocker = sync.RWMutex{}
// AddDeletedIPList add deleted ip list
func AddDeletedIPList(ipListId int64) {
if ipListId <= 0 {
return
}
deletedIPListLocker.Lock()
deletedIPListIdMap[ipListId] = zero.Zero{}
deletedIPListLocker.Unlock()
}
// ExistDeletedIPList check if ip list has been deleted
func ExistDeletedIPList(ipListId int64) bool {
deletedIPListLocker.RLock()
_, ok := deletedIPListIdMap[ipListId]
deletedIPListLocker.RUnlock()
return ok
}