Compare commits

..

45 Commits

Author SHA1 Message Date
刘祥超
644ada1da9 优化代码 2023-03-12 20:32:15 +08:00
刘祥超
0c40250849 优化代码 2023-03-12 16:36:59 +08:00
刘祥超
1d1134a86d 优化代码 2023-03-12 16:18:16 +08:00
刘祥超
28e7664eb7 修复源站重试时可能产生的file is writing错误 2023-03-12 16:09:06 +08:00
刘祥超
50f3ad641c 优化统计相关程序 2023-03-12 12:19:25 +08:00
刘祥超
cc7cf5f8c5 限制统计系统和浏览器的最大长度,减少随机UserAgent攻击影响 2023-03-11 11:58:05 +08:00
刘祥超
339f0f6e94 上传统计数据时增加单次上传数量限制 2023-03-11 11:21:03 +08:00
刘祥超
f558e43342 根据系统内存调整访问日志队列长度 2023-03-10 22:31:40 +08:00
刘祥超
e374e5c90c 优化命令识别 2023-03-10 22:05:40 +08:00
刘祥超
563b775e49 优化命令执行速度 2023-03-10 22:01:39 +08:00
刘祥超
de9e1a4515 执行一般命令时不运行init()中内容 2023-03-10 15:14:14 +08:00
刘祥超
f64b36f17a WAF cc防护增加“检查请求来源指纹”选项 2023-03-10 10:41:16 +08:00
刘祥超
f0e8c82d31 增加CC防护(开源用户需要自己实现) 2023-03-09 15:15:22 +08:00
刘祥超
5770d43230 WAF cc2尝试使用指纹统计方法 2023-03-08 16:59:44 +08:00
刘祥超
d4944c236f 修复几处测试用例 2023-03-08 10:13:41 +08:00
刘祥超
33c761a187 修复本地数据库无法异步提交事务的Bug 2023-03-07 16:48:03 +08:00
刘祥超
d7e6da8d2c 对本地数据库文件进行加锁 2023-03-07 16:22:32 +08:00
刘祥超
44d1a2415c 集群服务设置增加“记录找不到网站日志”选项 2023-03-07 10:30:55 +08:00
刘祥超
c98ff50f06 暂时取消GET302和POST307的迟滞 2023-03-07 08:51:39 +08:00
刘祥超
8835fcb09e GET302/POST307增加访问迟滞 2023-03-06 16:28:54 +08:00
刘祥超
77c56e58c0 GET302/POST307兼容safari浏览器 2023-03-06 16:27:06 +08:00
刘祥超
72c65ca4ee 修复GET302和POST307关闭连接后无法响应的问题 2023-03-06 16:10:58 +08:00
刘祥超
ab019b0bdc 修复在连接读写优化模式下fastcgi无法正常工作的Bug 2023-03-06 10:33:54 +08:00
刘祥超
9709e45ad2 修改软内存限制设置错误 2023-03-05 21:19:10 +08:00
刘祥超
be1f80003c 增加软内存最大值限制 2023-03-05 12:34:46 +08:00
刘祥超
252fcca383 增加测试用例 2023-03-03 14:28:58 +08:00
刘祥超
04ae8fa4a0 系统内存不足时,尝试自动回收内存 2023-03-02 10:54:25 +08:00
刘祥超
c95bd7776a WAF拦截动作可以设置最大封禁时间,从而实现封禁时间随机 2023-03-01 19:00:08 +08:00
刘祥超
8219167d05 WAF支持忽略全局WAF规则 2023-03-01 16:46:43 +08:00
刘祥超
e0a6881343 上传带宽数据时同时上传流量数据 2023-02-27 10:48:16 +08:00
刘祥超
6e985d7f06 修复GET302和POST307无限循环的问题 2023-02-24 17:02:59 +08:00
刘祥超
66719b05dd 修复WAF验证码不能输入超出6位数字的Bug 2023-02-16 14:44:56 +08:00
刘祥超
7197583fea 增加变量${requestPathLowerExtension} 2023-02-10 10:43:30 +08:00
刘祥超
ce29024eef 写入Agent IP时,不提示id重复错误 2023-02-01 10:18:08 +08:00
刘祥超
e1ac67f7fa 版本更改为0.6.4 2023-02-01 10:07:30 +08:00
刘祥超
01812144dd 优化带宽统计 2023-01-12 19:09:57 +08:00
刘祥超
1c34e49629 优化代码 2023-01-12 19:02:38 +08:00
刘祥超
f233fbfb25 版本号修改为0.6.3 2023-01-11 15:44:53 +08:00
刘祥超
5387115e4a 优化代码 2023-01-11 15:24:48 +08:00
刘祥超
d82c03db23 修复在HTTPS下无法连接Websocket的问题 2023-01-10 21:20:27 +08:00
刘祥超
230c5c3766 版本号修改为0.6.2 2023-01-10 21:18:53 +08:00
刘祥超
927425149e 优化代码 2023-01-10 09:47:56 +08:00
刘祥超
5ce1aab92c 修复域名跳转时没有携带参数的Bug 2023-01-09 20:06:09 +08:00
刘祥超
195742bb26 修复读超时时间(ReadDeadline)导致WAFGET302、POST307延时关闭连接的问题 2023-01-09 15:56:59 +08:00
刘祥超
006cc2912d 版本修改为0.6.1 2023-01-09 15:49:16 +08:00
81 changed files with 1017 additions and 242 deletions

11
internal/apps/main.go Normal file
View File

@@ -0,0 +1,11 @@
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package apps
import teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
func RunMain(f func()) {
if teaconst.IsMain {
f()
}
}

View File

@@ -7,9 +7,9 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
"github.com/iwind/TeaGo/types"
_ "github.com/mattn/go-sqlite3"
"os"
"sync/atomic"
"time"
@@ -450,7 +450,7 @@ func (this *FileList) UpgradeV3(oldDir string, brokenOnError bool) error {
remotelogs.Println("CACHE", "upgrading local database finished")
}()
db, err := sql.Open("sqlite3", "file:"+indexDBPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF")
db, err := dbs.OpenWriter("file:" + indexDBPath + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF&_locking_mode=EXCLUSIVE")
if err != nil {
return err
}

View File

@@ -3,7 +3,6 @@
package caches
import (
"database/sql"
"errors"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/goman"
@@ -82,14 +81,15 @@ func (this *FileListDB) Open(dbPath string) error {
}
// write db
writeDB, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=private&mode=rwc&_journal_mode=WAL&_sync=OFF&_cache_size="+types.String(cacheSize)+"&_secure_delete=FAST")
// 这里不能加 EXCLUSIVE 锁,不然异步事务可能会失败
writeDB, err := dbs.OpenWriter("file:" + dbPath + "?cache=private&mode=rwc&_journal_mode=WAL&_sync=OFF&_cache_size=" + types.String(cacheSize) + "&_secure_delete=FAST")
if err != nil {
return errors.New("open write database failed: " + err.Error())
}
writeDB.SetMaxOpenConns(1)
this.writeDB = dbs.NewDB(writeDB)
this.writeDB = writeDB
// TODO 耗时过长,暂时不整理数据库
// TODO 需要根据行数来判断是否VACUUM
@@ -109,7 +109,7 @@ func (this *FileListDB) Open(dbPath string) error {
}
}
this.writeBatch = dbs.NewBatch(writeDB, 4)
this.writeBatch = dbs.NewBatch(writeDB.RawDB(), 4)
this.writeBatch.OnFail(func(err error) {
remotelogs.Warn("LIST_FILE_DB", "run batch failed: "+err.Error()+" ("+filepath.Base(this.dbPath)+")")
})
@@ -124,14 +124,14 @@ func (this *FileListDB) Open(dbPath string) error {
}
// read db
readDB, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=private&mode=ro&_journal_mode=WAL&_sync=OFF&_cache_size="+types.String(cacheSize))
readDB, err := dbs.OpenReader("file:" + dbPath + "?cache=private&mode=ro&_journal_mode=WAL&_sync=OFF&_cache_size=" + types.String(cacheSize))
if err != nil {
return errors.New("open read database failed: " + err.Error())
}
readDB.SetMaxOpenConns(runtime.NumCPU())
this.readDB = dbs.NewDB(readDB)
this.readDB = readDB
if teaconst.EnableDBStat {
this.readDB.EnableStat(true)

View File

@@ -3,6 +3,7 @@ package caches
import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/lists"
@@ -14,6 +15,10 @@ import (
var SharedManager = NewManager()
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventQuit, func() {
remotelogs.Println("CACHE", "quiting cache manager")
SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{})

View File

@@ -508,7 +508,11 @@ func (this *MemoryStorage) flushItem(key string) {
if !ok {
return
}
if !item.IsDone || item.IsExpired() {
if !item.IsDone {
remotelogs.Error("CACHE", "flush items failed: open writer failed: item has not been done")
return
}
if item.IsExpired() {
return
}

View File

@@ -11,7 +11,7 @@ import (
var sharedBrotliReaderPool *ReaderPool
func init() {
if teaconst.IsDaemon {
if !teaconst.IsMain {
return
}

View File

@@ -11,7 +11,7 @@ import (
var sharedDeflateReaderPool *ReaderPool
func init() {
if teaconst.IsDaemon {
if !teaconst.IsMain {
return
}

View File

@@ -11,7 +11,7 @@ import (
var sharedGzipReaderPool *ReaderPool
func init() {
if teaconst.IsDaemon {
if !teaconst.IsMain {
return
}

View File

@@ -11,7 +11,7 @@ import (
var sharedZSTDReaderPool *ReaderPool
func init() {
if teaconst.IsDaemon {
if !teaconst.IsMain {
return
}

View File

@@ -12,7 +12,7 @@ import (
var sharedBrotliWriterPool *WriterPool
func init() {
if teaconst.IsDaemon {
if !teaconst.IsMain {
return
}

View File

@@ -12,7 +12,7 @@ import (
var sharedDeflateWriterPool *WriterPool
func init() {
if teaconst.IsDaemon {
if !teaconst.IsMain {
return
}

View File

@@ -12,7 +12,7 @@ import (
var sharedGzipWriterPool *WriterPool
func init() {
if teaconst.IsDaemon {
if !teaconst.IsMain {
return
}

View File

@@ -12,7 +12,7 @@ import (
var sharedZSTDWriterPool *WriterPool
func init() {
if teaconst.IsDaemon {
if !teaconst.IsMain {
return
}

View File

@@ -1,7 +1,7 @@
package teaconst
const (
Version = "0.6.0"
Version = "0.6.4"
ProductName = "Edge Node"
ProcessName = "edge-node"

View File

@@ -15,7 +15,7 @@ var (
NodeId int64 = 0
NodeIdString = ""
IsDaemon = len(os.Args) > 1 && os.Args[1] == "daemon"
IsMain = len(os.Args) == 1 || (len(os.Args) >= 2 && os.Args[1] == "pprof")
GlobalProductName = nodeconfigs.DefaultProductName

View File

@@ -9,6 +9,7 @@ import (
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/ddosconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/firewalls/nftables"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
@@ -27,6 +28,10 @@ import (
var SharedDDoSProtectionManager = NewDDoSProtectionManager()
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventReload, func() {
if nftablesInstance == nil {
return

View File

@@ -3,6 +3,7 @@
package firewalls
import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"runtime"
@@ -14,6 +15,10 @@ var firewallLocker = &sync.Mutex{}
// 初始化
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventLoaded, func() {
var firewall = Firewall()
if firewall.Name() != "mock" {

View File

@@ -24,7 +24,7 @@ import (
// check nft status, if being enabled we load it automatically
func init() {
if teaconst.IsDaemon {
if !teaconst.IsMain {
return
}

View File

@@ -5,6 +5,7 @@ package nftables
import (
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
@@ -17,6 +18,10 @@ import (
)
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventReload, func() {
// linux only
if runtime.GOOS != "linux" {

View File

@@ -15,7 +15,7 @@ var instanceId = uint64(0)
// New 新创建goroutine
func New(f func()) {
if teaconst.IsDaemon {
if !teaconst.IsMain {
return
}
@@ -47,7 +47,7 @@ func New(f func()) {
// NewWithArgs 创建带有参数的goroutine
func NewWithArgs(f func(args ...interface{}), args ...interface{}) {
if teaconst.IsDaemon {
if !teaconst.IsMain {
return
}

View File

@@ -3,28 +3,27 @@
package iplibrary
import (
"database/sql"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/iwind/TeaGo/Tea"
_ "github.com/mattn/go-sqlite3"
"os"
"path/filepath"
"time"
)
type IPListDB struct {
db *sql.DB
db *dbs.DB
itemTableName string
deleteExpiredItemsStmt *sql.Stmt
deleteItemStmt *sql.Stmt
insertItemStmt *sql.Stmt
selectItemsStmt *sql.Stmt
selectMaxVersionStmt *sql.Stmt
deleteExpiredItemsStmt *dbs.Stmt
deleteItemStmt *dbs.Stmt
insertItemStmt *dbs.Stmt
selectItemsStmt *dbs.Stmt
selectMaxVersionStmt *dbs.Stmt
cleanTicker *time.Ticker
@@ -56,7 +55,7 @@ func (this *IPListDB) init() error {
var path = this.dir + "/ip_list.db"
db, err := sql.Open("sqlite3", "file:"+path+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF")
db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF&_locking_mode=EXCLUSIVE")
if err != nil {
return err
}

View File

@@ -20,7 +20,7 @@ var SharedIPListManager = NewIPListManager()
var IPListUpdateNotify = make(chan bool, 1)
func init() {
if teaconst.IsDaemon {
if !teaconst.IsMain {
return
}

View File

@@ -4,6 +4,7 @@ package metrics
import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"strconv"
@@ -13,6 +14,10 @@ import (
var SharedManager = NewManager()
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventQuit, func() {
SharedManager.Quit()
})

View File

@@ -3,7 +3,6 @@
package metrics
import (
"database/sql"
"encoding/json"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
@@ -17,7 +16,6 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/types"
_ "github.com/mattn/go-sqlite3"
"os"
"strconv"
"sync"
@@ -50,11 +48,11 @@ type Task struct {
cleanVersion int32
insertStatStmt *sql.Stmt
deleteByVersionStmt *sql.Stmt
deleteByExpiresTimeStmt *sql.Stmt
selectTopStmt *sql.Stmt
sumStmt *sql.Stmt
insertStatStmt *dbs.Stmt
deleteByVersionStmt *dbs.Stmt
deleteByExpiresTimeStmt *dbs.Stmt
selectTopStmt *dbs.Stmt
sumStmt *dbs.Stmt
serverIdMap map[int64]zero.Zero // 所有的服务Ids
timeMap map[string]zero.Zero // time => bool
@@ -92,12 +90,12 @@ func (this *Task) Init() error {
var path = dir + "/metric." + types.String(this.item.Id) + ".db"
db, err := sql.Open("sqlite3", "file:"+path+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF")
db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF&_locking_mode=EXCLUSIVE")
if err != nil {
return err
}
db.SetMaxOpenConns(1)
this.db = dbs.NewDB(db)
this.db = db
// 恢复数据库
var recoverEnv, _ = os.LookupEnv("EdgeRecover")

View File

@@ -5,6 +5,7 @@ package monitor
import (
"encoding/json"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
@@ -16,6 +17,10 @@ import (
var SharedValueQueue = NewValueQueue()
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventLoaded, func() {
goman.New(func() {
SharedValueQueue.Start()

View File

@@ -42,7 +42,11 @@ type ClientConn struct {
lastErr error
readDeadlineTime int64
isShortReading bool // header or handshake
isShortReading bool // reading header or tls handshake
isDebugging bool
autoReadTimeout bool
autoWriteTimeout bool
}
func NewClientConn(rawConn net.Conn, isHTTP bool, isTLS bool, isInAllowList bool) net.Conn {
@@ -59,6 +63,14 @@ func NewClientConn(rawConn net.Conn, isHTTP bool, isTLS bool, isInAllowList bool
createdAt: time.Now().Unix(),
}
var globalServerConfig = sharedNodeConfig.GlobalServerConfig
if globalServerConfig != nil {
var performanceConfig = globalServerConfig.Performance
conn.isDebugging = performanceConfig.Debug
conn.autoReadTimeout = performanceConfig.AutoReadTimeout
conn.autoWriteTimeout = performanceConfig.AutoWriteTimeout
}
if isHTTP {
// TODO 可以在配置中设置此值
_ = conn.SetLinger(nodeconfigs.DefaultTCPLinger)
@@ -71,14 +83,14 @@ func NewClientConn(rawConn net.Conn, isHTTP bool, isTLS bool, isInAllowList bool
}
func (this *ClientConn) Read(b []byte) (n int, err error) {
var globalServerConfig = sharedNodeConfig.GlobalServerConfig
if globalServerConfig != nil && globalServerConfig.Performance.Debug {
if this.isDebugging {
this.lastReadAt = time.Now().Unix()
defer func() {
if err != nil {
this.lastErr = errors.New("read error: " + err.Error())
} else {
this.lastErr = nil
}
}()
}
@@ -93,8 +105,7 @@ func (this *ClientConn) Read(b []byte) (n int, err error) {
}
// 设置读超时时间
var autoReadTimeout = globalServerConfig != nil && globalServerConfig.Performance.AutoReadTimeout
if this.isHTTP && !this.isWebsocket && !this.isShortReading && autoReadTimeout {
if this.isHTTP && !this.isPersistent && !this.isShortReading && this.autoReadTimeout {
this.setHTTPReadTimeout()
}
@@ -134,20 +145,24 @@ func (this *ClientConn) Read(b []byte) (n int, err error) {
}
func (this *ClientConn) Write(b []byte) (n int, err error) {
var globalServerConfig = sharedNodeConfig.GlobalServerConfig
if len(b) == 0 {
return 0, nil
}
if globalServerConfig != nil && globalServerConfig.Performance.Debug {
if this.isDebugging {
this.lastWriteAt = time.Now().Unix()
defer func() {
if err != nil {
this.lastErr = errors.New("write error: " + err.Error())
} else {
this.lastErr = nil
}
}()
}
// 设置写超时时间
if globalServerConfig != nil && globalServerConfig.Performance.AutoWriteTimeout {
if this.autoWriteTimeout {
// TODO L2 -> L1 写入时不限制时间
var timeoutSeconds = len(b) / 1024
if timeoutSeconds < 3 {
@@ -157,18 +172,25 @@ func (this *ClientConn) Write(b []byte) (n int, err error) {
}
// 延长读超时时间
if this.isHTTP && !this.isWebsocket && globalServerConfig != nil && globalServerConfig.Performance.AutoReadTimeout {
if this.isHTTP && !this.isPersistent && this.autoReadTimeout {
this.setHTTPReadTimeout()
}
// 开始写入
var before = time.Now()
n, err = this.rawConn.Write(b)
if n > 0 {
// 统计当前服务带宽
if this.serverId > 0 {
if !this.isLO || Tea.IsTesting() { // 环路不统计带宽,避免缓存预热等行为产生带宽
atomic.AddUint64(&teaconst.OutTrafficBytes, uint64(n))
stats.SharedBandwidthStatManager.Add(this.userId, this.serverId, int64(n))
var cost = time.Since(before).Seconds()
if cost > 1 {
stats.SharedBandwidthStatManager.Add(this.userId, this.serverId, int64(float64(n)/cost), int64(n))
} else {
stats.SharedBandwidthStatManager.Add(this.userId, this.serverId, int64(n), int64(n))
}
}
}
}
@@ -216,8 +238,7 @@ func (this *ClientConn) SetDeadline(t time.Time) error {
func (this *ClientConn) SetReadDeadline(t time.Time) error {
// 如果开启了HTTP自动读超时选项则自动控制超时时间
var globalServerConfig = sharedNodeConfig.GlobalServerConfig
if this.isHTTP && !this.isWebsocket && globalServerConfig != nil && globalServerConfig.Performance.AutoReadTimeout {
if this.isHTTP && !this.isPersistent && this.autoReadTimeout {
this.isShortReading = false
var unixTime = t.Unix()

View File

@@ -16,7 +16,8 @@ type BaseClientConn struct {
remoteAddr string
hasLimit bool
isWebsocket bool
isPersistent bool // 是否为持久化连接
fingerprint []byte
isClosed bool
@@ -125,6 +126,16 @@ func (this *BaseClientConn) SetLinger(seconds int) error {
return nil
}
func (this *BaseClientConn) SetIsWebsocket(isWebsocket bool) {
this.isWebsocket = isWebsocket
func (this *BaseClientConn) SetIsPersistent(isPersistent bool) {
this.isPersistent = isPersistent
}
// SetFingerprint 设置指纹信息
func (this *BaseClientConn) SetFingerprint(fingerprint []byte) {
this.fingerprint = fingerprint
}
// Fingerprint 读取指纹信息
func (this *BaseClientConn) Fingerprint() []byte {
return this.fingerprint
}

View File

@@ -24,6 +24,12 @@ type ClientConnInterface interface {
// UserId 获取当前连接所属服务的用户ID
UserId() int64
// SetIsWebsocket 设置是否为Websocket
SetIsWebsocket(isWebsocket bool)
// SetIsPersistent 设置是否为持久化
SetIsPersistent(isPersistent bool)
// SetFingerprint 设置指纹信息
SetFingerprint(fingerprint []byte)
// Fingerprint 读取指纹信息
Fingerprint() []byte
}

View File

@@ -15,6 +15,10 @@ import (
// 发送监控流量
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventStart, func() {
var ticker = time.NewTicker(1 * time.Minute)
goman.New(func() {

View File

@@ -55,3 +55,30 @@ func (this *ClientTLSConn) SetReadDeadline(t time.Time) error {
func (this *ClientTLSConn) SetWriteDeadline(t time.Time) error {
return this.rawConn.SetWriteDeadline(t)
}
func (this *ClientTLSConn) SetIsPersistent(isPersistent bool) {
tlsConn, ok := this.rawConn.(*tls.Conn)
if ok {
var rawConn = tlsConn.NetConn()
if rawConn != nil {
clientConn, ok := rawConn.(*ClientConn)
if ok {
clientConn.SetIsPersistent(isPersistent)
}
}
}
}
func (this *ClientTLSConn) Fingerprint() []byte {
tlsConn, ok := this.rawConn.(*tls.Conn)
if ok {
var rawConn = tlsConn.NetConn()
if rawConn != nil {
clientConn, ok := rawConn.(*ClientConn)
if ok {
return clientConn.fingerprint
}
}
}
return nil
}

View File

@@ -25,9 +25,12 @@ type HTTPAccessLogQueue struct {
// NewHTTPAccessLogQueue 获取新对象
func NewHTTPAccessLogQueue() *HTTPAccessLogQueue {
// 队列中最大的值,超出此数量的访问日志会被丢弃
// TODO 需要可以在界面中设置
maxSize := 20000
queue := &HTTPAccessLogQueue{
var maxSize = 2_000 * (1 + utils.SystemMemoryGB()/2)
if maxSize > 20_000 {
maxSize = 20_000
}
var queue = &HTTPAccessLogQueue{
queue: make(chan *pb.HTTPAccessLog, maxSize),
}
goman.New(func() {

View File

@@ -9,6 +9,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/compressions"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
@@ -23,6 +24,10 @@ import (
)
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventStart, func() {
goman.New(func() {
SharedHTTPCacheTaskManager.Start()

View File

@@ -221,6 +221,18 @@ func (this *HTTPRequest) Do() {
}
}
// CC
if !isHealthCheck {
if this.web.CC != nil {
if this.web.CC.IsOn {
if this.doCC() {
this.doEnd()
return
}
}
}
}
// WAF
if this.web.FirewallRef != nil && this.web.FirewallRef.IsOn {
if this.doWAFRequest() {
@@ -572,6 +584,11 @@ func (this *HTTPRequest) configureWeb(web *serverconfigs.HTTPWebConfig, isTop bo
this.web.UAM = web.UAM
}
// CC
if web.CC != nil && (web.CC.IsPrior || isTop) {
this.web.CC = web.CC
}
// 重写规则
if len(web.RewriteRefs) > 0 {
for index, ref := range web.RewriteRefs {
@@ -728,6 +745,8 @@ func (this *HTTPRequest) Format(source string) string {
return this.Path()
case "requestPathExtension":
return filepath.Ext(this.Path())
case "requestPathLowerExtension":
return strings.ToLower(filepath.Ext(this.Path()))
case "requestLength":
return strconv.FormatInt(this.requestLength(), 10)
case "requestTime":
@@ -841,7 +860,7 @@ func (this *HTTPRequest) Format(source string) string {
}
// response.xxx.xxx
dotIndex := strings.Index(suffix, ".")
dotIndex = strings.Index(suffix, ".")
if dotIndex < 0 {
return "${" + varName + "}"
}

View File

@@ -0,0 +1,8 @@
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
//go:build !plus
package nodes
func (this *HTTPRequest) doCC() (block bool) {
return
}

View File

@@ -73,6 +73,16 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
}
}
// 设置为持久化连接
var requestConn = this.RawReq.Context().Value(HTTPConnContextKey)
if requestConn == nil {
return
}
requestClientConn, ok := requestConn.(ClientConnInterface)
if ok {
requestClientConn.SetIsPersistent(true)
}
// 连接池配置
poolSize := fastcgi.PoolSize
if poolSize <= 0 {

View File

@@ -146,6 +146,13 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
u.Status = http.StatusTemporaryRedirect
}
this.processResponseHeaders(this.writer.Header(), u.Status)
// 参数
var qIndex = strings.Index(this.uri, "?")
if qIndex >= 0 {
afterURL += this.uri[qIndex:]
}
http.Redirect(this.RawWriter, this.RawReq, afterURL, u.Status)
return true
}

View File

@@ -67,7 +67,7 @@ func (this *HTTPRequest) doWAFRequest() (blocked bool) {
// 当前服务的独立设置
if this.web.FirewallPolicy != nil && this.web.FirewallPolicy.IsOn {
blocked, breakChecking := this.checkWAFRequest(this.web.FirewallPolicy, forceLog, forceLogRequestBody, forceLogRegionDenying)
blocked, breakChecking := this.checkWAFRequest(this.web.FirewallPolicy, forceLog, forceLogRequestBody, forceLogRegionDenying, false)
if blocked {
return true
}
@@ -78,7 +78,7 @@ func (this *HTTPRequest) doWAFRequest() (blocked bool) {
// 公用的防火墙设置
if this.ReqServer.HTTPFirewallPolicy != nil && this.ReqServer.HTTPFirewallPolicy.IsOn {
blocked, breakChecking := this.checkWAFRequest(this.ReqServer.HTTPFirewallPolicy, forceLog, forceLogRequestBody, forceLogRegionDenying)
blocked, breakChecking := this.checkWAFRequest(this.ReqServer.HTTPFirewallPolicy, forceLog, forceLogRequestBody, forceLogRegionDenying, this.web.FirewallRef.IgnoreGlobalRules)
if blocked {
return true
}
@@ -90,7 +90,7 @@ func (this *HTTPRequest) doWAFRequest() (blocked bool) {
return
}
func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFirewallPolicy, forceLog bool, logRequestBody bool, logDenying bool) (blocked bool, breakChecking bool) {
func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFirewallPolicy, forceLog bool, logRequestBody bool, logDenying bool, ignoreRules bool) (blocked bool, breakChecking bool) {
// 检查配置是否为空
if firewallPolicy == nil || !firewallPolicy.IsOn || firewallPolicy.Inbound == nil || !firewallPolicy.Inbound.IsOn || firewallPolicy.Mode == firewallconfigs.FirewallModeBypass {
return
@@ -211,8 +211,13 @@ func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFir
}
}
// 是否执行规则
if ignoreRules {
return
}
// 规则测试
w := waf.SharedWAFManager.FindWAF(firewallPolicy.Id)
var w = waf.SharedWAFManager.FindWAF(firewallPolicy.Id)
if w == nil {
return
}
@@ -267,7 +272,7 @@ func (this *HTTPRequest) doWAFResponse(resp *http.Response) (blocked bool) {
}
if this.web.FirewallPolicy != nil && this.web.FirewallPolicy.IsOn {
blocked := this.checkWAFResponse(this.web.FirewallPolicy, resp, forceLog, forceLogRequestBody)
blocked := this.checkWAFResponse(this.web.FirewallPolicy, resp, forceLog, forceLogRequestBody, false)
if blocked {
return true
}
@@ -275,7 +280,7 @@ func (this *HTTPRequest) doWAFResponse(resp *http.Response) (blocked bool) {
// 公用的防火墙设置
if this.ReqServer.HTTPFirewallPolicy != nil && this.ReqServer.HTTPFirewallPolicy.IsOn {
blocked := this.checkWAFResponse(this.ReqServer.HTTPFirewallPolicy, resp, forceLog, forceLogRequestBody)
blocked := this.checkWAFResponse(this.ReqServer.HTTPFirewallPolicy, resp, forceLog, forceLogRequestBody, this.web.FirewallRef.IgnoreGlobalRules)
if blocked {
return true
}
@@ -283,12 +288,17 @@ func (this *HTTPRequest) doWAFResponse(resp *http.Response) (blocked bool) {
return
}
func (this *HTTPRequest) checkWAFResponse(firewallPolicy *firewallconfigs.HTTPFirewallPolicy, resp *http.Response, forceLog bool, logRequestBody bool) (blocked bool) {
func (this *HTTPRequest) checkWAFResponse(firewallPolicy *firewallconfigs.HTTPFirewallPolicy, resp *http.Response, forceLog bool, logRequestBody bool, ignoreRules bool) (blocked bool) {
if firewallPolicy == nil || !firewallPolicy.IsOn || !firewallPolicy.Outbound.IsOn || firewallPolicy.Mode == firewallconfigs.FirewallModeBypass {
return
}
w := waf.SharedWAFManager.FindWAF(firewallPolicy.Id)
// 是否执行规则
if ignoreRules {
return
}
var w = waf.SharedWAFManager.FindWAF(firewallPolicy.Id)
if w == nil {
return
}
@@ -392,3 +402,22 @@ func (this *HTTPRequest) WAFOnAction(action interface{}) (goNext bool) {
}
return true
}
func (this *HTTPRequest) WAFFingerprint() []byte {
// 目前只有HTTPS请求才有指纹
if !this.IsHTTPS {
return nil
}
var requestConn = this.RawReq.Context().Value(HTTPConnContextKey)
if requestConn == nil {
return nil
}
clientConn, ok := requestConn.(ClientConnInterface)
if ok {
return clientConn.Fingerprint()
}
return nil
}

View File

@@ -111,7 +111,7 @@ func (this *HTTPRequest) doWebsocket(requestHost string, isLastRetry bool) (shou
requestClientConn, ok := requestConn.(ClientConnInterface)
if ok {
requestClientConn.SetIsWebsocket(true)
requestClientConn.SetIsPersistent(true)
}
clientConn, _, err := this.writer.Hijack()

View File

@@ -10,6 +10,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/compressions"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/readers"
@@ -39,6 +40,10 @@ var webpMaxBufferSize int64 = 1_000_000_000
var webpTotalBufferSize int64 = 0
func init() {
if !teaconst.IsMain {
return
}
var systemMemory = utils.SystemMemoryGB() / 8
if systemMemory > 0 {
webpMaxBufferSize = int64(systemMemory) * 1024 * 1024 * 1024
@@ -99,6 +104,18 @@ func NewHTTPWriter(req *HTTPRequest, httpResponseWriter http.ResponseWriter) *HT
// Prepare 准备输出
func (this *HTTPWriter) Prepare(resp *http.Response, size int64, status int, enableCache bool) (delayHeaders bool) {
// 清理以前数据,防止重试时发生异常错误
if this.compressionCacheWriter != nil {
_ = this.compressionCacheWriter.Discard()
this.compressionCacheWriter = nil
}
if this.cacheWriter != nil {
_ = this.cacheWriter.Discard()
this.cacheWriter = nil
}
// 新的请求相关数据
this.size = size
this.statusCode = status
@@ -315,6 +332,12 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) {
}
}
}
// 先清理以前的
if this.cacheWriter != nil {
_ = this.cacheWriter.Discard()
}
cacheWriter, err := storage.OpenWriter(cacheKey, expiresAt, this.StatusCode(), this.calculateHeaderLength(), totalSize, cacheRef.MaxSizeBytes(), this.isPartial)
if err != nil {
if err == caches.ErrEntityTooLarge && addStatusHeader {
@@ -691,6 +714,9 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) {
}
if compressionCacheWriter != nil {
if this.compressionCacheWriter != nil {
_ = this.compressionCacheWriter.Close()
}
this.compressionCacheWriter = compressionCacheWriter
var teeWriter = writers.NewTeeWriterCloser(this.writer, compressionCacheWriter)
teeWriter.OnFail(func(err error) {

View File

@@ -36,6 +36,15 @@ func (this *BaseListener) buildTLSConfig() *tls.Config {
return &tls.Config{
Certificates: nil,
GetConfigForClient: func(clientInfo *tls.ClientHelloInfo) (config *tls.Config, e error) {
// 指纹信息
var fingerprint = this.calculateFingerprint(clientInfo)
if len(fingerprint) > 0 {
clientConn, ok := clientInfo.Conn.(ClientConnInterface)
if ok {
clientConn.SetFingerprint(fingerprint)
}
}
tlsPolicy, _, err := this.matchSSL(this.helloServerName(clientInfo))
if err != nil {
return nil, err
@@ -50,6 +59,15 @@ func (this *BaseListener) buildTLSConfig() *tls.Config {
return tlsPolicy.TLSConfig(), nil
},
GetCertificate: func(clientInfo *tls.ClientHelloInfo) (certificate *tls.Certificate, e error) {
// 指纹信息
var fingerprint = this.calculateFingerprint(clientInfo)
if len(fingerprint) > 0 {
clientConn, ok := clientInfo.Conn.(ClientConnInterface)
if ok {
clientConn.SetFingerprint(fingerprint)
}
}
tlsPolicy, cert, err := this.matchSSL(this.helloServerName(clientInfo))
if err != nil {
return nil, err

View File

@@ -0,0 +1,10 @@
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
//go:build !plus
package nodes
import "crypto/tls"
func (this *BaseListener) calculateFingerprint(clientInfo *tls.ClientHelloInfo) []byte {
return nil
}

View File

@@ -226,14 +226,21 @@ func (this *HTTPListener) emptyServer() *serverconfigs.ServerConfig {
Type: serverconfigs.ServerTypeHTTPProxy,
}
var accessLogRef = serverconfigs.NewHTTPAccessLogRef()
// TODO 需要配置是否记录日志
accessLogRef.IsOn = true
accessLogRef.Fields = append([]int{}, serverconfigs.HTTPAccessLogDefaultFieldsCodes...)
server.Web = &serverconfigs.HTTPWebConfig{
IsOn: true,
AccessLogRef: accessLogRef,
// 检查是否开启访问日志
if sharedNodeConfig != nil {
var globalServerConfig = sharedNodeConfig.GlobalServerConfig
if globalServerConfig != nil && globalServerConfig.HTTPAccessLog.EnableServerNotFound {
var accessLogRef = serverconfigs.NewHTTPAccessLogRef()
accessLogRef.IsOn = true
accessLogRef.Fields = append([]int{}, serverconfigs.HTTPAccessLogDefaultFieldsCodes...)
server.Web = &serverconfigs.HTTPWebConfig{
IsOn: true,
AccessLogRef: accessLogRef,
}
}
}
// TODO 需要对访问频率过多的IP进行惩罚
return server
}

View File

@@ -4,6 +4,7 @@ import (
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/firewalls"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
@@ -23,7 +24,15 @@ import (
"time"
)
var sharedListenerManager = NewListenerManager()
var sharedListenerManager *ListenerManager
func init() {
if !teaconst.IsMain {
return
}
sharedListenerManager = NewListenerManager()
}
// ListenerManager 端口监听管理器
type ListenerManager struct {

View File

@@ -404,7 +404,7 @@ func NewUDPConn(server *serverconfigs.ServerConfig, addr net.Addr, proxyListener
stats.SharedTrafficStatManager.Add(server.Id, "", int64(n), 0, 0, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId())
// 带宽
stats.SharedBandwidthStatManager.Add(server.UserId, server.Id, int64(n))
stats.SharedBandwidthStatManager.Add(server.UserId, server.Id, int64(n), int64(n))
}
}
if err != nil {

View File

@@ -236,8 +236,6 @@ func (this *Node) Start() {
// Daemon 实现守护进程
func (this *Node) Daemon() {
teaconst.IsDaemon = true
var isDebug = lists.ContainsString(os.Args, "debug")
for {
conn, err := this.sock.Dial()

View File

@@ -8,6 +8,8 @@ import (
"github.com/iwind/TeaGo/maps"
"github.com/shirou/gopsutil/v3/load"
"github.com/shirou/gopsutil/v3/mem"
"runtime"
"runtime/debug"
)
// 更新内存
@@ -31,6 +33,18 @@ func (this *NodeStatusExecutor) updateMem(status *nodeconfigs.NodeStatus) {
"total": status.MemoryTotal,
"used": stat.Used,
})
// 内存严重不足时自动释放内存
if stat.Total > 0 {
var minFreeMemory = stat.Total / 8
if minFreeMemory > 1<<30 {
minFreeMemory = 1 << 30
}
if stat.Free < minFreeMemory {
runtime.GC()
debug.FreeOSMemory()
}
}
}
// 更新负载

View File

@@ -4,6 +4,7 @@ package nodes
import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
@@ -16,6 +17,10 @@ import (
var SharedOriginStateManager = NewOriginStateManager()
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventLoaded, func() {
goman.New(func() {
SharedOriginStateManager.Start()

View File

@@ -19,6 +19,10 @@ import (
)
func init() {
if !teaconst.IsMain {
return
}
var manager = NewSystemServiceManager()
events.On(events.EventReload, func() {
goman.New(func() {

View File

@@ -4,6 +4,7 @@ package nodes
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
@@ -15,6 +16,10 @@ import (
var sharedOCSPTask = NewOCSPUpdateTask()
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventLoaded, func() {
sharedOCSPTask.version = sharedNodeConfig.OCSPVersion

View File

@@ -3,6 +3,7 @@ package nodes
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/configs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
@@ -16,6 +17,10 @@ import (
var sharedSyncAPINodesTask = NewSyncAPINodesTask()
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventStart, func() {
goman.New(func() {
sharedSyncAPINodesTask.Start()

View File

@@ -3,6 +3,7 @@ package nodes
import (
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
@@ -17,6 +18,10 @@ import (
var sharedTOAManager = NewTOAManager()
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventReload, func() {
err := sharedTOAManager.Run(sharedNodeConfig.TOA)
if err != nil {

View File

@@ -20,6 +20,10 @@ import (
var logChan = make(chan *pb.NodeLog, 64) // 队列数量不需要太长,因为日志通常仅仅为调试用
func init() {
if !teaconst.IsMain {
return
}
// 定期上传日志
var ticker = time.NewTicker(60 * time.Second)
if Tea.IsTesting() {

View File

@@ -5,6 +5,7 @@ package stats
import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
@@ -21,6 +22,10 @@ var SharedBandwidthStatManager = NewBandwidthStatManager()
const bandwidthTimestampDelim = 2 // N秒平均更为精确
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventLoaded, func() {
goman.New(func() {
SharedBandwidthStatManager.Start()
@@ -37,6 +42,7 @@ type BandwidthStat struct {
CurrentBytes int64
CurrentTimestamp int64
MaxBytes int64
TotalBytes int64
}
// BandwidthStatManager 服务带宽统计
@@ -107,6 +113,7 @@ func (this *BandwidthStatManager) Loop() error {
Day: stat.Day,
TimeAt: stat.TimeAt,
Bytes: stat.MaxBytes / bandwidthTimestampDelim,
TotalBytes: stat.TotalBytes,
NodeRegionId: regionId,
})
delete(this.m, key)
@@ -132,8 +139,8 @@ func (this *BandwidthStatManager) Loop() error {
}
// Add 添加带宽数据
func (this *BandwidthStatManager) Add(userId int64, serverId int64, bytes int64) {
if serverId <= 0 || bytes == 0 {
func (this *BandwidthStatManager) Add(userId int64, serverId int64, peekBytes int64, totalBytes int64) {
if serverId <= 0 || (peekBytes == 0 && totalBytes == 0) {
return
}
@@ -146,8 +153,8 @@ func (this *BandwidthStatManager) Add(userId int64, serverId int64, bytes int64)
// 增加TCP Header尺寸这里默认MTU为1500且默认为IPv4
const mtu = 1500
const tcpHeaderSize = 20
if bytes > mtu {
bytes += bytes * tcpHeaderSize / mtu
if peekBytes > mtu {
peekBytes += peekBytes * tcpHeaderSize / mtu
}
this.locker.Lock()
@@ -156,22 +163,25 @@ func (this *BandwidthStatManager) Add(userId int64, serverId int64, bytes int64)
// 此刻如果发生用户IDuserId的变化也忽略等N分钟后有新记录后再换
if stat.CurrentTimestamp == timestamp {
stat.CurrentBytes += bytes
stat.CurrentBytes += peekBytes
} else {
stat.CurrentBytes = bytes
stat.CurrentBytes = peekBytes
stat.CurrentTimestamp = timestamp
}
if stat.CurrentBytes > stat.MaxBytes {
stat.MaxBytes = stat.CurrentBytes
}
stat.TotalBytes += totalBytes
} else {
this.m[key] = &BandwidthStat{
Day: day,
TimeAt: timeAt,
UserId: userId,
ServerId: serverId,
CurrentBytes: bytes,
MaxBytes: bytes,
CurrentBytes: peekBytes,
MaxBytes: peekBytes,
TotalBytes: totalBytes,
CurrentTimestamp: timestamp,
}
}

View File

@@ -10,22 +10,22 @@ import (
func TestBandwidthStatManager_Add(t *testing.T) {
var manager = stats.NewBandwidthStatManager()
manager.Add(1, 1, 10)
manager.Add(1, 1, 10)
manager.Add(1, 1, 10)
manager.Add(1, 1, 10, 10)
manager.Add(1, 1, 10, 10)
manager.Add(1, 1, 10, 10)
time.Sleep(1 * time.Second)
manager.Add(1, 1, 85)
manager.Add(1, 1, 85, 85)
time.Sleep(1 * time.Second)
manager.Add(1, 1, 25)
manager.Add(1, 1, 75)
manager.Add(1, 1, 25, 25)
manager.Add(1, 1, 75, 75)
manager.Inspect()
}
func TestBandwidthStatManager_Loop(t *testing.T) {
var manager = stats.NewBandwidthStatManager()
manager.Add(1, 1, 10)
manager.Add(1, 1, 10)
manager.Add(1, 1, 10)
manager.Add(1, 1, 10, 10)
manager.Add(1, 1, 10, 10)
manager.Add(1, 1, 10, 10)
err := manager.Loop()
if err != nil {
t.Fatal(err)

View File

@@ -17,8 +17,10 @@ import (
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time"
"sort"
"strconv"
"strings"
"sync"
"time"
)
@@ -45,7 +47,13 @@ type HTTPRequestStatManager struct {
dailyFirewallRuleGroupMap map[string]int64 // serverId@firewallRuleGroupId@action => count
serverCityCountMap map[string]int16 // serverIdString => count cities
serverSystemCountMap map[string]int16 // serverIdString => count systems
serverBrowserCountMap map[string]int16 // serverIdString => count browsers
totalAttackRequests int64
locker sync.Mutex
}
// NewHTTPRequestStatManager 获取新对象
@@ -59,6 +67,10 @@ func NewHTTPRequestStatManager() *HTTPRequestStatManager {
systemMap: map[string]int64{},
browserMap: map[string]int64{},
dailyFirewallRuleGroupMap: map[string]int64{},
serverCityCountMap: map[string]int16{},
serverSystemCountMap: map[string]int16{},
serverBrowserCountMap: map[string]int16{},
}
}
@@ -78,7 +90,6 @@ func (this *HTTPRequestStatManager) Start() {
}
})
var loopTicker = time.NewTicker(1 * time.Second)
var uploadTicker = time.NewTicker(30 * time.Minute)
if Tea.IsTesting() {
uploadTicker = time.NewTicker(10 * time.Second) // 在测试环境下缩短Ticker时间以方便我们调试
@@ -86,20 +97,12 @@ func (this *HTTPRequestStatManager) Start() {
remotelogs.Println("HTTP_REQUEST_STAT_MANAGER", "start ...")
events.OnKey(events.EventQuit, this, func() {
remotelogs.Println("HTTP_REQUEST_STAT_MANAGER", "quit")
loopTicker.Stop()
uploadTicker.Stop()
})
for range loopTicker.C {
err := this.Loop()
if err != nil {
if rpc.IsConnError(err) {
remotelogs.Warn("HTTP_REQUEST_STAT_MANAGER", err.Error())
} else {
remotelogs.Error("HTTP_REQUEST_STAT_MANAGER", err.Error())
}
}
select {
case <-uploadTicker.C:
// 上传Ticker
goman.New(func() {
for range uploadTicker.C {
var tr = trackers.Begin("UPLOAD_REQUEST_STATS")
err := this.Upload()
tr.End()
@@ -110,9 +113,20 @@ func (this *HTTPRequestStatManager) Start() {
remotelogs.Warn("HTTP_REQUEST_STAT_MANAGER", "upload failed: "+err.Error())
}
}
default:
}
})
// 分析Ticker
for {
err := this.Loop()
if err != nil {
if rpc.IsConnError(err) {
remotelogs.Warn("HTTP_REQUEST_STAT_MANAGER", err.Error())
} else {
remotelogs.Error("HTTP_REQUEST_STAT_MANAGER", err.Error())
}
}
}
}
@@ -148,7 +162,7 @@ func (this *HTTPRequestStatManager) AddRemoteAddr(serverId int64, remoteAddr str
// AddUserAgent 添加UserAgent
func (this *HTTPRequestStatManager) AddUserAgent(serverId int64, userAgent string, ip string) {
if len(userAgent) == 0 {
if len(userAgent) == 0 || strings.ContainsRune(userAgent, '@') /** 非常重要,防止后面组合字符串时出现异常 **/ {
return
}
@@ -183,75 +197,101 @@ func (this *HTTPRequestStatManager) AddFirewallRuleGroupId(serverId int64, firew
// Loop 单个循环
func (this *HTTPRequestStatManager) Loop() error {
var timeout = time.NewTimer(10 * time.Minute) // 执行的最大时间
Loop:
for {
select {
case ipString := <-this.ipChan:
// serverId@ip@bytes@isAttack
var pieces = strings.Split(ipString, "@")
if len(pieces) < 4 {
continue
}
var serverId = pieces[0]
var ip = pieces[1]
var result = iplib.LookupIP(ip)
if result != nil && result.IsOk() {
var key = serverId + "@" + result.CountryName() + "@" + result.ProvinceName() + "@" + result.CityName()
stat, ok := this.cityMap[key]
if !ok {
stat = &StatItem{}
this.cityMap[key] = stat
}
stat.Bytes += types.Int64(pieces[2])
stat.CountRequests++
if types.Int8(pieces[3]) == 1 {
stat.AttackBytes += types.Int64(pieces[2])
stat.CountAttackRequests++
}
if len(result.ProviderName()) > 0 {
this.providerMap[serverId+"@"+result.ProviderName()]++
}
}
case userAgentString := <-this.userAgentChan:
var atIndex = strings.Index(userAgentString, "@")
if atIndex < 0 {
continue
}
var serverId = userAgentString[:atIndex]
var userAgent = userAgentString[atIndex+1:]
var result = SharedUserAgentParser.Parse(userAgent)
var osInfo = result.OS
if len(osInfo.Name) > 0 {
dotIndex := strings.Index(osInfo.Version, ".")
if dotIndex > -1 {
osInfo.Version = osInfo.Version[:dotIndex]
}
this.systemMap[serverId+"@"+osInfo.Name+"@"+osInfo.Version]++
}
var browser, browserVersion = result.BrowserName, result.BrowserVersion
if len(browser) > 0 {
dotIndex := strings.Index(browserVersion, ".")
if dotIndex > -1 {
browserVersion = browserVersion[:dotIndex]
}
this.browserMap[serverId+"@"+browser+"@"+browserVersion]++
}
case firewallRuleGroupString := <-this.firewallRuleGroupChan:
this.dailyFirewallRuleGroupMap[firewallRuleGroupString]++
case <-timeout.C:
break Loop
default:
break Loop
select {
case ipString := <-this.ipChan:
// serverId@ip@bytes@isAttack
var pieces = strings.Split(ipString, "@")
if len(pieces) < 4 {
return nil
}
}
var serverId = pieces[0]
var ip = pieces[1]
timeout.Stop()
var result = iplib.LookupIP(ip)
if result != nil && result.IsOk() {
var key = serverId + "@" + types.String(result.CountryId()) + "@" + types.String(result.ProvinceId()) + "@" + types.String(result.CityId())
this.locker.Lock()
stat, ok := this.cityMap[key]
if !ok {
// 检查数量
if this.serverCityCountMap[key] > 128 { // 限制单个服务的城市数量,防止数量过多
this.locker.Unlock()
return nil
}
this.serverCityCountMap[key]++ // 需要放在限制之后因为使用的是int16
stat = &StatItem{}
this.cityMap[key] = stat
}
stat.Bytes += types.Int64(pieces[2])
stat.CountRequests++
if types.Int8(pieces[3]) == 1 {
stat.AttackBytes += types.Int64(pieces[2])
stat.CountAttackRequests++
}
if result.ProviderId() > 0 {
this.providerMap[serverId+"@"+types.String(result.ProviderId())]++
}
this.locker.Unlock()
}
case userAgentString := <-this.userAgentChan:
var atIndex = strings.Index(userAgentString, "@")
if atIndex < 0 {
return nil
}
var serverId = userAgentString[:atIndex]
var userAgent = userAgentString[atIndex+1:]
var result = SharedUserAgentParser.Parse(userAgent)
var osInfo = result.OS
if len(osInfo.Name) > 0 {
dotIndex := strings.Index(osInfo.Version, ".")
if dotIndex > -1 {
osInfo.Version = osInfo.Version[:dotIndex]
}
this.locker.Lock()
var systemKey = serverId + "@" + osInfo.Name + "@" + osInfo.Version
_, ok := this.systemMap[systemKey]
if !ok {
if this.serverSystemCountMap[serverId] < 128 { // 限制最大数据,防止攻击
this.serverSystemCountMap[serverId]++
ok = true
}
}
if ok {
this.systemMap[systemKey]++
}
this.locker.Unlock()
}
var browser, browserVersion = result.BrowserName, result.BrowserVersion
if len(browser) > 0 {
dotIndex := strings.Index(browserVersion, ".")
if dotIndex > -1 {
browserVersion = browserVersion[:dotIndex]
}
this.locker.Lock()
var browserKey = serverId + "@" + browser + "@" + browserVersion
_, ok := this.browserMap[browserKey]
if !ok {
if this.serverBrowserCountMap[serverId] < 256 { // 限制最大数据,防止攻击
this.serverBrowserCountMap[serverId]++
ok = true
}
}
if ok {
this.browserMap[browserKey]++
}
this.locker.Unlock()
}
case firewallRuleGroupString := <-this.firewallRuleGroupChan:
this.locker.Lock()
this.dailyFirewallRuleGroupMap[firewallRuleGroupString]++
this.locker.Unlock()
}
return nil
}
@@ -264,54 +304,178 @@ func (this *HTTPRequestStatManager) Upload() error {
return err
}
// 月份相关
// 拷贝数据
this.locker.Lock()
var cityMap = this.cityMap
var providerMap = this.providerMap
var systemMap = this.systemMap
var browserMap = this.browserMap
var dailyFirewallRuleGroupMap = this.dailyFirewallRuleGroupMap
this.cityMap = map[string]*StatItem{}
this.providerMap = map[string]int64{}
this.systemMap = map[string]int64{}
this.browserMap = map[string]int64{}
this.dailyFirewallRuleGroupMap = map[string]int64{}
this.serverCityCountMap = map[string]int16{}
this.serverSystemCountMap = map[string]int16{}
this.serverBrowserCountMap = map[string]int16{}
this.locker.Unlock()
// 上传限制
var maxCities int16 = 32
var maxProviders int16 = 32
var maxSystems int16 = 64
var maxBrowsers int16 = 64
nodeConfig, _ := nodeconfigs.SharedNodeConfig()
if nodeConfig != nil {
var serverConfig = nodeConfig.GlobalServerConfig // 复制是为了防止在中途修改
if serverConfig != nil {
var uploadConfig = serverConfig.Stat.Upload
if uploadConfig.MaxCities > 0 {
maxCities = uploadConfig.MaxCities
}
if uploadConfig.MaxProviders > 0 {
maxProviders = uploadConfig.MaxProviders
}
if uploadConfig.MaxSystems > 0 {
maxSystems = uploadConfig.MaxSystems
}
if uploadConfig.MaxBrowsers > 0 {
maxBrowsers = uploadConfig.MaxBrowsers
}
}
}
var pbCities = []*pb.UploadServerHTTPRequestStatRequest_RegionCity{}
var pbProviders = []*pb.UploadServerHTTPRequestStatRequest_RegionProvider{}
var pbSystems = []*pb.UploadServerHTTPRequestStatRequest_System{}
var pbBrowsers = []*pb.UploadServerHTTPRequestStatRequest_Browser{}
for k, stat := range this.cityMap {
// 城市
for k, stat := range cityMap {
var pieces = strings.SplitN(k, "@", 4)
var serverId = types.Int64(pieces[0])
pbCities = append(pbCities, &pb.UploadServerHTTPRequestStatRequest_RegionCity{
ServerId: types.Int64(pieces[0]),
CountryName: pieces[1],
ProvinceName: pieces[2],
CityName: pieces[3],
ServerId: serverId,
CountryId: types.Int64(pieces[1]),
ProvinceId: types.Int64(pieces[2]),
CityId: types.Int64(pieces[3]),
CountRequests: stat.CountRequests,
CountAttackRequests: stat.CountAttackRequests,
Bytes: stat.Bytes,
AttackBytes: stat.AttackBytes,
})
}
for k, count := range this.providerMap {
if len(cityMap) > int(maxCities) {
var newPBCities = []*pb.UploadServerHTTPRequestStatRequest_RegionCity{}
sort.Slice(pbCities, func(i, j int) bool {
return pbCities[i].CountRequests > pbCities[j].CountRequests
})
var serverCountMap = map[int64]int16{}
for _, city := range pbCities {
serverCountMap[city.ServerId]++
if serverCountMap[city.ServerId] > maxCities {
continue
}
newPBCities = append(newPBCities, city)
}
if len(pbCities) != len(newPBCities) {
pbCities = newPBCities
}
}
// 运营商
for k, count := range providerMap {
var pieces = strings.SplitN(k, "@", 2)
var serverId = types.Int64(pieces[0])
pbProviders = append(pbProviders, &pb.UploadServerHTTPRequestStatRequest_RegionProvider{
ServerId: types.Int64(pieces[0]),
Name: pieces[1],
Count: count,
ServerId: serverId,
ProviderId: types.Int64(pieces[1]),
Count: count,
})
}
for k, count := range this.systemMap {
if len(providerMap) > int(maxProviders) {
var newPBProviders = []*pb.UploadServerHTTPRequestStatRequest_RegionProvider{}
sort.Slice(pbProviders, func(i, j int) bool {
return pbProviders[i].Count > pbProviders[j].Count
})
var serverCountMap = map[int64]int16{}
for _, provider := range pbProviders {
serverCountMap[provider.ServerId]++
if serverCountMap[provider.ServerId] > maxProviders {
continue
}
newPBProviders = append(newPBProviders, provider)
}
if len(pbProviders) != len(newPBProviders) {
pbProviders = newPBProviders
}
}
// 操作系统
for k, count := range systemMap {
var pieces = strings.SplitN(k, "@", 3)
var serverId = types.Int64(pieces[0])
pbSystems = append(pbSystems, &pb.UploadServerHTTPRequestStatRequest_System{
ServerId: types.Int64(pieces[0]),
ServerId: serverId,
Name: pieces[1],
Version: pieces[2],
Count: count,
})
}
for k, count := range this.browserMap {
if len(systemMap) > int(maxSystems) {
var newPBSystems = []*pb.UploadServerHTTPRequestStatRequest_System{}
sort.Slice(pbSystems, func(i, j int) bool {
return pbSystems[i].Count > pbSystems[j].Count
})
var serverCountMap = map[int64]int16{}
for _, system := range pbSystems {
serverCountMap[system.ServerId]++
if serverCountMap[system.ServerId] > maxSystems {
continue
}
newPBSystems = append(newPBSystems, system)
}
if len(pbSystems) != len(newPBSystems) {
pbSystems = newPBSystems
}
}
// 浏览器
for k, count := range browserMap {
var pieces = strings.SplitN(k, "@", 3)
var serverId = types.Int64(pieces[0])
pbBrowsers = append(pbBrowsers, &pb.UploadServerHTTPRequestStatRequest_Browser{
ServerId: types.Int64(pieces[0]),
ServerId: serverId,
Name: pieces[1],
Version: pieces[2],
Count: count,
})
}
if len(browserMap) > int(maxBrowsers) {
var newPBBrowsers = []*pb.UploadServerHTTPRequestStatRequest_Browser{}
sort.Slice(pbBrowsers, func(i, j int) bool {
return pbBrowsers[i].Count > pbBrowsers[j].Count
})
var serverCountMap = map[int64]int16{}
for _, browser := range pbBrowsers {
serverCountMap[browser.ServerId]++
if serverCountMap[browser.ServerId] > maxBrowsers {
continue
}
newPBBrowsers = append(newPBBrowsers, browser)
}
if len(pbBrowsers) != len(newPBBrowsers) {
pbBrowsers = newPBBrowsers
}
}
// 防火墙相关
var pbFirewallRuleGroups = []*pb.UploadServerHTTPRequestStatRequest_HTTPFirewallRuleGroup{}
for k, count := range this.dailyFirewallRuleGroupMap {
for k, count := range dailyFirewallRuleGroupMap {
var pieces = strings.SplitN(k, "@", 3)
pbFirewallRuleGroups = append(pbFirewallRuleGroups, &pb.UploadServerHTTPRequestStatRequest_HTTPFirewallRuleGroup{
ServerId: types.Int64(pieces[0]),
@@ -321,13 +485,14 @@ func (this *HTTPRequestStatManager) Upload() error {
})
}
// 重置数据
// 这里需要放到上传数据之前,防止因上传失败而导致统计数据堆积
this.cityMap = map[string]*StatItem{}
this.providerMap = map[string]int64{}
this.systemMap = map[string]int64{}
this.browserMap = map[string]int64{}
this.dailyFirewallRuleGroupMap = map[string]int64{}
// 检查是否有数据
if len(pbCities) == 0 &&
len(pbProviders) == 0 &&
len(pbSystems) == 0 &&
len(pbBrowsers) == 0 &&
len(pbFirewallRuleGroups) == 0 {
return nil
}
// 上传数据
_, err = rpcClient.ServerRPC.UploadServerHTTPRequestStat(rpcClient.Context(), &pb.UploadServerHTTPRequestStatRequest{

View File

@@ -3,16 +3,16 @@
package agents
import (
"database/sql"
"errors"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/types"
_ "github.com/mattn/go-sqlite3"
"log"
"os"
"path/filepath"
"strings"
)
const (
@@ -20,11 +20,11 @@ const (
)
type DB struct {
db *sql.DB
db *dbs.DB
path string
insertAgentIPStmt *sql.Stmt
listAgentIPsStmt *sql.Stmt
insertAgentIPStmt *dbs.Stmt
listAgentIPsStmt *dbs.Stmt
}
func NewDB(path string) *DB {
@@ -51,7 +51,7 @@ func (this *DB) Init() error {
}
// TODO 思考 data.db 的数据安全性
db, err := sql.Open("sqlite3", "file:"+this.path+"?cache=shared&mode=rwc&_journal_mode=WAL")
db, err := dbs.OpenWriter("file:" + this.path + "?cache=shared&mode=rwc&_journal_mode=WAL&_locking_mode=EXCLUSIVE")
if err != nil {
return err
}
@@ -94,9 +94,13 @@ func (this *DB) InsertAgentIP(ipId int64, ip string, agentCode string) error {
return errors.New("db should not be nil")
}
this.log("InsertAgentIP", "id:", ipId, "ip:", ip, "agent:", agentCode)
_, err := this.insertAgentIPStmt.Exec(ipId, ip, agentCode)
if err != nil {
// 不提示ID重复错误
if strings.Contains(err.Error(), "UNIQUE constraint") {
return nil
}
return err
}
@@ -130,7 +134,7 @@ func (this *DB) Close() error {
return nil
}
for _, stmt := range []*sql.Stmt{
for _, stmt := range []*dbs.Stmt{
this.insertAgentIPStmt,
this.listAgentIPsStmt,
} {

View File

@@ -4,6 +4,7 @@ package agents
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
@@ -16,6 +17,10 @@ import (
var SharedManager = NewManager()
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventLoaded, func() {
goman.New(func() {
SharedManager.Start()

View File

@@ -4,6 +4,7 @@ package agents
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
@@ -13,6 +14,10 @@ import (
)
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventLoaded, func() {
goman.New(func() {
SharedQueue.Start()

View File

@@ -6,6 +6,7 @@ import (
"encoding/binary"
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
@@ -21,6 +22,10 @@ var hasSynced = false
var sharedClockManager = NewClockManager()
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventLoaded, func() {
goman.New(sharedClockManager.Start)
})

View File

@@ -7,14 +7,56 @@ import (
"database/sql"
"fmt"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fileutils"
_ "github.com/mattn/go-sqlite3"
"strings"
)
type DB struct {
rawDB *sql.DB
locker *fileutils.Locker
rawDB *sql.DB
enableStat bool
}
func OpenWriter(dsn string) (*DB, error) {
return open(dsn, true)
}
func OpenReader(dsn string) (*DB, error) {
return open(dsn, false)
}
func open(dsn string, lock bool) (*DB, error) {
// locker
var locker *fileutils.Locker
if lock {
var path = dsn
var queryIndex = strings.Index(dsn, "?")
if queryIndex >= 0 {
path = path[:queryIndex]
}
path = strings.TrimSpace(strings.TrimPrefix(path, "file:"))
locker = fileutils.NewLocker(path)
err := locker.Lock()
if err != nil {
remotelogs.Warn("DB", "lock '"+path+"' failed: "+err.Error())
locker = nil
}
}
// open
rawDB, err := sql.Open("sqlite3", dsn)
if err != nil {
return nil, err
}
var db = NewDB(rawDB)
db.locker = locker
return db, nil
}
func NewDB(rawDB *sql.DB) *DB {
var db = &DB{
rawDB: rawDB,
@@ -30,6 +72,10 @@ func NewDB(rawDB *sql.DB) *DB {
return db
}
func (this *DB) SetMaxOpenConns(n int) {
this.rawDB.SetMaxOpenConns(n)
}
func (this *DB) EnableStat(b bool) {
this.enableStat = b
}
@@ -81,6 +127,13 @@ func (this *DB) QueryRow(query string, args ...interface{}) *sql.Row {
func (this *DB) Close() error {
events.Remove(fmt.Sprintf("db_%p", this))
defer func() {
if this.locker != nil {
_ = this.locker.Release()
}
}()
return this.rawDB.Close()
}

View File

@@ -15,6 +15,10 @@ import (
)
func init() {
if !teaconst.IsMain {
return
}
var ticker = time.NewTicker(5 * time.Second)
events.On(events.EventLoaded, func() {

View File

@@ -10,6 +10,7 @@ import (
"encoding/json"
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/maps"
@@ -22,6 +23,10 @@ var (
)
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventReload, func() {
nodeConfig, _ := nodeconfigs.SharedNodeConfig()
if nodeConfig != nil {

View File

@@ -0,0 +1,82 @@
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package fileutils
import (
"os"
"syscall"
"time"
)
type Locker struct {
path string
fp *os.File
}
func NewLocker(path string) *Locker {
return &Locker{
path: path + ".lock",
}
}
func (this *Locker) TryLock() (ok bool, err error) {
if this.fp == nil {
fp, err := os.OpenFile(this.path, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return false, err
}
this.fp = fp
}
return this.tryLock()
}
func (this *Locker) Lock() error {
if this.fp == nil {
fp, err := os.OpenFile(this.path, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return err
}
this.fp = fp
}
for {
b, err := this.tryLock()
if err != nil {
_ = this.fp.Close()
return err
}
if b {
return nil
}
time.Sleep(100 * time.Millisecond)
}
}
func (this *Locker) Release() error {
err := this.fp.Close()
if err != nil {
return err
}
this.fp = nil
return nil
}
func (this *Locker) tryLock() (ok bool, err error) {
err = syscall.Flock(int(this.fp.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err == nil {
return true, nil
}
errno, isErrNo := err.(syscall.Errno)
if !isErrNo {
return
}
if !errno.Temporary() {
return
}
err = nil // 不提示错误
return
}

View File

@@ -0,0 +1,24 @@
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package fileutils_test
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/fileutils"
"testing"
)
func TestLocker_Lock(t *testing.T) {
var path = "/tmp/file-test"
var locker = fileutils.NewLocker(path)
err := locker.Lock()
if err != nil {
t.Fatal(err)
}
_ = locker.Release()
var locker2 = fileutils.NewLocker(path)
err = locker2.Lock()
if err != nil {
t.Fatal(err)
}
}

View File

@@ -15,6 +15,10 @@ import (
var SharedFreeHoursManager = NewFreeHoursManager()
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventLoaded, func() {
goman.New(func() {
SharedFreeHoursManager.Start()

View File

@@ -3,6 +3,7 @@ package utils_test
import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/assert"
"github.com/iwind/TeaGo/types"
"strings"
"testing"
)
@@ -65,3 +66,15 @@ func TestContainsSameStrings(t *testing.T) {
a.IsTrue(utils.EqualStrings([]string{"a", "b"}, []string{"a", "b"}))
a.IsTrue(utils.EqualStrings([]string{"a", "b"}, []string{"b", "a"}))
}
func TestToValidUTF8string(t *testing.T) {
for _, s := range []string{
"https://goedge.cn/",
"提升mysql数据表写入速度",
"😆",
string([]byte{'a', 'b', 130, 131, 132, 133, 134, 'c'}),
} {
var u = utils.ToValidUTF8string(s)
t.Log(s, "["+types.String(len(s))+"]", "=>", u, "["+types.String(len(u))+"]")
}
}

View File

@@ -3,12 +3,17 @@
package utils
import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/shirou/gopsutil/v3/mem"
)
var systemTotalMemory = -1
func init() {
if !teaconst.IsMain {
return
}
_ = SystemMemoryGB()
}
@@ -23,5 +28,11 @@ func SystemMemoryGB() int {
}
systemTotalMemory = int(stat.Total / 1024 / 1024 / 1024)
if systemTotalMemory <= 0 {
systemTotalMemory = 1
}
setMaxMemory(systemTotalMemory)
return systemTotalMemory
}

View File

@@ -0,0 +1,23 @@
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
//go:build go1.19
package utils
import (
"runtime/debug"
)
// 设置软内存最大值
func setMaxMemory(memoryGB int) {
if memoryGB <= 0 {
memoryGB = 1
}
var maxMemoryBytes int64 = 0
if memoryGB > 10 {
maxMemoryBytes = int64(memoryGB-2) << 30 // 超过10G内存的允许剩余2G内存
} else {
maxMemoryBytes = (int64(memoryGB) << 30) * 80 / 100 // 默认 80%
}
debug.SetMemoryLimit(maxMemoryBytes)
}

View File

@@ -0,0 +1,9 @@
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
//go:build !go1.19
package utils
// 设置软内存最大值
func setMaxMemory(memoryGB int) {
}

View File

@@ -1,6 +1,7 @@
package utils
import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/iwind/TeaGo/types"
"time"
@@ -11,6 +12,10 @@ var unixTimeMilli = time.Now().UnixMilli()
var unixTimeMilliString = types.String(unixTimeMilli)
func init() {
if !teaconst.IsMain {
return
}
var ticker = time.NewTicker(200 * time.Millisecond)
goman.New(func() {
for range ticker.C {

View File

@@ -2,10 +2,6 @@
package waf
import (
"net/http"
)
type BaseAction struct {
currentActionId int64
}
@@ -19,16 +15,3 @@ func (this *BaseAction) ActionId() int64 {
func (this *BaseAction) SetActionId(actionId int64) {
this.currentActionId = actionId
}
// CloseConn 关闭连接
func (this *BaseAction) CloseConn(writer http.ResponseWriter) error {
// 断开连接
hijack, ok := writer.(http.Hijacker)
if ok {
conn, _, err := hijack.Hijack()
if err == nil && conn != nil {
return conn.Close()
}
}
return nil
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/waf/requests"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/rands"
"io"
"net/http"
"os"
@@ -25,6 +26,7 @@ type BlockAction struct {
Body string `yaml:"body" json:"body"` // supports HTML
URL string `yaml:"url" json:"url"`
Timeout int32 `yaml:"timeout" json:"timeout"`
TimeoutMax int32 `yaml:"timeoutMax" json:"timeoutMax"`
Scope string `yaml:"scope" json:"scope"`
}
@@ -41,6 +43,7 @@ func (this *BlockAction) Init(waf *WAF) error {
}
if this.Timeout <= 0 {
this.Timeout = waf.DefaultBlockAction.Timeout
this.TimeoutMax = waf.DefaultBlockAction.TimeoutMax // 只有没有填写封锁时长的时候才会使用默认的封锁时长最大值
}
}
return nil
@@ -65,6 +68,12 @@ func (this *BlockAction) Perform(waf *WAF, group *RuleGroup, set *RuleSet, reque
timeout = 300 // 默认封锁300秒
}
// 随机时长
var timeoutMax = this.TimeoutMax
if timeoutMax > timeout {
timeout = timeout + int32(rands.Int64()%int64(timeoutMax-timeout+1))
}
SharedIPBlackList.RecordIP(IPTypeAll, this.Scope, request.WAFServerId(), request.WAFRemoteIP(), time.Now().Unix()+int64(timeout), waf.Id, waf.UseLocalFirewall, group.Id, set.Id, "")
if writer != nil {

View File

@@ -69,9 +69,10 @@ func (this *Get302Action) Perform(waf *WAF, group *RuleGroup, set *RuleSet, requ
http.Redirect(writer, request.WAFRaw(), Get302Path+"?info="+url.QueryEscape(info), http.StatusFound)
if request.WAFRaw().ProtoMajor == 1 {
_ = this.CloseConn(writer)
flusher, ok := writer.(http.Flusher)
if ok {
flusher.Flush()
}
return false, false
}

View File

@@ -4,6 +4,7 @@ package waf
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
@@ -25,6 +26,10 @@ type notifyTask struct {
var notifyChan = make(chan *notifyTask, 128)
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventLoaded, func() {
goman.New(func() {
rpcClient, err := rpc.SharedRPC()

View File

@@ -6,6 +6,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/waf/requests"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/types"
"io"
"net/http"
"time"
)
@@ -55,7 +56,7 @@ func (this *Post307Action) Perform(waf *WAF, group *RuleGroup, set *RuleSet, req
if life <= 0 {
life = 600 // 默认10分钟
}
var setId = m.GetString("setId")
var setId = types.String(m.GetInt64("setId"))
SharedIPWhiteList.RecordIP("set:"+setId, this.Scope, request.WAFServerId(), request.WAFRemoteIP(), time.Now().Unix()+life, m.GetInt64("policyId"), false, m.GetInt64("groupId"), m.GetInt64("setId"), "")
return true, false
}
@@ -72,10 +73,17 @@ func (this *Post307Action) Perform(waf *WAF, group *RuleGroup, set *RuleSet, req
}
info, err := utils.SimpleEncryptMap(m)
if err != nil {
remotelogs.Error("WAF_POST_302_ACTION", "encode info failed: "+err.Error())
remotelogs.Error("WAF_POST_307_ACTION", "encode info failed: "+err.Error())
return true, false
}
// 清空请求内容
var req = request.WAFRaw()
if req.ContentLength > 0 && req.Body != nil {
_, _ = io.Copy(io.Discard, req.Body)
_ = req.Body.Close()
}
// 设置Cookie
http.SetCookie(writer, &http.Cookie{
Name: cookieName,
@@ -86,8 +94,9 @@ func (this *Post307Action) Perform(waf *WAF, group *RuleGroup, set *RuleSet, req
http.Redirect(writer, request.WAFRaw(), request.WAFRaw().URL.String(), http.StatusTemporaryRedirect)
if request.WAFRaw().ProtoMajor == 1 {
_ = this.CloseConn(writer)
flusher, ok := writer.(http.Flusher)
if ok {
flusher.Flush()
}
return false, false

View File

@@ -33,6 +33,10 @@ type recordIPTask struct {
var recordIPTaskChan = make(chan *recordIPTask, 1024)
func init() {
if !teaconst.IsMain {
return
}
events.On(events.EventLoaded, func() {
goman.New(func() {
rpcClient, err := rpc.SharedRPC()

View File

@@ -162,7 +162,7 @@ func (this *CaptchaValidator) show(actionConfig *CaptchaAction, req requests.Req
</div>
<div class="ui-input">
<p>` + msgPrompt + `</p>
<input type="text" name="GOEDGE_WAF_CAPTCHA_CODE" id="GOEDGE_WAF_CAPTCHA_CODE" maxlength="6" autocomplete="off" z-index="1" class="input"/>
<input type="text" name="GOEDGE_WAF_CAPTCHA_CODE" id="GOEDGE_WAF_CAPTCHA_CODE" size="` + types.String(countLetters*17/10) + `" maxlength="` + types.String(countLetters) + `" autocomplete="off" z-index="1" class="input"/>
</div>
<div class="ui-button">
<button type="submit" style="line-height:24px;margin-top:10px">` + msgButtonTitle + `</button>
@@ -199,7 +199,7 @@ func (this *CaptchaValidator) show(actionConfig *CaptchaAction, req requests.Req
</script>
<style type="text/css">
form { width: 20em; margin: 0 auto; text-align: center; }
.input { font-size:16px;line-height:24px; letter-spacing: 15px; padding-left: 10px; width: 140px; }
.input { font-size:16px;line-height:24px; letter-spacing:0.2em; min-width: 5em; text-align: center; }
address { margin-top: 1em; padding-top: 0.5em; border-top: 1px #ccc solid; text-align: center; }
` + msgCss + `
</style>

View File

@@ -3,6 +3,7 @@
package checkpoints
import (
"fmt"
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
"github.com/TeaOSLab/EdgeNode/internal/waf/requests"
"github.com/TeaOSLab/EdgeNode/internal/zero"
@@ -35,7 +36,11 @@ type CC2Checkpoint struct {
func (this *CC2Checkpoint) RequestValue(req requests.Request, param string, options maps.Map, ruleId int64) (value interface{}, hasRequestBody bool, sysErr error, userErr error) {
var keys = options.GetSlice("keys")
var keyValues = []string{}
var hasRemoteAddr = false
for _, key := range keys {
if key == "${remoteAddr}" || key == "${rawRemoteAddr}" {
hasRemoteAddr = true
}
keyValues = append(keyValues, req.Format(types.String(key)))
}
if len(keyValues) == 0 {
@@ -66,8 +71,33 @@ func (this *CC2Checkpoint) RequestValue(req requests.Request, param string, opti
}
}
var expiresAt = time.Now().Unix() + period
var ccKey = "WAF-CC-" + types.String(ruleId) + "-" + strings.Join(keyValues, "@")
value = ccCache.IncreaseInt64(ccKey, 1, time.Now().Unix()+period, false)
value = ccCache.IncreaseInt64(ccKey, 1, expiresAt, false)
// 基于指纹统计
var enableFingerprint = true
if options.Has("enableFingerprint") && !options.GetBool("enableFingerprint") {
enableFingerprint = false
}
if hasRemoteAddr && enableFingerprint {
var fingerprint = req.WAFFingerprint()
if len(fingerprint) > 0 {
var fpKeyValues = []string{}
for _, key := range keys {
if key == "${remoteAddr}" || key == "${rawRemoteAddr}" {
fpKeyValues = append(fpKeyValues, fmt.Sprintf("%x", fingerprint))
continue
}
fpKeyValues = append(fpKeyValues, req.Format(types.String(key)))
}
var fpCCKey = "WAF-CC-" + types.String(ruleId) + "-" + strings.Join(fpKeyValues, "@")
var fpValue = ccCache.IncreaseInt64(fpCCKey, 1, expiresAt, false)
if fpValue > value.(int64) {
value = fpValue
}
}
}
return
}

View File

@@ -5,6 +5,7 @@ package waf
import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/waf/requests"
"github.com/iwind/TeaGo/types"
"net/http"
"time"
)
@@ -39,11 +40,11 @@ func (this *Get302Validator) Run(request requests.Request, writer http.ResponseW
}
// 加入白名单
life := m.GetInt64("life")
var life = m.GetInt64("life")
if life <= 0 {
life = 600 // 默认10分钟
}
setId := m.GetString("setId")
var setId = types.String(m.GetInt64("setId"))
SharedIPWhiteList.RecordIP("set:"+setId, m.GetString("scope"), request.WAFServerId(), request.WAFRemoteIP(), time.Now().Unix()+life, m.GetInt64("policyId"), false, m.GetInt64("groupId"), m.GetInt64("setId"), "")
// 返回原始URL

View File

@@ -32,6 +32,9 @@ type Request interface {
// WAFOnAction 动作回调
WAFOnAction(action interface{}) (goNext bool)
// WAFFingerprint 读取连接指纹
WAFFingerprint() []byte
// Format 格式化变量
Format(string) string
}

View File

@@ -180,6 +180,7 @@ func (this *WAFManager) ConvertWAF(policy *firewallconfigs.HTTPFirewallPolicy) (
Body: policy.BlockOptions.Body,
URL: policy.BlockOptions.URL,
Timeout: policy.BlockOptions.Timeout,
TimeoutMax: policy.BlockOptions.TimeoutMax,
}
}